In [None]:
import pandas as pd
import re

# Exported database object definitions (columns used below include ObjectType).
df = pd.read_csv(filepath_or_buffer="object_definitions.csv")


# Matches, at each scan position and in this priority order: a 'string literal'
# ('' escapes a quote), a [bracketed identifier] (]] escapes a bracket), a
# "quoted identifier", a /* block comment */, or a -- line comment.
# Scanning everything in one pass means comment markers that appear inside
# literals/identifiers are preserved, and a "--" inside a block comment can no
# longer swallow that comment's closing "*/".
_SQL_LITERAL_OR_COMMENT_RE = re.compile(
    r"'(?:[^']|'')*'"          # 'string literal'
    r'|\[(?:[^\]]|\]\])*\]'    # [bracketed identifier]
    r'|"(?:[^"]|"")*"'         # "quoted identifier"
    r'|(?P<block>/\*.*?\*/)'   # /* block comment */
    r'|(?P<line>--[^\n]*)',    # -- line comment
    re.DOTALL,
)


def robust_clean_sql(sql_query):
    """Normalise a stored-procedure definition before sending it to the LLM.

    Steps:
      1. Turn literal ``\\n`` / ``\\t`` escape sequences (as they appear in the
         CSV export) into real newlines / tabs.
      2. Remove ``--`` line comments and ``/* */`` block comments, leaving string
         literals and quoted/bracketed identifiers untouched. Nested block
         comments are not supported.
      3. Drop blank lines and collapse runs of spaces/tabs into one space.

    Args:
        sql_query: Raw definition. Non-string values are converted with ``str``;
            ``None`` and float NaN (e.g. a missing cell read by pandas) yield "".

    Returns:
        str: The cleaned SQL, stripped of leading/trailing whitespace.
    """
    import math

    if sql_query is None or (isinstance(sql_query, float) and math.isnan(sql_query)):
        return ""
    sql_text = str(sql_query)

    # Restore escaped line breaks first so "--" comments end at the real newline.
    sql_text = sql_text.replace('\\n', '\n').replace('\\t', '\t')

    def _drop_comment(match):
        if match.group('block') is not None:
            # Keep a separator so "a/*x*/b" does not glue into "ab".
            return ' '
        if match.group('line') is not None:
            return ''
        # String literal or quoted/bracketed identifier: keep verbatim.
        return match.group(0)

    sql_text = _SQL_LITERAL_OR_COMMENT_RE.sub(_drop_comment, sql_text)

    # Replace multiple newlines (blank lines) with a single newline
    sql_text = re.sub(r'\n\s*\n', '\n', sql_text)
    # Collapse horizontal spaces (tabs/spaces) into one space
    sql_text = re.sub(r'[ \t]+', ' ', sql_text)

    return sql_text.strip()


# Keep only stored procedures; their definitions are what gets cleaned and sent to the LLM.
df = df.loc[df["ObjectType"].eq("SQL_STORED_PROCEDURE")]
# To focus on a single procedure while debugging, narrow further, e.g.:
# df = df.query("Schema == 'tmp' and Object == 'usp_Fleet_snapshot'")
df

In [None]:
# Clean the first procedure's definition and show it.
# NOTE(review): column 4 is selected by position (it holds the definition text) — confirm against the CSV header.
first_definition = df.iloc[0, 4]
sql_text = robust_clean_sql(first_definition)
print(sql_text)

In [None]:
# Prompt for the lineage-extraction LLM. This is an f-string: `{sql_text}` is
# replaced with the cleaned procedure definition, and the literal JSON braces in
# the examples are doubled (`{{` / `}}`) so Python does not treat them as
# placeholders.
prompt = f"""
You are a SQL data lineage extractor specializing in T-SQL stored procedures.

OBJECTIVE:
Extract DIRECT source-to-target mappings between PERSISTENT database objects ONLY.
Trace data flow through ALL intermediate steps (temp tables, CTEs, subqueries) but report only the FINAL persistent objects.

OBJECT CLASSIFICATION:

PERSISTENT OBJECTS (Report these):
- Tables: schema.table, [schema].[table], database.schema.table
- Views: schema.view, [schema].[view]
- Stored procedures (when used as data sources via EXEC INSERT)

INTERMEDIATE OBJECTS (Trace through, but DO NOT report):
- Temp tables: #temp, ##global_temp
- Table variables: @table
- CTEs: WITH cte_name AS (...)
- Subqueries and derived tables
- Variables: @variable

EXTRACTION RULES:

1. TRACE THROUGH INTERMEDIATES:
   - If temp table #T is populated from table A, then #T is inserted into table B
   - Report: A → B (not A → #T or #T → B)

2. HANDLE MULTI-STEP FLOWS:
   - Step 1: A → #temp1
   - Step 2: #temp1 → #temp2  
   - Step 3: #temp2 → B
   - Report: A → B

3. MULTIPLE SOURCES TO ONE TARGET:
   - Create separate lineage entries for each source
   - Example: A → C, B → C (two separate JSON objects)

4. ONE SOURCE TO MULTIPLE TARGETS:
   - Create separate lineage entries for each target
   - Example: A → X, A → Y (two separate JSON objects)

5. COMPLEX QUERIES:
   - Trace through all JOINs, subqueries, CTEs
   - Extract base tables from nested SELECT statements
   - Follow data flow through UNION, EXCEPT, INTERSECT operations

6. IGNORE:
   - Table hints: (NOLOCK), WITH (NOLOCK), (INDEX=...), etc.
   - System tables/views unless explicitly part of business logic
   - The stored procedure name itself as a source

7. DELETE/TRUNCATE OPERATIONS:
   - These affect targets but have no sources
   - Omit them from the lineage entirely (never emit an entry with a null or empty source)

8. EXEC STORED PROCEDURES:
   - If "INSERT INTO table EXEC stored_proc", treat stored_proc as a source
   - Otherwise, you may need to trace into that procedure separately

OUTPUT FORMAT:

{{
  "lineage": [
    {{
      "source": "schema.table_name",
      "target": "schema.table_name"
    }}
  ]
}}

RULES ENFORCEMENT:
✓ Output ONLY valid JSON
✓ No explanations, comments, or markdown
✓ No temp tables (#temp) in final output
✓ No CTEs or table variables in final output
✓ Each lineage pair must have exactly one source and one target
✓ Use fully qualified names when available (schema.table)
✓ Strip square brackets and double quotes from names: report [schema].[table] as schema.table
✓ Remove all table hints from object names

EXAMPLE:

Given SQL:
```sql
-- Step 1: Read from A, B into temp
SELECT * INTO #temp FROM A JOIN B ON A.id = B.id

-- Step 2: Read from #temp and C into final table
INSERT INTO Z 
SELECT * FROM #temp JOIN C ON #temp.id = C.id
```

Correct output:
{{
  "lineage": [
    {{"source": "A", "target": "Z"}},
    {{"source": "B", "target": "Z"}},
    {{"source": "C", "target": "Z"}}
  ]
}}

Incorrect output (DO NOT DO THIS):
{{
  "lineage": [
    {{"source": "A", "target": "#temp"}},
    {{"source": "B", "target": "#temp"}},
    {{"source": "#temp", "target": "Z"}},
    {{"source": "C", "target": "Z"}}
  ]
}}

SQL TO ANALYZE:
{sql_text}
"""

In [None]:
import ollama
import json

MODEL_NAME = "qwen2.5-coder:14b"

# Ask the local model for JSON-formatted output for the prompt built above.
generate_kwargs = dict(model=MODEL_NAME, prompt=prompt, format="json")
result = ollama.generate(**generate_kwargs)

# Parse the model's reply and pretty-print it.
raw_response = result["response"]
lineage_json = json.loads(raw_response)
print(json.dumps(lineage_json, indent=4))

In [None]:

import os
import pandas as pd
import re
# NOTE(review): despite its name, EXCEL_FILE points at a CSV export (read with read_csv).
EXCEL_FILE = "object_definitions.csv"
df = pd.read_csv(EXCEL_FILE).loc[
    lambda frame: frame["ObjectType"] == "SQL_STORED_PROCEDURE"
]
OUTPUT_FOLDER = "lineage_outputs"
import hashlib

# Short deterministic identifier used as the prefix of per-object output filenames.
def get_file_identifier(schema, object_name):
    """Return a 12-hex-character identifier for ``schema`` / ``object_name``.

    The pair is formatted as ``"<schema>|<object_name>"``, UTF-8 encoded and
    hashed with MD5; the first 12 hex digits of the digest are returned. MD5
    serves only as a deterministic fingerprint here.
    """
    key = "{}|{}".format(schema, object_name).encode("utf-8")
    full_digest = hashlib.md5(key).hexdigest()
    return full_digest[:12]

def get_processed_identifiers(output_folder=None):
    """Collect identifiers of objects that already have a lineage JSON file.

    Output files are expected to be named ``<identifier>--<schema>--<object>.json``
    where ``<identifier>`` is the 12-character lowercase hex prefix produced by
    ``get_file_identifier``. Files without that prefix, or not ending in
    ``.json``, are ignored.

    Args:
        output_folder: Directory to scan. Defaults to the module-level
            ``OUTPUT_FOLDER`` when omitted, so existing calls are unchanged.

    Returns:
        set[str]: The identifiers found; empty if the folder does not exist or
        is not a directory.
    """
    folder = OUTPUT_FOLDER if output_folder is None else output_folder
    # isdir rather than exists: a regular file at this path would make listdir raise.
    if not os.path.isdir(folder):
        return set()

    processed = set()
    for filename in os.listdir(folder):
        if not filename.endswith('.json'):
            continue
        match = re.match(r'^([a-f0-9]{12})--', filename)
        if match:
            processed.add(match.group(1))
    return processed

# Identifiers that already have a lineage JSON in the output folder
# (presumably so already-processed objects can be skipped).
processed_identifiers = get_processed_identifiers()
print(processed_identifiers)

## Validation Code

In [None]:
import pandas as pd

df = pd.read_csv("lineage_analysis_local/databases_lineage_extracted.csv")

# One entry per dependent object, listing every "schema.object" it references.
# NOTE(review): row[7] / row[8] are positional — itertuples() puts the index at
# position 0, so these are the 7th and 8th CSV columns (presumably the
# referenced schema and object name); confirm against the CSV header.
lineage_objects = []
for (schema, object_name), group in df.groupby(["Dependent_Schema", "Dependent_Object_Name"]):
    referenced = [row[7] + '.' + row[8] for row in group.itertuples()]
    lineage_objects.append({
        "object_name": schema + "." + object_name,
        "lineage_objects_list": referenced,
    })

In [None]:
# Print the referenced-object list recorded for one procedure of interest.
wanted_object = 'tmp.usp_SN_YearMonth_History_Report_pre_prd'
for element in filter(lambda item: item['object_name'] == wanted_object, lineage_objects):
    print(element['lineage_objects_list'])

In [None]:
import os
import json

# Build "schema.object" names from the dependency JSON filenames, which are
# presumably named "<identifier>--<schema>--<object>.json" (the same pattern as
# the lineage output files).
path = "lineage_analysis_local/view_dependency_jsons/"
names_list = []
files_list = os.listdir(path)

for listed_name in files_list:
    if not listed_name.endswith(".json"):
        continue
    stem = listed_name[: -len(".json")]
    # maxsplit=2 keeps any "--" that appears inside the object name itself.
    parts = stem.split("--", 2)
    if len(parts) < 3:
        # NOTE(review): unexpected filename shape; skipped instead of raising IndexError.
        continue
    file_name = parts[1] + '.' + parts[2]
    names_list.append(file_name)
    

In [30]:
import pandas as pd

def _tmp_procedures(definitions):
    """Keep only stored procedures that live in the ``tmp`` schema."""
    is_procedure = definitions["ObjectType"] == "SQL_STORED_PROCEDURE"
    in_tmp_schema = definitions["Schema"] == "tmp"
    return definitions[is_procedure & in_tmp_schema]


df1 = pd.read_csv("object_definitions.csv")
df2 = pd.read_csv("UAT_object_definitions.csv")
df1, df2 = _tmp_procedures(df1), _tmp_procedures(df2)

In [31]:
# Compare which `tmp` stored procedures exist in each export
# (df1: object_definitions.csv, df2: UAT_object_definitions.csv).
# NOTE(review): columns 1 and 2 are taken by position; the `.query` calls below
# filter on "Schema" and "Object", so these are presumably those two columns —
# confirm against the CSV header.
df1_set = {schema + '.' + name for schema, name in df1.iloc[:, [1, 2]].itertuples(index=False)}
df2_set = {schema + '.' + name for schema, name in df2.iloc[:, [1, 2]].itertuples(index=False)}

print(len(df1_set))
print(len(df2_set))

uat_only = df2_set - df1_set
print(len(uat_only))
# Sorted so the listing is identical across kernel restarts
# (set iteration order for strings depends on hash randomization).
print(sorted(uat_only))


object_schema = "tmp"
object_name = "usp_SNV_sc_item_option_mtom_ERM_STG"  # alternative: "usp_Fleet_snapshot"
# `@name` references the Python variables directly, so names containing quotes
# cannot break the query expression.
df_uat_query = df2.query("Schema == @object_schema and Object == @object_name")
df_dev_query = df1.query("Schema == @object_schema and Object == @object_name")

561
454
15
{'tmp.usp_SNV_sc_req_item_STG_FULL', 'tmp.usp_snv_awa_interaction_work_item_stg', 'tmp.usp_ERM_STG_to_ERM', 'tmp.usp_SNV_metric_definition_STG', 'tmp.usp_Cobachi_Consumption_Volumes_Calculation', 'tmp.usp_erm_SNV_dmn_demand', 'tmp.usp_SNV_u_ast_data_stg', 'tmp.usp_erm_SNV_asmt_assessment_instance', 'tmp.usp_MAHSAzureBilling_STG_old', 'tmp.usp_Cobachi_Consumption_Volumes_Calculation_NULL_STG', 'tmp.usp_SNV_sys_user_skill_history_STG', 'tmp.usp_SN_All_Cumulative_Report', 'tmp.usp_SNV_Application_Business_UnrelatedCI', 'tmp.usp_erm_SNV_asmt_assessment_instance_question', 'tmp.usp_SNV_Application_Business_UnRelCI'}


In [35]:
# Print the UAT definition first, then the dev one, for a side-by-side comparison.
# Column 4 (by position) holds the procedure's definition text.
for definitions in (df_uat_query, df_dev_query):
    print(definitions.iloc[0, 4])

\n\nCREATE  PROCEDURE  [tmp].[usp_SNV_sc_item_option_mtom_ERM_STG]  \n  \nAS  \n  \nSet Rowcount 0  \nSet Nocount On  \n-- Delete null records  \nBEGIN  \n DELETE FROM [pst].[SNV_sc_item_option_mtom_erm] where sys_id is null  \nEND  \n-- Delete duplicate records  \nBEGIN  \n;With CTE as   (  Select sys_id, row_number() over (partition by sys_id order by sys_id) as RN from [pst].[SNV_sc_item_option_mtom_erm]  )  \ndelete from CTE where RN > 1  \nEND  \n    \nBEGIN  \n  \n --TRUNCATE TABLE [stg].[SNV_sc_item_option_mtom_mlt]  \n  \n INSERT INTO [stg].[SNV_sc_item_option_mtom_erm]  \n(    \n    [request_item],  \n [sc_item_option],  \n [sys_created_by],  \n [sys_created_on],  \n [sys_id],  \n [sys_mod_count],  \n [sys_updated_by],  \n [sys_updated_on]  \n )  \nSELECT   \n  \n    [dv_request_item],  \n [dv_sc_item_option],  \n [dv_sys_created_by],  \n CAST(NULLIF([dv_sys_created_on],'') AS Datetime2(7)),  \n [dv_sys_id],  \n [dv_sys_mod_count],  \n [dv_sys_updated_by],  \n CAST(NULLIF([dv_