In [0]:
# 1. The conversion has to run first so that the SQL is analyzed.
# 2. The lineage is generated afterwards.


# Target situation:
#     A process in which I can pick a flow and generate:
#         1. A lineage diagram starting from a CV.
#         2. Lineage for all the columns in the CV, down to the base tables.
#             The table output should have as many rows as there are fields in the top-level CV.

In [0]:
import re

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from datetime import datetime


In [0]:
%run ./init

In [0]:
# Text widget in which the user enters the data flow to process (read back with dbutils.widgets.get).
dbutils.widgets.text(
    "data_flow",                # widget name
    "",                         # default value: empty
    "Enter Data Flow Value: ",  # label shown next to the widget
)

In [0]:
# ADLS Gen2 location of the code-converter files.
sa = "stlpdel01dev"
container_name = "codeconverter"  # Replace with your actual container name

# Authenticate abfss:// access with the token provider class the cluster exposes
# under spark.databricks.passthrough.adls.gen2.tokenProviderClassName.
passthrough_provider_class = spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
spark.conf.set("fs.azure.account.auth.type", "CustomAccessToken")
spark.conf.set("fs.azure.account.custom.token.provider.class", passthrough_provider_class)


In [0]:
# Data flow chosen in the widget. It is interpolated into the storage paths below
# and, lower-cased, into table names ("<data_flow>_lineage") in later SQL, so only a
# non-empty plain identifier is accepted. An empty value would otherwise make
# base_url point at the whole input folder.
data_flow = dbutils.widgets.get("data_flow").strip()
if not re.fullmatch(r"[A-Za-z0-9_]+", data_flow):
    raise ValueError(
        f"Invalid data_flow {data_flow!r}: expected a non-empty name made of letters, digits and underscores."
    )
base_url = f"abfss://{container_name}@{sa}.dfs.core.windows.net/AcceleratorSAPFiles/Lineage_SAP/input/{data_flow}"
output_url = f"abfss://{container_name}@{sa}.dfs.core.windows.net/AcceleratorSAPFiles/Lineage_SAP/output/{data_flow}/"
dbutils.fs.ls(base_url)

In [0]:

# List the input folder of this data flow; print the listing, or a notice when it is empty.
files = dbutils.fs.ls(base_url)
print(files if files else "There is no file")


# Generate the INSERT statements that map each target table/column to its source table/column and transformation

In [0]:

# Archive folder for processed input files, one sub-folder per run date.
archive_directory_path = f"abfss://{container_name}@{sa}.dfs.core.windows.net/AcceleratorSAPFiles/Lineage_SAP/InputArchivedFiles/"
print(archive_directory_path)
current_date = datetime.now().strftime('%Y-%m-%d')
archive_input_date_folder = f"{archive_directory_path.rstrip('/')}/{current_date}/"
dbutils.fs.mkdirs(archive_input_date_folder)

files = dbutils.fs.ls(base_url)

# Tail of the input file name that is replaced by "_lineage.txt" in the output name:
# any single character (the leading "." is a regex wildcard) + "validated_code" + the rest.
pattern = r".validated_code.*$"

# Schema holding the lineage table the generated INSERT statements target.
catalog_schema = "hive_metastore.codeconverter_config"
display(files)

initial_prompt = f"""generate insert scripts for each column to understand the source table. the target table will be the one from create statement. generate insert for a table having the columns: target table name, target column name, source table name, source column name, transformation. Consider that the lineage table name is {catalog_schema}.{data_flow.lower()}_lineage. 
In case a query is having multiple ctes, to take the source table go through all ctes and take the base table for that specific column.
For each specific column go through all ctes and add all steps of transformations
Generate only the insert statement without any other comments. In case there are 2 or more tables inserted into source_table_name column, give them aliases. Use these aliases in the source_column_name as well to know where the column is coming from. Generate multiple-row INSERT statement. 
All the inserted values should be string. Don't put column names in quotation marks
Take into consideration all the above remarks above the table naming conventions.  """

for file_info in files:
    # dbutils.fs.ls also returns sub-directories; only files contain SQL to analyze.
    if not file_info.isFile():
        print(f"Skipping directory: {file_info.path}")
        continue

    file_content = read_file_content(file_info.path)
    if file_content is None:
        continue

    # Presumably a list of lines (it is joined with "\n") — TODO confirm in ./init.
    # The newline keeps the first SQL line from being glued onto the last prompt sentence.
    final_prompt = initial_prompt + "\n" + "\n".join(file_content)

    try:
        model_output = call_model_o1(deployment_name="o1", prompt=final_prompt)
    except Exception as e:
        # Continue with the remaining files; this one is not archived, so it is retried next run.
        print(f"Error generating lineage for {file_info.path}")
        print(f"Exception: {e}")
        continue

    if not model_output:
        print(f"Empty model output for {file_info.path}; file left in the input folder")
        continue

    modified_file_name = re.sub(pattern, "_lineage.txt", str(file_info.name))
    output_file_path = output_url + modified_file_name
    print(output_file_path)

    dbutils.fs.put(output_file_path, model_output, overwrite=True)
    # Archive the input only after its output was written, so it is not sent to the model again.
    dbutils.fs.mv(file_info.path, archive_input_date_folder)


In [0]:
# (Re)create the lineage table that the generated INSERT statements load.
# CREATE OR REPLACE discards the rows of any previous run for this data flow.
lineage_table_ddl = f"""
CREATE OR REPLACE TABLE hive_metastore.codeconverter_config.{data_flow.lower()}_lineage
(
  TARGET_TABLE_NAME string,
TARGET_COLUMN_NAME string,
SOURCE_TABLE_NAME string,
SOURCE_COLUMN_NAME string,
TRANSFORMATION string
)

"""
spark.sql(lineage_table_ddl)


In [0]:
# Run the INSERT statements generated into output_url against the lineage table,
# then move each file to a dated archive folder so it is not executed twice.
EXECUTE_SQL = True  # False = dry run: statements are only printed and files are not archived

# Create an archive folder with the current date
archive_directory_path = f"abfss://{container_name}@{sa}.dfs.core.windows.net/AcceleratorSAPFiles/Lineage_SAP/OutputArchivedFiles/"
print(archive_directory_path)
current_date = datetime.now().strftime('%Y-%m-%d')
archive_date_folder = f"{archive_directory_path.rstrip('/')}/{current_date}/"
dbutils.fs.mkdirs(archive_date_folder)

# List the whole output folder of this data flow. Listing output_file_path would see only
# the last file written by the generation cell, and would fail if that cell wrote none.
try:
    files = dbutils.fs.ls(output_url)
except Exception as e:
    print(f"Cannot list {output_url}: {e}")
    files = []

for file_info in files:
    file_path = file_info.path
    if not file_info.isFile():
        print(f"Skipping directory: {file_path}")
        continue

    print(f"Reading file: {file_path}")
    try:
        file_content = read_file_content(file_path)
        if file_content is None:
            continue

        # Model output may be wrapped in Markdown code fences (```sql ... ```); drop those lines.
        sql_code = "\n".join(
            line for line in file_content if not line.strip().startswith("```")
        )

        # NOTE(review): a plain split breaks if a transformation value itself contains ';'.
        statements = [s.strip() for s in sql_code.split(";") if s.strip()]

        for statement in statements:
            # The file is model-generated: only INSERTs are run, anything else is reported.
            if not statement.upper().startswith("INSERT"):
                print(f"Skipping non-INSERT statement: {statement}")
                continue
            print(f"Executing SQL statement: {statement}")
            if EXECUTE_SQL:
                spark.sql(statement)

        if EXECUTE_SQL:
            print(f"Successfully executed SQL code for file: {file_path}")
            dbutils.fs.mv(file_path, archive_date_folder)
        else:
            print(f"Dry run, file not executed or archived: {file_path}")
    except Exception as e:
        # The file stays in output_url so it can be fixed and re-run.
        print(f"Error processing file {file_path}")
        print(f"Exception: {e}")

In [0]:

# Preview the rows loaded into the lineage table. The name is fully qualified, matching
# the CREATE TABLE. An unqualified schema resolves against the session's current
# catalog, which is not necessarily hive_metastore.
df_lineage = spark.sql(f"""SELECT * FROM hive_metastore.codeconverter_config.{data_flow.lower()}_lineage""")
display(df_lineage)

#One row per column lineage


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

# Trace every target column of the lineage table upstream, hop by hop, putting each
# chain into one row. Hop 1 keeps the table's own column names. Each hop n >= 2 adds
# TARGET_TABLE_NAME{n}, target_column_name{n}, source_table_name{n},
# source_column_name{n}, transformation{n} and order_no{n}.

# Hop limit per column. The lineage rows are model-generated, so a cycle (e.g. a column
# listed as its own source) would otherwise keep the walk going forever.
MAX_LINEAGE_DEPTH = 25

start_df = spark.table(f"hive_metastore.codeconverter_config.{data_flow.lower()}_lineage")


def _suffix_hop_columns(hop_df, level):
    """Return hop_df's lineage columns renamed with the hop number `level`, plus order_no{level}."""
    return hop_df.select(
        col("target_table_name").alias(f"TARGET_TABLE_NAME{level}"),
        col("target_column_name").alias(f"target_column_name{level}"),
        col("source_table_name").alias(f"source_table_name{level}"),
        col("source_column_name").alias(f"source_column_name{level}"),
        col("transformation").alias(f"transformation{level}"),
        lit(level).alias(f"order_no{level}"),
    )


def _trace_column_lineage(seed_df, label):
    """Follow seed_df's rows upstream through start_df until no source is itself a target.

    Each upstream row is LEFT-joined to the row whose source it is. A row whose source
    is a base table therefore keeps nulls in the deeper hops, instead of being paired
    with another branch's ancestors or dropped. `label` only names the column in the
    warning printed when MAX_LINEAGE_DEPTH is reached.
    """
    lineage = seed_df
    frontier = seed_df
    # Columns of the previous hop that point at the next upstream row.
    prev_table_col, prev_column_col = "source_table_name", "source_column_name"
    for level in range(2, MAX_LINEAGE_DEPTH + 2):
        upstream = (
            frontier.alias("i")
            .join(
                start_df.alias("s"),
                (col("i.source_table_name") == col("s.target_table_name"))
                & (col("i.source_column_name") == col("s.target_column_name")),
                how="inner",
            )
            .select("s.*")
            .distinct()  # several rows of the frontier may share one upstream row
        )
        if not upstream.take(1):
            return lineage
        hop = _suffix_hop_columns(upstream, level)
        lineage = lineage.join(
            hop,
            (col(prev_table_col) == col(f"TARGET_TABLE_NAME{level}"))
            & (col(prev_column_col) == col(f"target_column_name{level}")),
            how="left",
        )
        frontier = upstream
        prev_table_col, prev_column_col = f"source_table_name{level}", f"source_column_name{level}"
    print(f"WARNING: lineage of {label} exceeds {MAX_LINEAGE_DEPTH} hops (possible cycle); truncated.")
    return lineage


# To trace only some columns, filter start_df before this line, e.g.
#   start_df.filter(col("target_table_name").isin("nntst.CV_030_POLICY_DERIVATIONS")
#                   & col("target_column_name").isin("ZSTDTPROJ"))
unique_tgt_tbl_col = start_df.select("target_column_name", "target_table_name").distinct()

# One lineage DataFrame per (target column, target table) pair.
results_df = []
# Rows are unpacked positionally, so this does not depend on the letter case Spark
# reports for the selected column names.
for target_column_name, target_table_name in unique_tgt_tbl_col.collect():
    seed_df = start_df.filter(
        (col("target_column_name") == target_column_name)
        & (col("target_table_name") == target_table_name)
    )
    results_df.append(_trace_column_lineage(seed_df, f"{target_table_name}.{target_column_name}"))

if results_df:
    # Chains have different depths, hence allowMissingColumns (missing hops become null).
    final_results_df = results_df[0]
    for df in results_df[1:]:
        final_results_df = final_results_df.unionByName(df, allowMissingColumns=True)
    # Show final results (In actual usage, you might save this to a table)
    final_results_df.display()
else:
    print(f"No rows in hive_metastore.codeconverter_config.{data_flow.lower()}_lineage; nothing to trace.")




#Create lineage diagram

In [0]:
# strict digraph {
#     a [shape="ellipse" style="filled" fillcolor="#1f77b4"]
#     b [shape="polygon" style="filled" fillcolor="#ff7f0e"]
#     a -> b [fillcolor="#a6cee3" color="#1f78b4"]
#}

In [0]:
from pyspark.sql.functions import substring
from pyspark.sql.functions import regexp_replace

# Build a Graphviz DOT graph of table-level lineage: one node per table and one
# edge per (target table -> source table) pair in the lineage table.
lineage_df = spark.table(f"hive_metastore.codeconverter_config.{data_flow.lower()}_lineage")

lineage_distinct_df = lineage_df.select("target_table_name", "source_table_name").distinct()

# Strip a leading "<schema>." qualifier (such as "nntst.") so nodes show bare table names.
# Names without a qualifier are left untouched; a fixed substring offset would cut their
# first characters.
_schema_prefix = r"^[^.]*\."
lineage_col_trim_df = (
    lineage_distinct_df
    .withColumn("target_table_name", regexp_replace("target_table_name", _schema_prefix, ""))
    .withColumn("source_table_name", regexp_replace("source_table_name", _schema_prefix, ""))
    .distinct()
)

# Collect once; each row unpacks positionally as (target, source).
table_pairs = [tuple(row) for row in lineage_col_trim_df.collect()]


def _is_named(table):
    """True for a usable node name (neither null nor empty)."""
    return table is not None and table != ""


def _dot_id(table):
    """Quote a table name as a DOT identifier, escaping embedded double quotes."""
    return '"' + table.replace('"', '\\"') + '"'


# Sorted so the diagram text is stable between runs. Edges with a null/empty end are
# skipped; DOT would otherwise create a nameless or "None" node for them.
table_list = sorted({table for pair in table_pairs for table in pair if _is_named(table)})
edges = sorted({(tgt, src) for tgt, src in table_pairs if _is_named(tgt) and _is_named(src)})

digraph_string = "strict digraph {"
for table in table_list:
    digraph_string += f'\n {_dot_id(table)} [shape="ellipse" style="filled" fillcolor="#1f77b4"]'
for tgt, src in edges:
    digraph_string += f'\n {_dot_id(tgt)} -> {_dot_id(src)} [fillcolor="#a6cee3" color="#1f78b4"]'
digraph_string += "\n }"

print(digraph_string)