# Test KFP Integration

- create an experiment
- create a run
- check run passes

In [1]:
!pip install kfp==1.8.22 tenacity -q

In [2]:
import kfp
import kfp.dsl as dsl

from tenacity import retry, stop_after_attempt, wait_exponential

In [3]:
client = kfp.Client()

In [4]:
EXPERIMENT_NAME = 'Simple notebook pipeline' 

In [5]:
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    print(a, '+', b, '=', a + b)
    return a + b

In [6]:
add_op = kfp.components.func_to_container_op(
    func=add,
    base_image='python:3.7',
)

In [7]:
@dsl.pipeline(
   name='Calculation pipeline',
   description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(a: float = 0, b: float = 7):
    add_task = add_op(a, 4) 
    add_2_task = add_op(a, b)
    add_3_task = add_op(add_task.output, add_2_task.output)

In [8]:
arguments = {'a': '7', 'b': '8'}
run = client.create_run_from_pipeline_func(
    calc_pipeline,
    arguments=arguments,
    experiment_name=EXPERIMENT_NAME,
)

In [9]:
client.list_experiments().experiments

[{'created_at': datetime.datetime(2023, 7, 3, 15, 1, 11, tzinfo=tzlocal()),
  'description': None,
  'id': 'ed235fc2-79db-4324-b120-e0611afe266b',
  'name': 'Simple notebook pipeline',
  'resource_references': [{'key': {'id': 'test', 'type': 'NAMESPACE'},
                           'name': None,
                           'relationship': 'OWNER'}],
  'storage_state': 'STORAGESTATE_AVAILABLE'}]

In [10]:
@retry(
    wait=wait_exponential(multiplier=2, min=1, max=10),
    stop=stop_after_attempt(30),
    reraise=True,
)
def assert_run_succeeded(client, run_id):
    """Wait for the run to complete successfully."""
    status = client.get_run(run_id).run.status
    assert status == "Succeeded", f"KFP run in {status} state."

In [11]:
# fetch KFP experiment to ensure it exists
client.get_experiment(experiment_name=EXPERIMENT_NAME)

assert_run_succeeded(client, run.run_id)