## Data Pipeline Tutorial


To get started, you will need `API_KEY` and `TENANT` set in your .env file; check the readme for more information. This tutorial also assumes you have installed the dependencies in requirements.txt, such as pandas, requests, and python-dotenv (which provides `load_dotenv`).
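
Before making any calls, it can help to confirm that both variables actually loaded. The following is a minimal sketch and not part of the ediphi helpers; `missing_env_vars` is a hypothetical name introduced here for illustration.

```python
import os


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the environment."""
    # `environ` defaults to the real process environment; passing a dict
    # makes the check easy to try without touching real settings.
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Example with a stand-in environment where TENANT was never set.
example_env = {"API_KEY": "abc123"}
print(missing_env_vars(["API_KEY", "TENANT"], example_env))
```

In a notebook, you would call `missing_env_vars(["API_KEY", "TENANT"])` right after `load_dotenv()` and stop if the returned list is not empty.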

Data can be extracted from ediphi using the `external/data/pipeline` endpoint. It is authenticated with an API key and is called with a POST request that carries a payload. First, let's set up a function to call the pipeline, reading our API key and tenant from the environment.


In [None]:
import os
import json
import requests
from dotenv import load_dotenv

load_dotenv()


def call_pipe(payload):
    url = "https://api.ediphi.com/api/external/data/pipeline"
    headers = {
        "api-token": os.getenv("API_KEY"),
        "api-tenant": os.getenv("TENANT"),
        "Content-Type": "application/json",
    }
    res = requests.post(url, headers=headers, json=payload, timeout=30)
    if res.status_code != 200:
        # Separate the server message from the payload so the error is readable.
        error_msg = (
            res.json().get("error", {}).get("message", "Unknown pipeline error")
        ) + " | payload: " + json.dumps(payload)
        raise requests.exceptions.HTTPError(error_msg)
    return res.json().get("data", {}).get("load", {})

Now let's look at the payload we pass to this endpoint. Here's a function that returns all records of a table, given the table name:

In [None]:
def get_table(table_name, properties=None):
    if table_name == "line_items" and not properties:
        raise OverflowError("line_items table is too large to load without properties")

    payload = {
        f"#{table_name}#": {
            "table": table_name,
            "operation": {"method": "load.multiple()"},
        }
    }
    if properties:
        payload[f"#{table_name}#"]["operation"]["properties"] = properties

    data = call_pipe(payload)
    return data.get(table_name, {})

Notice the first `if` statement. Never make a call like `get_table('line_items')` without filtering by properties: this table is typically larger than all the other tables combined, so an unfiltered load is a big haul of data that could run into gigabytes.
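
To see what a filtered request would send, here is a small sketch that builds the same payload shape as `get_table` without making a network call. The helper name `build_table_payload` is introduced here for illustration, and the `estimate` property used as the filter is an assumption; check sample_data.js for the properties that `line_items` actually supports.

```python
def build_table_payload(table_name, properties=None):
    """Build the same payload shape that get_table sends."""
    key = f"#{table_name}#"
    operation = {"method": "load.multiple()"}
    if properties:
        operation["properties"] = properties
    return {key: {"table": table_name, "operation": operation}}


# Hypothetical filter: the `estimate` property name and the id value are
# placeholders, not confirmed fields of the line_items table.
payload = build_table_payload("line_items", {"estimate": "<estimate-id>"})
print(payload)
```

Printing the payload before sending it is a cheap way to confirm that a filter is present on large tables.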

Generally, other tables like regions can be returned like this:

In [None]:
regions = get_table("regions")
regions

Or we can get resources based on one or more properties. For example, we can load a project if we know its name:

In [None]:
project_name = "UPC"
project = get_table("projects", {"name": project_name})
project

A payload that loads a single record by id looks like this, with comments added:

```
{
    '#project#':{
        'table': 'projects', // check out sample_data.js to see commonly used tables
        'operation': {
            'method': 'load', // load for one record, load.multiple() for more
            'properties': { // check out the sample_data.js for table properties
                'id': '80d8a417-de00-4bdc-bafa-ddd2d753fea5'
            }
        }
    }
}
```

The `#project#` key can be any string wrapped in `#`, and it can then be used as a property value for another resource in the same request. Use `load.multiple()` as the method to return more than one record.

For instance, this request returns all estimates for a project based on the project name:

In [None]:
project_name = "UPC"
payload = {
    "#project#": {
        "table": "projects",
        "operation": {
            "method": "load",
            "properties": {
                "name": project_name
                }
            }
        },
    "#estimates#": {
        "table": "estimates",
        "operation": {
            "method": "load.multiple()",
            "properties": {
                "project": "#project#"
                }
            }
        }
    }
data = call_pipe(payload)
estimates = data["estimates"]
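
Because references such as `#project#` are plain strings, a typo in one is easy to miss. The sketch below checks a payload locally before it is sent. It assumes that any property value wrapped in `#` is meant as a reference to a top-level key; that rule is an assumption made for this example, not documented pipeline behavior, and `unresolved_references` is a hypothetical helper name.

```python
def unresolved_references(payload):
    """Return '#name#' property values that match no top-level payload key."""
    missing = []
    for resource in payload.values():
        properties = resource.get("operation", {}).get("properties", {})
        for value in properties.values():
            is_reference = (
                isinstance(value, str)
                and len(value) > 2
                and value.startswith("#")
                and value.endswith("#")
            )
            if is_reference and value not in payload:
                missing.append(value)
    return missing


# The "#project#" resource is deliberately left out to show a failed check.
incomplete_payload = {
    "#estimates#": {
        "table": "estimates",
        "operation": {
            "method": "load.multiple()",
            "properties": {"project": "#project#"},
        },
    }
}
print(unresolved_references(incomplete_payload))
```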

The response from the data/pipeline endpoint looks like this:
```
{
    'success': True,
    'data':{
        'idMap':{},
        'variables':{}, 
        'load':{},
        'save':{},
    }
}
```

Within the `data` key, `variables` and `save` are typically `{}`, and `idMap` contains the ids of all returned records. The only value you usually need is `load`, which holds all the records you requested. That is why the last line of `call_pipe` is `return res.json().get("data", {}).get("load", {})`.
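
As an illustration of that response shape, the sketch below pulls `load` out of an already-parsed response body. The sample body and its region record are made up for the example, `extract_load` is a hypothetical name, and treating a falsy `success` as a failure is an assumption about how errors are reported.

```python
def extract_load(body):
    """Return the `load` section of a parsed pipeline response body."""
    # Assumption: a failed request reports `success` as something falsy.
    if not body.get("success"):
        raise ValueError("pipeline response did not report success")
    return body.get("data", {}).get("load", {})


# Made-up response body that follows the structure shown above.
sample_body = {
    "success": True,
    "data": {
        "idMap": {},
        "variables": {},
        "load": {"regions": [{"id": "r-1", "name": "West"}]},
        "save": {},
    },
}
print(extract_load(sample_body))
```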

We can use pandas to view the result as a table:

In [None]:
import pandas as pd
pd.DataFrame(estimates)

Another common need is retrieving sort fields and sort codes. Sort fields are like additional columns for line items so that the items can be cross-coded. Each sort field contains sort codes that can be applied to the line items. This payload gets a sort field by name, along with all of its sort codes.

In [None]:
properties = {"name": "Bid Package"}
payload = {
    "#sort_field#": {
        "table": "sort_fields",
        "operation": {
            "method": "load",
            "properties": properties,
        },
    },
    "#sort_code_{index.count()}#": {
        "table": "sort_codes",
        "operation": {
            "method": "load.multiple()",
            "properties": {"sort_field": "#sort_field#"},
        },
    },
}
data = call_pipe(payload)
data

This is commonly needed, so we've added it to the helper functions as well. Another helper function returns all line items for an estimate. If you check out the sample data, the sort fields of a line item live in its `extras` property and follow a pattern like `{sort_field.id: sort_code.id}`, so the helper fetches those resources and applies them to the line items in a more useful way.


In [None]:
from utils.helpers import get_line_items_for_estimate
line_items = get_line_items_for_estimate(estimates[0]["id"])
pd.DataFrame(line_items)
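
The internals of `get_line_items_for_estimate` are not shown in this tutorial. As a rough illustration of the `{sort_field.id: sort_code.id}` pattern only, the sketch below maps one line item's `extras` from ids to readable names. It assumes sort field and sort code records each carry `id` and `name` keys (the `name` key on sort codes is an assumption), and every record in it is made up.

```python
def label_extras(line_item, sort_fields, sort_codes):
    """Map a line item's extras from ids to readable names."""
    field_names = {field["id"]: field["name"] for field in sort_fields}
    code_names = {code["id"]: code["name"] for code in sort_codes}
    labeled = {}
    for field_id, code_id in (line_item.get("extras") or {}).items():
        # Fall back to the raw id when a lookup has no match.
        field_label = field_names.get(field_id, field_id)
        labeled[field_label] = code_names.get(code_id, code_id)
    return labeled


# Made-up records for illustration only.
sort_fields = [{"id": "sf-1", "name": "Bid Package"}]
sort_codes = [{"id": "sc-1", "name": "Concrete"}]
line_item = {"id": "li-1", "extras": {"sf-1": "sc-1"}}
print(label_extras(line_item, sort_fields, sort_codes))
```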

There's a lot more to an ediphi estimate than just line items, such as markups, areas of each floor, rooms, and more. There is a dedicated endpoint to return all of the data of an estimate.  

In [None]:
def get_full_estimate(estimate_id):

    url = f"https://api.ediphi.com/api/external/estimates/{estimate_id}"
    headers = {
        "api-token": os.getenv("API_KEY"),
        "api-tenant": os.getenv("TENANT"),
        "Content-Type": "application/json",
    }
    res = requests.get(url, headers=headers, timeout=30)
    if res.status_code != 200:
        error_msg = (
            res.json().get("error", {}).get("message", "Unknown pipeline error")
        )
        raise requests.exceptions.HTTPError(error_msg)
    return res.json().get("sanitizedEstimate", {})

estimate = get_full_estimate(estimates[0]["id"])
estimate

In some cases, you may only want to download data that has recently changed, such as estimates that have been updated within the last X days. Currently, we don’t support passing a “date greater than” filter directly in the request payload. However, the key objects that contain significant data (such as Estimates, GCGRs, Bid Tabs, and Schedules) are structured in a way that makes it easy to track changes.

For example, with Estimates (more on other objects in separate tutorials, though they follow a similar structure), the database record for an `estimate` contains only its metadata. However, we trigger updates to the `updated_at` timestamp whenever any part of an `estimate` or any of its related child objects changes. Additionally, the parent project’s `updated_at` field is also updated in such cases.

A common pattern for retrieving recently updated data is:

1.	First, fetch all projects and identify the ones with recent updates.
2.	Then, fetch the recent estimates for each of those projects.

In [None]:
from datetime import datetime, timedelta
from dateutil import parser

def filter_recent_updates(data, days_within):
    cutoff_date = datetime.now() - timedelta(days=days_within)
    
    recent_items = [
        item for item in data 
        if parser.isoparse(item["updated_at"]).replace(tzinfo=None) > cutoff_date 
        and item.get("deleted_at") is None
    ]
    
    return recent_items

projects = get_table("projects")
days_within = 7
recent_projects = filter_recent_updates(projects, days_within)
recent_estimates = []

for project in recent_projects:
    estimates = get_table("estimates", {"project": project["id"]})
    recent_estimates.extend(filter_recent_updates(estimates, days_within))

recent_estimates
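
One detail to watch in `filter_recent_updates`: it drops the timezone offset from `updated_at` and compares against the local clock from `datetime.now()`, so the cutoff can be off by the machine's UTC offset. The sketch below is an alternative that keeps the comparison in UTC using only the standard library. It assumes timestamps are ISO 8601 strings and that any timestamp without an offset is already in UTC; both are assumptions about the data, and `parse_utc` and `is_recent` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(timestamp):
    """Parse an ISO 8601 string into a timezone-aware UTC datetime."""
    # fromisoformat on older Python versions rejects a trailing "Z".
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are already in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def is_recent(item, days_within, now=None):
    """True if the item changed within the window and is not deleted."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days_within)
    not_deleted = item.get("deleted_at") is None
    return not_deleted and parse_utc(item["updated_at"]) > cutoff


# Fixed "now" so the example gives the same answer on every run.
fixed_now = datetime(2024, 1, 10, tzinfo=timezone.utc)
print(is_recent({"updated_at": "2024-01-05T12:00:00Z"}, 7, now=fixed_now))
```

Passing `now` explicitly keeps the function easy to test; in the notebook you would leave it out and filter with `[item for item in projects if is_recent(item, days_within)]`.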

Each full estimate can then be requested using the special-purpose endpoint wrapped by the `get_full_estimate` function created earlier. Each one of these is a sizable data haul, so the pattern is to find the estimates of interest using their metadata, then request only the ones you need.
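
A minimal sketch of that last step follows. It takes the fetch function as a parameter so it can be tried with a stand-in; in the notebook you would pass `get_full_estimate`. The name `fetch_full_estimates` and the optional pause between calls are choices made for this example (this tutorial does not state a rate limit).

```python
import time


def fetch_full_estimates(estimate_ids, fetch, pause_seconds=0.0):
    """Call `fetch` once per unique id and collect results keyed by id."""
    results = {}
    # dict.fromkeys removes duplicate ids while keeping their order.
    for estimate_id in dict.fromkeys(estimate_ids):
        results[estimate_id] = fetch(estimate_id)
        if pause_seconds:
            time.sleep(pause_seconds)
    return results


# Stand-in for get_full_estimate so the example runs without the API.
calls = []


def fake_fetch(estimate_id):
    calls.append(estimate_id)
    return {"id": estimate_id}


full = fetch_full_estimates(["a", "b", "a"], fake_fetch)
print(full)
```

With real data, the call would look like `fetch_full_estimates([e["id"] for e in recent_estimates], get_full_estimate)`.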

We are planning to improve our API for these use cases to reduce the need to return all projects. In the meantime, a project record is itself very small, even though it has many child objects, and those children can be retrieved more directly with this approach.

The only resources we support creating with the data pipeline are projects, using `operation.method: save` like this:

In [None]:
def create_project(name, region_id, owner_id, industry, task_number):
    payload = {
        "#project#": {
            "table": "projects",
            "operation": {
                "method": "save",
                "properties": {
                    "name": name,
                    "region": region_id,
                    "owner": owner_id,
                    "industry": industry,
                    "task_number": task_number,
                },
            },
        }
    }
    
    url = "https://api.ediphi.com/api/external/data/pipeline"
    
    headers = {
            "api-token": os.getenv("API_KEY"),
            "api-tenant": os.getenv("TENANT"),
            "Content-Type": "application/json",
        }
    
    res = requests.post(url, headers=headers, json=payload, timeout=30)

    if res.status_code != 200:
        raise requests.exceptions.HTTPError("pipeline error")
    return res.json().get("data", {}).get("save", {})


You can see that the minimum required fields are `name`, `region_id`, `owner_id`, `industry`, and `task_number`, so let's get some values for those first.

In [None]:
name = "test"
regions = get_table("regions")
region_id = regions[0]["id"]
mike = get_table("users", {"email": "michael@ediphi.com"})[0]
owner_id = mike["id"]
industries = get_table("industries")
industry = industries[0]
task_number = "1.618"

Now let's create the project:

In [None]:
response = create_project(name, region_id, owner_id, industry, task_number)
new_project = response["projects"][0]

It might be helpful to turn this response into a url you can use to check out your new project, or share the url with the project owner.

In [None]:

url = f"https://{os.getenv('TENANT')}.ediphi.com/projects/{new_project['id']}"
url

This should give you a basic understanding of the data pipeline. If you have any questions, reach out to one of the leaders of our data engineering team:

Swan Sodja, Senior Data Engineer 
swan@ediphi.com

Colby Ajoku, Director of Partnerships & Integrations
colby@ediphi.com

Mike Navarro, CTO
michael@ediphi.com