In [None]:
%pip install lume-py

# Lume: Pipeline Workflow 

This cookbook walks you through creating, retrieving, and running pipelines.

### Overview

This notebook covers the following topics:

- **Creating, Running, and Retrieving Results:** Learn how to define and use pipelines.

- **Embedding the Mapper in a Pipeline:** How to retrieve a mapper from a pipeline you created or retrieved, for efficient data processing.

- **Handling Different Data Types:** Strategies for running multiple jobs associated with one or more pipelines.



In [None]:
import lume_py as lume

lume.set_api_key("...")

### Usage

To see pipelines created under your user account, configure the library with your account's API key by replacing the `"..."` placeholder in the cell above. Once the key is set, you can retrieve any pipeline you have created in the past.
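Hardcoding the key in a notebook makes it easy to leak when the notebook is shared. A minimal sketch, assuming you keep the key in an environment variable instead; the variable name `LUME_API_KEY` and the helper `load_api_key` are hypothetical and not part of `lume-py`:

```python
import os


# Hypothetical helper, not part of lume-py.
def load_api_key(env_var: str = "LUME_API_KEY") -> str:
    """Return the API key stored in `env_var` (a hypothetical variable name).

    Raises RuntimeError with a readable message instead of letting an
    empty key reach the client.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable to your Lume API key.")
    return key
```

You would then configure the client with `lume.set_api_key(load_api_key())`.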

### Async

Asynchronous versions of request-making methods are available by suffixing the method name
with `_async`.

```python
# With Lume, you can `await` any request-making function directly, because `lume` acts as a global client.
pipelines = await lume.Pipeline.get_pipelines_data_page()
```
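Because these calls are coroutines, independent requests can be issued concurrently rather than one after another. A minimal sketch using `asyncio.gather`; `fake_get_pipeline` and `get_many` are hypothetical stand-ins for a real call such as `lume.Pipeline.get_pipeline_by_id`, so the example runs without network access:

```python
import asyncio


async def fake_get_pipeline(pipeline_id: str) -> dict:
    """Hypothetical stand-in for an awaitable SDK call; simulates network latency."""
    await asyncio.sleep(0.1)
    return {"id": pipeline_id}


async def get_many(pipeline_ids: list) -> list:
    """Hypothetical helper: fetch several pipelines concurrently."""
    # gather() schedules all requests at once and returns results in input order.
    return await asyncio.gather(*(fake_get_pipeline(pid) for pid in pipeline_ids))


# In a notebook: results = await get_many(["a", "b", "c"])
# In a script:   results = asyncio.run(get_many(["a", "b", "c"]))
```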



### Prior Context

This cookbook assumes a pipeline called `ecomm_test` has already been created. The existing pipeline maps source e-commerce data to an internal e-commerce data model. The cell below loads the pipeline's target schema from `data/target.json` and the source data from `data/source.json`, both relative to the notebook's working directory.

In [3]:
import os
import json

# Paths are resolved relative to the notebook's working directory.
target_data_path = os.path.join(os.getcwd(), "data", "target.json")
with open(target_data_path) as f:
    target_data = json.load(f)  # target schema for the pipeline

source_data_path = os.path.join(os.getcwd(), "data", "source.json")
with open(source_data_path) as f:
    source_data = json.load(f)  # source data to be mapped
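The two loading blocks above repeat the same steps. A small helper can remove the duplication and fail early with a clear message when a file is missing. This is a sketch; `load_json` is a hypothetical helper name, not part of `lume-py`:

```python
import json
from pathlib import Path
from typing import Optional


# Hypothetical helper, not part of lume-py.
def load_json(relative_path: str, base_dir: Optional[Path] = None):
    """Load a JSON file relative to `base_dir` (defaults to the current working directory)."""
    path = (base_dir or Path.cwd()) / relative_path
    if not path.is_file():
        raise FileNotFoundError(f"Expected data file at {path}")
    with path.open(encoding="utf-8") as f:
        return json.load(f)


# target_data = load_json("data/target.json")
# source_data = load_json("data/source.json")
```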

In [None]:
# get all pipelines
pipelines = await lume.Pipeline.get_pipelines_data_page()
print(pipelines)

In [None]:
# retrieve an existing pipeline by ID or name
pipeline = await lume.Pipeline.get_pipeline_by_id("pipeline_id/pipeline_name")
print(pipeline)
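If you only know a pipeline's name, one option is to filter the list returned by `get_pipelines_data_page()` yourself. A minimal sketch, assuming the page is iterable and each pipeline object exposes `id` and `name` attributes (as the update loop at the end of this notebook uses); `find_pipeline` and the `SimpleNamespace` objects are hypothetical stand-ins:

```python
from types import SimpleNamespace
from typing import Iterable, Optional


# Hypothetical helper, not part of lume-py.
def find_pipeline(pipelines: Iterable, key: str) -> Optional[object]:
    """Return the first pipeline whose `id` or `name` equals `key`, or None."""
    for pipeline in pipelines:
        if getattr(pipeline, "id", None) == key or getattr(pipeline, "name", None) == key:
            return pipeline
    return None


# Stand-in objects so the sketch runs without an API key.
sample = [
    SimpleNamespace(id="p-1", name="ecomm_test"),
    SimpleNamespace(id="p-2", name="testing-cookbook-sdk-5"),
]
match = find_pipeline(sample, "ecomm_test")
print(match.id if match else "not found")  # prints "p-1"
```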

##### 1. Get the corresponding pipeline for your data
By passing in your pipeline ID, you can retrieve the pipeline you created, including fields such as:
- `status`, `id`, `target_schema_id`, `source_schema_id`, ...

In [None]:
pipeline = await lume.Pipeline.get_pipeline_by_id("...")

##### 2. Create a pipeline, then create and run a job for it.

In [4]:
# create a pipeline
new_pipeline = await lume.Pipeline.create(
    name="testing-cookbook-sdk-5",
    description="ecomm target schema",
    target_schema=target_data,
)
# creating and running that job
job = await new_pipeline.create_job(source_data=source_data)
result = await job.run(immediate=True)  # job runs until finished
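Since `job.run(immediate=True)` waits until the job finishes, a long-running job blocks the cell. One way to bound the wait is `asyncio.wait_for`. A sketch under assumptions: `SlowJob` and `run_with_timeout` are hypothetical stand-ins for the object returned by `create_job` and a wrapper around it, and whether abandoning the local wait also stops the job on the server depends on the SDK, which this notebook does not document:

```python
import asyncio


class SlowJob:
    """Hypothetical stand-in for a job; `run` simulates work that takes `seconds`."""

    def __init__(self, seconds: float):
        self.seconds = seconds

    async def run(self, immediate: bool = True) -> str:
        await asyncio.sleep(self.seconds)
        return "finished"


# Hypothetical helper, not part of lume-py.
async def run_with_timeout(job, timeout_s: float):
    """Await job.run(immediate=True), giving up locally after `timeout_s` seconds (returns None)."""
    try:
        return await asyncio.wait_for(job.run(immediate=True), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None


# In a notebook: result = await run_with_timeout(job, timeout_s=300)
```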

### Retrieve Mappings via a Result or by Running the Pipeline Directly
```python
await result.get_mappings()  # Retrieves the list of mappings associated with this result.
```

In [None]:
await new_pipeline.run_pipeline(source_data=source_data)  # alternatively, run the pipeline directly to retrieve the mapper

# Workflow - Seamless Pipeline Management with Async Iteration

### Automatically Iterate Through Pipelines

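The following cell retrieves every pipeline on your account and renames each one, updating its description. A failure on one pipeline is reported and the loop moves on to the next.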

In [None]:
# Example: Update all pipelines
async def update_all_pipelines():
    try:
        # Get the list of pipelines
        pipeline_data_page = await lume.Pipeline.get_pipelines_data_page()

        # Iterate through the list and update each pipeline
        for pipeline in pipeline_data_page:
            try:
                await pipeline.update(
                    name=f"updated_{pipeline.name}",
                    description="updated my pipelines w/ new target schema",
                )
                print(f"Updated pipeline {pipeline.id}")
            except Exception as e:
                print(
                    f"Failed to update pipeline {pipeline.id} (consider adjusting the page size): {e}"
                )

    except Exception as e:
        print(f"Failed to retrieve pipelines: {e}")


# Jupyter already runs an event loop, so await the coroutine directly.
# In a standalone script, call asyncio.run(update_all_pipelines()) instead.
await update_all_pipelines()
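The loop above updates pipelines one at a time. With many pipelines, you could update them concurrently while capping the number of in-flight requests with an `asyncio.Semaphore`. This is a sketch, not documented SDK behavior: `FakePipeline` and `update_concurrently` are hypothetical, the stand-in exposes the same `id`, `name`, and `update(...)` members the loop uses, and the default limit of 5 is an arbitrary choice to tune against any API rate limits:

```python
import asyncio


class FakePipeline:
    """Hypothetical stand-in with the `id`, `name`, and `update()` members used above."""

    def __init__(self, pid: str, name: str):
        self.id, self.name = pid, name

    async def update(self, name: str, description: str) -> None:
        await asyncio.sleep(0.01)  # simulate a network round trip
        self.name = name


# Hypothetical helper, not part of lume-py.
async def update_concurrently(pipelines: list, max_in_flight: int = 5) -> dict:
    """Update every pipeline, at most `max_in_flight` at a time; return {id: error or None}."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def update_one(pipeline):
        async with semaphore:  # wait here while max_in_flight updates are running
            await pipeline.update(
                name=f"updated_{pipeline.name}",
                description="updated my pipelines w/ new target schema",
            )

    # return_exceptions=True keeps one failure from aborting the other updates.
    results = await asyncio.gather(*(update_one(p) for p in pipelines), return_exceptions=True)
    return {p.id: (r if isinstance(r, BaseException) else None) for p, r in zip(pipelines, results)}
```

In a notebook, `errors = await update_concurrently(list(pipeline_data_page))` would collect failures per pipeline ID instead of printing them one by one.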