In [None]:
import json
import shutil

from agents import Agent, Runner, trace
from agents.mcp import MCPServerStdio
from dotenv import load_dotenv
from IPython.display import display, Markdown

# Load environment variables from a .env file before any agent or server is
# created. NOTE(review): override=True presumably lets .env values replace
# variables already set in the process environment; confirm against python-dotenv.
load_dotenv(override=True)

In [57]:
# Now let's use our accounts server as an MCP server

params = {"command": "/opt/homebrew/bin/uv", "args": ["run", "lineage_aql_server.py"]}
async with MCPServerStdio(params=params, client_session_timeout_seconds=30) as server:
    mcp_tools = await server.list_tools()


In [None]:
# Show the tool list returned by server.list_tools() as the cell output.
mcp_tools

In [58]:
# Prompt given to the agent as its `instructions`; it names the MCP tool
# (sql_lineage_struct) the agent is asked to use on the query below.
instructions = "use sql_lineage_struct to analyze the following sql query: "

# Sample SQL sent to the agent as its input. It chains seven CTEs
# (active_users, valid_orders, order_details, returns_summary,
# payments_summary, subscription_lifetime_value, monthly_sales_summary)
# and ends with a SELECT over monthly_sales_summary.
# The text is passed verbatim, so whitespace inside the literal is significant.
request = """WITH active_users AS (
    SELECT user_id, region_id
    FROM users
    WHERE is_active = TRUE
),

valid_orders AS (
    SELECT o.order_id, o.user_id, o.order_date, o.total_amount
    FROM orders o
    INNER JOIN active_users au ON o.user_id = au.user_id
    WHERE o.is_valid = TRUE
),

order_details AS (
    SELECT 
        oi.order_id,
        oi.product_id,
        oi.quantity,
        oi.unit_price,
        p.category_id,
        c.category_name,
        vo.order_date,
        vo.total_amount,
        au.region_id
    FROM order_items oi
    INNER JOIN valid_orders vo ON oi.order_id = vo.order_id
    INNER JOIN products p ON oi.product_id = p.product_id
    INNER JOIN categories c ON p.category_id = c.category_id
    INNER JOIN active_users au ON vo.user_id = au.user_id
),

returns_summary AS (
    SELECT 
        oi.order_id,
        COUNT(r.return_id) AS total_returns,
        SUM(oi.quantity) AS total_returned_qty
    FROM returns r
    INNER JOIN order_items oi ON r.order_item_id = oi.order_item_id
    GROUP BY oi.order_id
),

payments_summary AS (
    SELECT 
        o.order_id,
        SUM(p.amount) AS total_paid
    FROM payments p
    INNER JOIN orders o ON p.order_id = o.order_id
    WHERE p.payment_date IS NOT NULL
    GROUP BY o.order_id
),

subscription_lifetime_value AS (
    SELECT 
        u.user_id,
        SUM(DATEDIFF(COALESCE(s.end_date, CURRENT_DATE), s.start_date)) AS total_days_subscribed
    FROM subscriptions s
    INNER JOIN users u ON s.user_id = u.user_id
    GROUP BY u.user_id
),

-- Final aggregation
monthly_sales_summary AS (
    SELECT 
        DATE_TRUNC('month', od.order_date) AS sale_month,
        r.region_name,
        od.category_name,
        COUNT(DISTINCT od.order_id) AS total_orders,
        SUM(od.quantity * od.unit_price) AS gross_revenue,
        COALESCE(rs.total_returns, 0) AS items_returned,
        COALESCE(ps.total_paid, 0) AS amount_paid,
        AVG(slv.total_days_subscribed) AS avg_lifetime_days
    FROM order_details od
    INNER JOIN regions r ON od.region_id = r.region_id
    LEFT JOIN returns_summary rs ON od.order_id = rs.order_id
    LEFT JOIN payments_summary ps ON od.order_id = ps.order_id
    LEFT JOIN subscription_lifetime_value slv ON od.order_id IN (
        SELECT order_id FROM valid_orders WHERE user_id = slv.user_id
    )
    GROUP BY 1, 2, 3, rs.total_returns, ps.total_paid
)

-- Final output
SELECT * 
FROM monthly_sales_summary
ORDER BY sale_month DESC, region_name, category_name;
"""
# Model name handed to every Agent created in this notebook.
model = "gpt-4.1-mini"

In [None]:

async with MCPServerStdio(params=params, client_session_timeout_seconds=30) as mcp_server:
    agent = Agent(name="lineage_struct_agent", instructions=instructions, model=model, mcp_servers=[mcp_server])
    with trace("lineage_struct_agent"):
        sp = await Runner.run(agent, request)
    display(Markdown(sp.final_output))


In [60]:
def _parse_struct_output(raw_output):
    """Decode the struct agent's final output as JSON.

    The output is also rendered as Markdown earlier in the notebook, and a
    model may wrap its JSON in a Markdown code fence. A leading fence line
    (with or without a language tag) and a trailing fence are stripped
    before decoding so that case does not raise.

    Args:
        raw_output: Text returned by the agent (``sp.final_output``).

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the remaining text is not valid JSON.
    """
    text = raw_output.strip()
    if text.startswith("```"):
        # Drop the opening fence line, then everything from the last fence on.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit("```", 1)[0]
    return json.loads(text)


final_output = _parse_struct_output(sp.final_output)

# Keep only entries that are objects carrying an "sql" key. The isinstance
# guard matters: `"sql" in value` is a substring/membership test on strings
# and lists, and the subsequent value["sql"] would raise TypeError.
sql_parts = [
    value["sql"]
    for value in final_output.values()
    if isinstance(value, dict) and "sql" in value
]


In [61]:
# Prompt for the field-mapping stage; it names the sql_lineage_field_mapping
# tool. The following cell iterates over `sql_parts` directly, so `request` is
# intentionally left untouched here: rebinding it to a list served no cell and
# broke re-running the struct cell, which passes `request` as the query text.
instructions = "use sql_lineage_field_mapping to analyze the following sub query parts: "

In [None]:
fm=[]
async with MCPServerStdio(params=params, client_session_timeout_seconds=30) as mcp_server:
    agent = Agent(name="lineage_field_mapping_agent", instructions=instructions, model=model, mcp_servers=[mcp_server])
    for sql_part in sql_parts:
        with trace("lineage_field_mapping_agent"):
            fm_part = await Runner.run(agent, sql_part)
        fm.append(fm_part.final_output)
print(fm)

In [63]:
# Prompt for the operation-logic stage; it names the sql_lineage_operation_logic
# tool. The following cell iterates over `sql_parts` directly, so `request` is
# intentionally left untouched here: rebinding it to a list served no cell and
# broke re-running the struct cell, which passes `request` as the query text.
instructions = "use sql_lineage_operation_logic to analyze the following sub query parts: "


In [None]:
op=[]
async with MCPServerStdio(params=params, client_session_timeout_seconds=30) as mcp_server:
    agent = Agent(name="lineage_operation_logic_agent", instructions=instructions, model=model, mcp_servers=[mcp_server])
    for sql_part in sql_parts:
        with trace("lineage_operation_logic_agent"):
            op_part = await Runner.run(agent, sql_part)
        op.append(op_part.final_output)
print(op)

In [None]:
# Gather the three intermediate results under human-readable section labels.
combined_dict = {
    "Parsed SQL": sql_parts,
    "Field Mappings": fm,
    "Logical Operators": op,
}

# Flatten to one text blob: each section is "<label>:" on its own line, then
# one item per line, then a blank line separating it from the next section.
combined_string = ''.join(
    f"{label}:\n" + '\n'.join(map(str, entries)) + '\n\n'
    for label, entries in combined_dict.items()
)

print(combined_string)

In [None]:
instructions = "use sql_lineage_aggregate to extract lineage from different source of information you have: "
request = combined_string

async with MCPServerStdio(params=params, client_session_timeout_seconds=30) as mcp_server:
    agent = Agent(name="lineage_aggregate_agent", instructions=instructions, model=model, mcp_servers=[mcp_server])
    with trace("lineage_aggregate_agent"):
        agg_output = await Runner.run(agent, request, max_turns=3)
    display(Markdown(agg_output.final_output))