In [256]:
import duckdb
import re
import ast
# Session setup: open a DuckDB connection that may load unsigned extensions,
# populate it through dbgen at scale factor 0.1, then load the locally built
# lineage extension and print the frame returned by the LOAD statement.
con = duckdb.connect(config=dict(allow_unsigned_extensions='true'))
con.execute("CALL dbgen(sf=0.1);")
print(
    con.execute(
        "LOAD '../build/release/repository/v1.3.0/osx_amd64/lineage.duckdb_extension'"
    ).df()
)

def get_plan():
    """Fetch the plan and id of the last row reported by ``pragma_latest_qid()``.

    The raw plan text is printed, then every bare word following ``": "`` is
    wrapped in double quotes before the text is parsed with
    ``ast.literal_eval``. As a consequence bare-word values such as ``true``
    or ``5`` come back as the strings ``"true"`` and ``"5"``.

    Returns:
        tuple: ``(plan, query_id)``, or ``(None, None)`` when the pragma
        returns no rows.
    """
    history = con.execute("select * from pragma_latest_qid()").df()
    row_count = len(history)
    if row_count == 0:
        return None, None

    # The final row is treated as the most recent query.
    last = row_count - 1
    latest_qid = history['query_id'][last]
    raw_plan = history['plan'][last]
    print(raw_plan)

    # Quote bare words so literals like true/false parse as Python strings.
    quoted_plan = re.sub(r": (\w+)", r': "\1"', raw_plan)
    return ast.literal_eval(quoted_plan), latest_qid


# End-to-end lineage query.
#
# lineage_tree starts from the rows of global_lineage() whose sink_opid is -1
# and repeatedly joins in rows whose (sink_opid, out_rowid) equal the parent's
# (source_opid, in_rowid), while carrying the root's out_rowid forward.
# lineage_e2e then collects, per (out_rowid, source_opid, source_table), the
# distinct in_rowid values into `prov`. Rows whose source_table starts with
# 'LOGICAL_' are dropped from the final result.
#
# Plain (non-f) string on purpose: the SQL has no Python placeholders, and an
# f-prefix would make any brace added to the query evaluate as an expression.
end2end_lineage = """
WITH RECURSIVE lineage_tree as   (
    -- Base case: start from root
    SELECT
        source_table,
        source_opid,
        sink_opid,
        out_rowid,
        in_rowid,
        0 AS depth,
        CAST(source_opid AS VARCHAR) AS path
    FROM global_lineage()
    WHERE sink_opid =-1

    UNION ALL

    -- Recursive step: find children
    SELECT
        c.source_table,
        c.source_opid,
        c.sink_opid,
        p.out_rowid,
        c.in_rowid,
        p.depth + 1,
        path || ' -> ' || c.source_opid
    FROM global_lineage() c
    JOIN lineage_tree p ON c.sink_opid = p.source_opid and c.out_rowid=p.in_rowid
),

-- end-to-end lineage
lineage_e2e AS (
    SELECT source_table, source_opid,
        out_rowid,
        LIST(DISTINCT in_rowid) AS prov
    FROM lineage_tree
    GROUP BY out_rowid, source_opid, source_table
)

SELECT *
FROM lineage_e2e WHERE  CAST(source_table AS VARCHAR) NOT LIKE 'LOGICAL_%'
"""

Empty DataFrame
Columns: [Success]
Index: []


In [257]:
# Run query file q<qid>.sql with lineage capture switched on, print its result,
# then evaluate the end-to-end lineage query as the cell's displayed value.
qid = 1
qfile = f"../queries/q{str(qid).zfill(2)}.sql"
# Context manager guarantees the file handle is released even if read() fails.
with open(qfile, "r") as text_file:
    query = text_file.read().strip()
# Collapse every whitespace run so the query prints on a single line.
query = ' '.join(query.split())
con.execute("PRAGMA threads=1")
con.execute("PRAGMA set_debug_lineage(False)")
con.execute("PRAGMA set_lineage(True)")
try:
    print(query)
    print(con.execute(query).df())
finally:
    # Always switch capture back off, even when the query raises; otherwise
    # later statements in this session would run with capture still enabled.
    con.execute("PRAGMA set_lineage(False)")
# Debug aid: dump the raw per-operator lineage rows.
#lineage = con.execute("select * from global_lineage()").df()
#print(lineage)
con.execute(end2end_lineage).df()

SELECT l_returnflag, l_linestatus, sum(l_quantity) AS sum_qty, sum(l_extendedprice) AS sum_base_price, sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, avg(l_quantity) AS avg_qty, avg(l_extendedprice) AS avg_price, avg(l_discount) AS avg_disc, count(*) AS count_order FROM lineitem WHERE l_shipdate <= CAST('1998-09-02' AS date) GROUP BY l_returnflag, l_linestatus ORDER BY l_returnflag, l_linestatus
  l_returnflag l_linestatus    sum_qty  sum_base_price  sum_disc_price  \
0            A            F  3774200.0    5.320754e+09    5.054096e+09   
1            N            F    95257.0    1.337378e+08    1.271324e+08   
2            N            O  7459297.0    1.051227e+10    9.986238e+09   
3            R            F  3785523.0    5.337951e+09    5.071819e+09   

     sum_charge    avg_qty     avg_price  avg_disc  count_order  
0  5.256751e+09  25.537587  36002.123829  0.050145       147790  
1  1.322863e+08  

Unnamed: 0,source_table,source_opid,out_rowid,prov
0,lineitem,8,0,"[9, 10, 12, 16, 17, 31, 32, 33, 45, 46, 47, 56..."
1,lineitem,8,1,"[211, 417, 447, 779, 781, 905, 906, 940, 955, ..."
2,lineitem,8,2,"[0, 1, 2, 3, 4, 5, 6, 13, 18, 19, 20, 21, 22, ..."
3,lineitem,8,3,"[7, 8, 11, 14, 15, 34, 55, 59, 77, 78, 80, 82,..."


In [258]:
# TODO: construct lineage queries for delim

# TODO: support delim join
def extract_lineage_queries(query_id, node):
    """Build one lineage SELECT per lineage-carrying operator in a plan subtree.

    Visits ``node`` first and then each of its children recursively. For every
    operator flagged ``has_lineage`` it emits SQL that reads the operator's
    ``lineage_scan`` output and tags each row with the source table, source
    operator id and sink operator id taken from the node. Aggregate nodes
    (``LOGICAL_AGGREGATE_AND_GROUP_BY``) have their list-valued ``in_rowid``
    unnested; two-child nodes whose name contains ``JOIN`` emit a second query
    for side 1 of the scan.

    Args:
        query_id: Query identifier, interpolated as the first argument of
            ``lineage_scan``.
        node: Plan node dict. ``opid`` and ``name`` are always read;
            ``sink_id``, ``source_id`` and ``source_table`` are read only when
            the node has lineage; ``has_lineage`` and ``children`` are
            optional.

    Returns:
        list[str]: SQL SELECT statements in visit order, each producing the
        columns ``source_table, source_opid, sink_opid, out_rowid, in_rowid``.

    Raises:
        KeyError, IndexError: If a node flagged ``has_lineage`` lacks the
            ``sink_id`` / ``source_id`` / ``source_table`` entries it needs.
    """
    queries = []

    opid = node["opid"]
    name = node["name"]

    # The flag may arrive as the string "true" or as a real boolean True;
    # normalising through str() accepts both.
    has_lineage = str(node.get("has_lineage", "false")).lower() == "true"

    # Anything that is not an actual sequence of child nodes (missing key,
    # None, or a placeholder string such as "NULL") means "no children".
    # Without this, a placeholder string would be iterated character by
    # character and crash on the first recursive call.
    children = node.get("children")
    if not isinstance(children, (list, tuple)):
        children = []

    is_join = "JOIN" in name and len(children) == 2
    needs_unnest = name == "LOGICAL_AGGREGATE_AND_GROUP_BY"

    # If current node has lineage, emit a query.
    # NOTE(review): table names and ids are interpolated into the SQL text
    # unescaped; this assumes the plan comes from a trusted source.
    if has_lineage:
        parent_id = node["sink_id"]
        src_opid = node["source_id"][0]
        src_table = node["source_table"][0]
        if needs_unnest:
            # in_rowid is a LIST here, so flatten it to one row per input id.
            queries.append(f"""
            SELECT '{src_table}' as source_table, {src_opid} AS source_opid, {parent_id} AS sink_opid,
            out_rowid, in_elem AS in_rowid
            FROM (
                SELECT out_rowid, UNNEST(in_rowid) AS in_elem
                FROM lineage_scan({query_id}, {opid}, 0)
                     AS ls(out_rowid BIGINT, in_rowid LIST(BIGINT))
            )
            """.strip())
        else:
            queries.append(f"""SELECT  '{src_table}' as source_table, {src_opid} AS source_opid, {parent_id} AS sink_opid, out_rowid,  in_rowid
            FROM lineage_scan({query_id}, {opid}, 0)  AS ls(out_rowid BIGINT, in_rowid BIGINT)""".strip())
        if is_join:
            # Second input of the join: same operator, scan side 1.
            rhs_opid = node["source_id"][1]
            rhs_table = node["source_table"][1]
            queries.append(f"""SELECT  '{rhs_table}' as source_table, {rhs_opid} AS source_opid, {parent_id} AS sink_opid, out_rowid,  in_rowid
            FROM lineage_scan({query_id}, {opid}, 1)  AS ls(out_rowid BIGINT, in_rowid BIGINT)""".strip())

    for child in children:
        queries.extend(extract_lineage_queries(query_id, child))

    return queries
    
# Combine the per-operator lineage SELECTs of the latest plan into a single
# UNION ALL statement.
plan, query_id = get_plan()
# Reset the outputs first: if no plan is available, a re-run must not leave
# `queries` / `final_sql` from an earlier execution lingering in the kernel.
queries = []
final_sql = None
#lineage = None
if plan:
    queries = extract_lineage_queries(query_id, plan)
    final_sql = "\nUNION ALL\n".join(queries)
    #print(final_sql)
    #lineage = con.execute(final_sql).df()
    #con.execute("PRAGMA clear_lineage")
#lineage

{"opid": 1,"name": "LOGICAL_ORDER_BY","sink_id": -1,"source_id": [5],"table": "","source_table": [""],"has_lineage": true,"children": [{"opid": 2,"name": "LOGICAL_PROJECTION","sink_id": -1,"source_id": [],"table": "","source_table": [],"has_lineage": false,"children": [{"opid": 3,"name": "LOGICAL_PROJECTION","sink_id": -1,"source_id": [],"table": "","source_table": [],"has_lineage": false,"children": [{"opid": 4,"name": "LOGICAL_PROJECTION","sink_id": -1,"source_id": [],"table": "","source_table": [],"has_lineage": false,"children": [{"opid": 5,"name": "LOGICAL_AGGREGATE_AND_GROUP_BY","sink_id": 5,"source_id": [8],"table": "","source_table": ["lineitem"],"has_lineage": true,"children": [{"opid": 6,"name": "LOGICAL_PROJECTION","sink_id": -1,"source_id": [],"table": "","source_table": [],"has_lineage": false,"children": [{"opid": 7,"name": "LOGICAL_PROJECTION","sink_id": -1,"source_id": [],"table": "","source_table": [],"has_lineage": false,"children": [{"opid": 8,"name": "LOGICAL_GET","