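# **Cocoon: Semantic Data Lineage**

This notebook uses an LLM to build table- and column-level lineage for a compiled dbt project, saves the result to a local database, and displays it interactively.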


## **You Need...**

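1. Access to an LLM API: OpenAI (GPT-4), Anthropic (Claude 3.5), or Claude 3.5 via Vertex AI. The cost is typically under 50 cents per table.
2. A compiled dbt project directory with generated docs (the notebook reads `manifest.json` from `target/` and `catalog.json` from `docs/` by default).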



In [None]:
! pip install cocoon_data==0.1.114

In [1]:
from cocoon_data import *

In [None]:
#@title  Download example dbt project (skip this step if you have your own)
import requests
import os
import base64

def download_github_directory(repo_owner, repo_name, directory_path, local_path, timeout=30):
    """Recursively download one directory of a GitHub repository via the REST contents API.

    Files are written under ``local_path`` mirroring the repository layout, and
    subdirectories are fetched recursively. Failures are reported with ``print``
    rather than raised, so the notebook keeps running.

    Args:
        repo_owner: GitHub user or organisation that owns the repository.
        repo_name: Repository name.
        directory_path: Repository-relative path to fetch, using '/' separators
            as the GitHub API expects.
        local_path: Local directory to write into; created if it does not exist.
        timeout: Seconds to wait on each HTTP request before giving up.
    """
    # GitHub API endpoint
    api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{directory_path}"

    # Without a timeout, requests can block forever on a stalled connection.
    response = requests.get(api_url, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to retrieve directory contents. Status code: {response.status_code}")
        return

    # Create the local directory if it doesn't exist
    os.makedirs(local_path, exist_ok=True)

    contents = response.json()
    # A path that points at a single file yields one JSON object, not a list.
    if isinstance(contents, dict):
        contents = [contents]

    for item in contents:
        if item['type'] == 'file':
            download_url = item.get('download_url')
            # Some entries listed as 'file' (e.g. submodules) have no download URL.
            if not download_url:
                print(f"Skipped (no download URL): {item['name']}")
                continue
            file_response = requests.get(download_url, timeout=timeout)
            # Never write an error page (e.g. a rate-limit 403) to disk as if it were the file.
            if file_response.status_code != 200:
                print(f"Failed to download {item['name']}. Status code: {file_response.status_code}")
                continue
            file_path = os.path.join(local_path, item['name'])
            with open(file_path, 'wb') as file:
                file.write(file_response.content)
            print(f"Downloaded: {item['name']}")
        elif item['type'] == 'dir':
            # item['path'] is the repo-relative path with '/' separators; building it with
            # os.path.join would insert '\\' on Windows and produce an invalid API URL.
            local_subdir_path = os.path.join(local_path, item['name'])
            download_github_directory(repo_owner, repo_name, item['path'],
                                      local_subdir_path, timeout=timeout)

# Usage: fetch the example Amazon Ads dbt project from the Cocoon repository
repo_owner, repo_name = "Cocoon-Data-Transformation", "cocoon"
directory_path = "documentation/dbt_projects/dbt_amazon_ads"  # path inside the repository
dbt_directory = "./dbt_amazon_ads"  # where the project is written locally

download_github_directory(
    repo_owner=repo_owner,
    repo_name=repo_name,
    directory_path=directory_path,
    local_path=dbt_directory,
)

In [2]:
#@title  Read your dbt project
dbt_path = './dbt_amazon_ads/'
# Please compile your dbt project and generate its docs first, as we need the relevant JSON files.
# By default, we read manifest.json from target/ and catalog.json from docs/.
# If they are in different paths, please provide them explicitly:
# manifest_path = os.path.join(dbt_path, 'target', 'manifest.json')
# catalog_path = os.path.join(dbt_path, 'docs', 'catalog.json')

# build_lineage_graph returns:
#   nodes:          a list of model names
#   edges:          a list of (from_idx, to_idx) pairs for table lineage
#   sql_mapping:    maps model name to its SQL
#   column_mapping: maps model name to its column details
nodes, edges, sql_mapping, column_mapping = build_lineage_graph(dbt_path)
# nodes, edges, sql_mapping, column_mapping = build_lineage_graph(dbt_path, manifest_path=manifest_path, catalog_path=catalog_path)

image = generate_workflow_image(nodes, edges, format='svg')
display(HTML(wrap_image_in_html(image, format='svg')))

SQL_PREVIEW_CHARS = 30  # number of leading SQL characters shown per model

print("Nodes:")
for i, node in enumerate(nodes):
    # `or` also covers a model whose SQL entry exists but is None or empty.
    sql_content = (sql_mapping.get(node) or "No SQL content found").replace('\n', ' ')
    # Column details are dicts keyed by column name, so default to an empty dict.
    columns = column_mapping.get(node, {})
    print(f"{i}: {node}")
    print(f"Columns: {columns}")
    print(f"SQL Content: {sql_content[:SQL_PREVIEW_CHARS]}...")

print("\nEdges:")
print(edges)

Nodes:
0: model.amazon_ads.amazon_ads__account_report
Columns: {'date_day': {'type': 'date', 'index': 1, 'name': 'date_day', 'comment': None}, 'account_name': {'type': 'text', 'index': 2, 'name': 'account_name', 'comment': None}, 'account_id': {'type': 'text', 'index': 3, 'name': 'account_id', 'comment': None}, 'country_code': {'type': 'text', 'index': 4, 'name': 'country_code', 'comment': None}, 'profile_id': {'type': 'text', 'index': 5, 'name': 'profile_id', 'comment': None}, 'cost': {'type': 'double precision', 'index': 6, 'name': 'cost', 'comment': None}, 'clicks': {'type': 'bigint', 'index': 7, 'name': 'clicks', 'comment': None}, 'impressions': {'type': 'bigint', 'index': 8, 'name': 'impressions', 'comment': None}}
SQL Content:   with report as (     select ...
1: model.amazon_ads.amazon_ads__ad_group_report
Columns: {'date_day': {'type': 'date', 'index': 1, 'name': 'date_day', 'comment': None}, 'account_name': {'type': 'text', 'index': 2, 'name': 'account_name', 'comment': None},

In [7]:
#@title Provide your LLM API
# Imported here because the download cell (which also imports os) may have been skipped.
import os
from getpass import getpass

# API keys are read from the environment, or typed at a hidden prompt, instead of being
# pasted into the notebook, so they are never saved with it.

# If you use OpenAI, please ensure GPT-4 is available:
# openai.api_key = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")

# If you use Anthropic, please ensure Claude 3.5 is available.
# Only prompt when no key is set, so an existing ANTHROPIC_API_KEY is not overwritten.
if not os.environ.get("ANTHROPIC_API_KEY"):
    os.environ["ANTHROPIC_API_KEY"] = getpass("Anthropic API key: ")
openai.api_type = 'Anthropic'

# If you use Vertex AI, please ensure Claude 3.5 is available:
# openai.api_type = 'AnthropicVertex'
# os.environ['AnthropicVertex_region'] = "us-east5"
# os.environ['AnthropicVertex_project_id'] = ""

# Test that the LLM works
test_message = "hello"
messages = [{"role": "user", "content": test_message}]
response = call_llm_chat(messages, temperature=0.1, top_p=0.1)
print(response['choices'][0]['message']['content'])

In [4]:
#@title Build DBT Lineage by LLMs
# Build the LLM-driven exploration workflow over the dbt project read above and run it.
# viewer=True is forwarded to the workflow (presumably enables its interactive widgets — confirm).
main_workflow = create_cocoon_dbt_explore_workflow(viewer=True, dbt_directory=dbt_path)
main_workflow.start()

IntProgress(value=1, max=2)

GridspecLayout(children=(HTML(value='<b>input_model</b>', layout=Layout(grid_area='widget001')), HTML(value='<…

Button(button_style='success', description='Next Step', icon='check', style=ButtonStyle())

Workflow DBT Project Explore Workflow completed.


In [5]:
#@title Save the result to disk
# Persist the lineage produced by the workflow so it can be reloaded from this file later.
lineage_db_name = "cocoon_lineage.db"
lineage_result = main_workflow.para['dbt_lineage']
lineage_result.save_to_disk(db_name=lineage_db_name)

Database saved to cocoon_lineage.db


In [10]:
#@title Load the result from disk
# Re-open the saved lineage database; the display cells below work from this object.
lineage_db_name = "cocoon_lineage.db"
dbt_lineage = DbtLineage(db_name=lineage_db_name)

Database imported from cocoon_lineage.db


In [16]:
#@title Display the Table Lineage
# Renders an interactive model picker with the lineage of the selected model.
# Kept as the cell's last expression so Jupyter still displays any value it returns.
dbt_lineage.interactive_lineage_display()

Dropdown(description='Select Model:', options=(('1. model.amazon_ads.amazon_ads__account_report', 'model.amazo…

Output()

In [20]:
#@title Display the Column Lineage
# Choose the model and one of its columns to trace (e.g. "profile_id" for this model).
target_model = "model.amazon_ads_source.stg_amazon_ads__portfolio_history_tmp"  #@param {type:"string"}
target_column = "last_updated_date"  #@param {type:"string"}
dbt_lineage.display_lineage_summary(model_name=target_model, column_name=target_column)