# Lume: How to auto-map data in your code

After you have run a job to create a mapper, you can embed it as a pipeline in your own code. This is especially helpful when similar data arrives repeatedly and you want to reuse a static mapper, created once with Lume, on each new batch.

❓ See a video walkthrough of this notebook [here](https://www.loom.com/share/63a42b2f4b6d4439a45e461ea543033c)
### Overview

This notebook contains one section:

- **Map incoming source data using an existing pipeline:** Define a set of helper functions and use them to map data through the Lume API.

## Map incoming source data using an existing pipeline

Define your API key here:

In [None]:
api_key = '<YOUR_API_KEY>'
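Instead of pasting the key into the notebook, you may prefer to read it from an environment variable so that it is never saved with the notebook. Below is a minimal sketch. It assumes a hypothetical variable name, `LUME_API_KEY`, chosen for this example; the Lume API does not define that name.

```python
import os


def load_api_key(var_name: str = "LUME_API_KEY") -> str:
    """Read the Lume API key from an environment variable.

    LUME_API_KEY is a hypothetical name chosen for this example.
    """
    key = os.environ.get(var_name)
    if not key:
        raise RuntimeError(f"Set the {var_name} environment variable first.")
    return key


# Usage: api_key = load_api_key()
```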

### Utilities

First, let's define a few utilities for making calls to the Lume API.

In [None]:
%pip install httpx

In [None]:
import httpx
import asyncio
import os
import json


url = "https://api.lume.ai"

In [None]:
async def get_pipeline(pipeline_id):
    new_url = f'{url}/pipelines/{pipeline_id}'
    headers = {"lume-api-key": api_key}
    async with httpx.AsyncClient(timeout=60) as client:
        job = await client.get(new_url, headers=headers)
        job = job.json()
    return job

In [None]:
async def create_job(pipeline_id, data):
    new_url = f'{url}/pipelines/{pipeline_id}/jobs'
    headers = {"lume-api-key": api_key}
    payload = {
        "data": data
    }
    async with httpx.AsyncClient(timeout=60) as client:
        job = await client.post(new_url, headers=headers, json=payload)
        job = job.json()
    return job

In [None]:
async def run_job(job_id):
    new_url = f'{url}/jobs/{job_id}/run'
    headers = {"lume-api-key": api_key}
    payload = {
        "immediate_return": True # required to set this to True for polling.
    }
    async with httpx.AsyncClient(timeout=6000) as client:
        job = await client.post(new_url, headers=headers, json=payload)
        job = job.json()
    return job

In [None]:
async def get_result(result_id):
    new_url = f'{url}/results/{result_id}'
    headers = {"lume-api-key": api_key}
    async with httpx.AsyncClient(timeout=60) as client:
        job = await client.get(new_url, headers=headers)
        job = job.json()
    return job

In [None]:
async def poll_result(result_id, interval=3):
    while True:
        result = await get_result(result_id)
        if result['status'] != 'running':
            return result
        await asyncio.sleep(interval)  # Wait for the specified interval before polling again.
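`poll_result` loops forever if a result never leaves the `running` state. The sketch below is a variation that gives up after a deadline. The `fetch_result` parameter stands in for `get_result`, which lets the function run without network access. The `timeout` default is an arbitrary choice for this example, not a documented Lume limit.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


async def poll_result_with_timeout(
    fetch_result: Callable[[str], Awaitable[Dict[str, Any]]],
    result_id: str,
    interval: float = 3,
    timeout: float = 600,
) -> Dict[str, Any]:
    """Poll until the result's status is no longer 'running', or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        result = await fetch_result(result_id)
        if result["status"] != "running":
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Result {result_id} still running after {timeout}s")
        await asyncio.sleep(interval)  # wait before polling again


# Usage in the notebook: result = await poll_result_with_timeout(get_result, result_id)
```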

In [None]:
async def get_mappings_from_result(result_id, page=1, size=50):
    new_url = f'{url}/results/{result_id}/mappings'
    headers = {"lume-api-key": api_key}
    params = {
        'page': page,  # Page number to fetch (pages start at 1)
        'size': size  # Number of records per page
    }
    async with httpx.AsyncClient(timeout=60) as client:
        job = await client.get(new_url, headers=headers, params=params)
        job = job.json()
    return job 

In [None]:
# helper method to iterate over all pages of pipelines to get all pipelines
async def get_all_pipelines():
    new_url = f'{url}/pipelines' 
    headers = {"lume-api-key": api_key} 
    all_pipelines = []
    page = 1
    total_pages = None

    async with httpx.AsyncClient(timeout=60) as client:
        while total_pages is None or page <= total_pages:
            response = await client.get(f"{new_url}?page={page}", headers=headers)
            data = response.json()
            all_pipelines.extend(data['items'])
            if total_pages is None:
                total_items = data['total']
                page_size = data['size']
                total_pages = (total_items + page_size - 1) // page_size  # Calculate total pages
            page += 1

    return all_pipelines
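Any helper that pages through results needs the page count, which is the ceiling of `total / size`. The integer expression `(total + size - 1) // size` used in `get_all_pipelines` computes this without floating point. By contrast, `total // size + 1` asks for one extra, empty page whenever `total` is an exact multiple of `size`. The quick check below compares the two cases:

```python
def page_count(total_items: int, page_size: int) -> int:
    """Number of pages needed to hold total_items at page_size items per page."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    return (total_items + page_size - 1) // page_size  # ceiling division


# 100 items at 50 per page fit in exactly 2 pages; 101 items need 3.
print(page_count(100, 50), page_count(101, 50), page_count(0, 50))  # 2 3 0
```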

In [None]:
async def search(model, params):
    new_url = f'{url}/search'
    headers = {"lume-api-key": api_key}
    payload = {
        "model": model,
        "params": params
    }
    async with httpx.AsyncClient(timeout=6000) as client:
        retPage = await client.post(new_url, headers=headers, json=payload)
        retPage = retPage.json()
    return retPage['items']

In [None]:
async def get_pipeline_with_name(pipeline_name):
    pipelines = await search('pipeline', {'name': pipeline_name})
    if len(pipelines) < 1:
        raise ValueError(f"Pipeline with name {pipeline_name} not found")
    pipeline = pipelines[0]
    print("Fetched pipeline", pipeline['name'])
    return pipeline  # callers need the pipeline object (e.g. its 'id')
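This notebook does not say whether the `/search` endpoint matches names exactly or partially. If you would rather not depend on that behavior, one alternative is to list pipelines with `get_all_pipelines` and filter locally for an exact name. Below is a minimal sketch. `find_pipeline_by_name` is a hypothetical helper, and it assumes that each pipeline dict has a `name` key, as the cells above do. Treating duplicate names as an error is a conservative choice made here.

```python
from typing import Any, Dict, List


def find_pipeline_by_name(pipelines: List[Dict[str, Any]], name: str) -> Dict[str, Any]:
    """Return the single pipeline whose 'name' equals `name` exactly."""
    matches = [p for p in pipelines if p.get("name") == name]
    if not matches:
        raise ValueError(f"Pipeline with name {name!r} not found")
    if len(matches) > 1:
        raise ValueError(f"{len(matches)} pipelines share the name {name!r}")
    return matches[0]


# Usage in the notebook:
# pipeline = find_pipeline_by_name(await get_all_pipelines(), 'ecommerce-demo')
```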

In [None]:
async def get_all_mappings(result_id):
    mappings = []
    first_page = await get_mappings_from_result(result_id)
    mappings.extend(first_page['items'])

    total_items = first_page['total']
    page_size = first_page['size']
    total_pages = (total_items + page_size - 1) // page_size  # ceiling division; avoids requesting an empty extra page

    for page in range(2, total_pages + 1):
        new_mappings_page = await get_mappings_from_result(result_id, page=page)
        mappings.extend(new_mappings_page['items'])
    return mappings
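`get_all_mappings` requests the remaining pages one at a time. Each page after the first is independent, so one possible variation fetches them concurrently with `asyncio.gather`.

- `fetch_page` stands in for `get_mappings_from_result` and takes a page number.
- Like the utilities above, the sketch assumes every page carries `items`, `total`, and `size`.
- This notebook does not cover whether the API tolerates many simultaneous requests. Treat this as an option to test, not a recommendation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def fetch_all_pages(
    fetch_page: Callable[[int], Awaitable[Dict[str, Any]]],
) -> List[Any]:
    """Fetch page 1, then request every remaining page concurrently."""
    first = await fetch_page(1)
    items = list(first["items"])
    total_pages = (first["total"] + first["size"] - 1) // first["size"]
    # asyncio.gather returns results in the order the awaitables were passed,
    # so items stay in page order.
    rest = await asyncio.gather(*(fetch_page(p) for p in range(2, total_pages + 1)))
    for page in rest:
        items.extend(page["items"])
    return items


# Usage in the notebook:
# all_mappings = await fetch_all_pages(lambda p: get_mappings_from_result(result['id'], page=p))
```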

### Prior Context

This cookbook assumes that a pipeline has already been created to map source ecommerce data to an internal ecommerce data model. The pipeline's target schema is saved in this cookbook's folder as `target_schema.json`. The cell below loads it, and you can open the file to view the schema in detail.

In [None]:
target_schema_path = os.path.join(os.getcwd(), 'target_schema.json')
with open(target_schema_path) as f:
    target_schema = json.load(f)

### Getting Started

Let's access our source data and use a Lume pipeline to map it automatically.

The source data is in this cookbook's folder, as `source_data.json`. The cell below loads the source data.

In [None]:
source_data_path = os.path.join(os.getcwd(), 'source_data.json')
with open(source_data_path) as f:
    source_data = json.load(f)

Now we want Lume to map this source data automatically, using an existing pipeline. To do so:
1. Get the corresponding pipeline for your data.
2. Create a job for the pipeline, and provide the source data.
3. Run the job.
4. Get output from the finished job.
5. Pipe the output to your end destination.
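The core steps (find the pipeline, create a job, run it, collect the mappings) can be chained into a single coroutine. That coroutine is roughly what you would embed in a production pipeline.

- The sketch takes the notebook's helpers as arguments. This dependency-injection choice keeps the function self-contained and testable.
- It assumes `get_pipeline_with_name` returns the pipeline it finds.
- Treating any status other than `finished` as a failure is an assumption based on the status description later in this notebook.

```python
from typing import Any, Awaitable, Callable, Dict, List


async def map_with_pipeline(
    pipeline_name: str,
    data: Any,
    get_pipeline_with_name: Callable[[str], Awaitable[Dict[str, Any]]],
    create_job: Callable[[str, Any], Awaitable[Dict[str, Any]]],
    run_job: Callable[[str], Awaitable[Dict[str, Any]]],
    poll_result: Callable[[str], Awaitable[Dict[str, Any]]],
    get_all_mappings: Callable[[str], Awaitable[List[Dict[str, Any]]]],
) -> List[Dict[str, Any]]:
    """Run source data through an existing Lume pipeline and return all mappings."""
    pipeline = await get_pipeline_with_name(pipeline_name)  # 1. find the pipeline
    job = await create_job(pipeline["id"], data)             # 2. create a job with the data
    initial_result = await run_job(job["id"])                # 3. start the run...
    result = await poll_result(initial_result["id"])         #    ...and wait for it
    if result["status"] != "finished":
        raise RuntimeError(f"Job {job['id']} ended with status {result['status']!r}")
    return await get_all_mappings(result["id"])              # 4. collect the output


# Usage in the notebook:
# all_mappings = await map_with_pipeline('ecommerce-demo', source_data, get_pipeline_with_name,
#                                        create_job, run_job, poll_result, get_all_mappings)
```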

##### 1. Get the corresponding pipeline for your data
Depending on where your source data came from (system X, API Y, etc.), use that knowledge to fetch the corresponding pipeline by its name, which is set in `pipeline_name` below. Pipeline names let you find pipelines easily. Customers typically use descriptive text (e.g. `systemA_pipeline_to_X_target`) or hashes to tell pipelines apart.

In [None]:
pipeline_name = 'ecommerce-demo'
try:
    pipeline = await get_pipeline_with_name(pipeline_name)
except ValueError:  # raised by get_pipeline_with_name when no pipeline matches
    pipeline = None
if pipeline:
    print(f"Found pipeline: {pipeline['name']}") # printing only certain pipeline properties, for brevity. The pipeline object also stores the target schema.
else:
    print("Pipeline not found.")

##### 2. Create a job for the pipeline, and provide the source data.
Use the id of the returned pipeline to create a job. Also pass in the source data to map.

In [None]:
job = await create_job(pipeline['id'], source_data)
job

##### 3. Run the job
First, call `run_job`, which returns a result immediately. To wait for the job to finish, poll the result endpoint until the result's status is no longer `running`. When the job completes successfully, the status will be `finished`.

In [None]:
initial_result = await run_job(job['id'])
result = await poll_result(initial_result['id'])
print(f"Job completed with result: {result}")

Once a job run executes to completion, a Result object is returned. The Result provides a few key pieces of information:
1. `status`: the status of the job. If `finished`, the job executed with no errors or flags.
2. `spec`: the high-order mapping logic and lookup table of the pipeline used on this job.
3. `id`: the Result id. Use it to access the output mappings of the job run via Get Mappings For Result (the `get_mappings_from_result` utility above).

##### 4. Get output from the finished job

A finished job gives you access to the spec and the mapped data. First, let's look at the spec, via `result['spec']`. The spec indicates:
1. Which source properties are used to map to each target property.
2. The default value, used if the source property is not present or if no source property is indicated.

There is a spec for every target property. For simplicity, let's access the spec for the `product.name` target property.

In [None]:
spec = result['spec']
product_name_spec = spec['product']['name']
product_name_spec

You can see that this mapping is doing a 1:1 extract from the `product.title` source property, and it is defaulting to `null` if the source property does not exist in the record.
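To inspect other target properties without chaining indexes by hand, you can walk the nested spec with a dotted path. Below is a minimal sketch. `spec_for` is a hypothetical helper. Like the cell above, it assumes that the spec nests by target property. A missing key raises `KeyError`.

```python
from functools import reduce
from typing import Any, Dict


def spec_for(spec: Dict[str, Any], dotted_path: str) -> Any:
    """Look up a nested spec entry, e.g. 'product.name' -> spec['product']['name']."""
    return reduce(lambda node, key: node[key], dotted_path.split("."), spec)


# Usage in the notebook (equivalent to spec['product']['name']):
# spec_for(result['spec'], 'product.name')
```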

Now, let's access the mapped data by querying the underlying mappings.

In [None]:
mappingsPage = await get_mappings_from_result(result['id'], page=1, size=10)
mappings = mappingsPage['items']

Lume has now mapped the source data to your target schema, and the mapped data is available in `mappings`. `mappings` is a list with one entry per source record. Use each entry's `index` and `mapped_data` properties to match a source record to its mapped output. Note that the cell above fetches only the first page of 10 mappings.

For more detail, let's zoom in on the first record of `mappings`. Each entry contains:
1. `index`: the index of the source record in question.
2. `source_record`: the original source record.
3. `mapped_data`: the final mapped data record.
4. `message`: either `success` or a JSON error. If a record is not marked as `success`, it failed a Lume type check against your specified target schema.
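Because `message` is either `success` or an error, you can separate records that passed the type check from records that need attention before sending anything downstream. Below is a minimal sketch. It relies only on the `message` and `mapped_data` fields described above. `split_mappings` is a hypothetical helper name.

```python
from typing import Any, Dict, List, Tuple


def split_mappings(
    mappings: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Return (mapped_data of successful records, full entries of failed records)."""
    succeeded, failed = [], []
    for m in mappings:
        if m.get("message") == "success":
            succeeded.append(m["mapped_data"])
        else:
            failed.append(m)  # keep index/source_record/message for debugging
    return succeeded, failed


# Usage in the notebook: good, bad = split_mappings(mappings)
```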

In [None]:
mappings[0]

##### 5. Pipe the output to your end destination
Iterate through the mappings to get the final mapped data, and send it to the next step of your workflow. Use the `get_all_mappings` utility function to iterate through all the pages of data, if needed.

In [None]:
all_mappings = await get_all_mappings(result['id'])
all_mappings

# TODO: send all_mappings to the next step in the pipeline
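The next step depends on your destination. As one hypothetical example, the sketch below writes records to a JSON Lines file, with one JSON object per line, a common hand-off format for loaders and queues. The file name is illustrative. Keeping only entries whose `message` is `success` is a choice made in the usage line, not a Lume requirement.

```python
import json
from pathlib import Path
from typing import Any, Dict, Iterable


def write_jsonl(records: Iterable[Dict[str, Any]], path: str) -> int:
    """Write each record as one JSON object per line; return how many were written."""
    count = 0
    with Path(path).open("w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
            count += 1
    return count


# Usage in the notebook (hypothetical output path):
# write_jsonl((m['mapped_data'] for m in all_mappings if m['message'] == 'success'),
#             'mapped_output.jsonl')
```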