#### Notebook Description 
    1. A shortcut to the Warehouse table 'gold_scin.sql_dependency_map' is created in the lakehouse.
    2. For each semantic model, the warehouse tables/views its partitions read from are collected and written to the table 'lh_dataanalytics_stage.w_sm_wh_vw_lineage' (POC I below).
    3. This notebook reads these tables, calculates the 'Root to Leaf Node Paths' for each view used in the different models, and writes the resulting dataframe into the table
       'lh_dataanalytics_stage.model_vw_bt_f' (POC II below).
       

In [0]:
import sempy.fabric as labs
import time
import pandas as pd
import pytz
from datetime import datetime,timedelta,date
from zoneinfo import ZoneInfo
import json

# Fixed +3h offset added to the partition refresh timestamps below
# (KSA presumably = Saudi Arabia time; assumes the model reports times in UTC — confirm).
KSA = pd.Timedelta(minutes=3 * 60)


In [0]:
# import com.microsoft.spark.fabric
# from com.microsoft.spark.fabric.Constants import Constants

<font size="5">**POC I : MODEL LEVEL TABLES & BASE VIEWS FROM WAREHOUSE AUTOMATION**</font>

In [0]:
# Semantic models to scan, as (model key, dataset name, workspace).
# 'FTG Semantic Model' (workspace ws_retail_sm_qa) is deprecated and intentionally left out.
_MODEL_SPECS = [
    ('Global Semantic Model',       'Global Semantic Model',       'ws_retail_sm_qa'),
    ('HR Semantic Model',           'HR Semantic Model',           'ws_hr_sm_qa'),
    ('Supply Chain Semantic Model', 'Supply Chain Semantic Model', 'ws_sci_sm_qa'),
    ('Initiative Semantic Model',   'Initiative Semantic Model',   'ws_retail_sm_qa'),
    ('Quality Semantic Model',      'Quality Semantic Model',      'ws_retail_sm_qa'),
    ('Sales Semantic Model',        'Sales Semantic Model',        'ws_retail_sm_qa'),
    ('Finance Semantic Model',      'Finance Semantic Model',      'ws_finance_sm_qa'),
    # The Inventory dataset name uses underscores, unlike its key.
    ('Inventory Semantic Model',    'Inventory_Semantic_Model',    'ws_sci_sm_qa'),
    ('Payment Semantic Model',      'Payment Semantic Model',      'ws_retail_sm_qa'),
    ('Clinics Semantic Model',      'Clinics Semantic Model',      'ws_clinics_sm_qa'),
    ('IT Semantic Model',           'IT Semantic Model',           'ws_it_sm_qa'),
    ('Secured Semantic Model',      'Secured Semantic Model',      'ws_secured_qa'),
    ('Self service semantic model', 'Self service semantic model', 'ws_selfservice_sm_qa'),
    ('Executive Semantic Model',    'Executive Semantic Model',    'ws_executive_qa'),
]

# Model key -> {'dataset': ..., 'workspace': ...}; insertion order is the processing order.
model_dict = {
    key: {'dataset': dataset, 'workspace': workspace}
    for key, dataset, workspace in _MODEL_SPECS
}

In [0]:
# labs.refresh_dataset(refresh_type = 'full',objects= [{'table' : 'FN CCC'}],dataset= 'Finance Semantic Model',workspace= 'ws_finance_sm_qa')

In [0]:

import re
def extract_schema_item(m_code: "str | None", table, partition):
    """Return the source object name assigned to ``Item`` in a Power Query (M) expression.

    The first ``Item = "name"`` occurrence is used; straight (") and typographic
    (“ ”) quotes are both accepted.

    Args:
        m_code: M expression of the partition (or refresh-policy source expression).
            ``None`` or any other non-string value is treated as "no query".
        table: Table name, used only in the diagnostic message.
        partition: Partition name, used only in the diagnostic message.

    Returns:
        The quoted name, or ``None`` when ``m_code`` is not a string or contains no
        ``Item = "..."`` assignment. In the latter case a diagnostic and the
        expression are printed.
    """
    if not isinstance(m_code, str):
        return None

    # The captured name stops at the first closing quote of EITHER kind.
    # Excluding only '"' let a “…” name run past its closing ” up to the next
    # straight quote on the same line.
    match = re.search(r'Item\s*=\s*["“]([^"”\n\r]+)["”]', m_code)
    if match:
        return match.group(1)

    print(f"No Item found for {table} : partition {partition}")
    print(m_code)
    return None


    

In [0]:
# Column layout of the consolidated partition frame (kept in this order for the Delta table).
PARTITION_COLUMNS = ['Table Name', 'Partition Name', 'Partition Refresh Time', 'Query',
                     'Query Group', 'Source Schema', 'Source Table', 'Model']

# Collect one frame per model and concatenate once at the end. Growing a DataFrame with
# pd.concat inside the loop is quadratic, and seeding it with an empty frame triggers
# pandas' FutureWarning about empty entries in concat.
partition_frames = []
for model, model_info in model_dict.items():
    print(f"Executing for {model}")
    dataset_name = model_info['dataset']
    workspace_name = model_info['workspace']

    ## Get the warehouse schema from the model's 'SchemaSource' expression (case-insensitive name match).
    expressions = labs.list_expressions(dataset=dataset_name, workspace=workspace_name)
    schema_expr = expressions.loc[expressions['Name'].str.lower() == 'schemasource', 'Expression']
    if schema_expr.empty:
        raise ValueError(f"No 'SchemaSource' expression found in {model} ({workspace_name})")
    # First space-separated token of the expression, minus its first and last
    # characters (presumably the surrounding quotes).
    schema = schema_expr.values[0].split(" ")[0][1:-1]

    ## Partitions are read through TOMWrapper because labs.list_partitions only works
    ## when no table has date-range (incremental refresh) partitions.
    rows = []
    tom_wrapper = labs.TOMWrapper(dataset=dataset_name, workspace=workspace_name, readonly=True)
    for partition in tom_wrapper.all_partitions:
        table = partition.get_Table()
        refreshed_time = str(partition.get_RefreshedTime())
        source_type = str(partition.get_SourceType())

        # Folder label: the table's 'TabularEditor_TableGroup' annotation if present,
        # otherwise the partition's query group name.
        directory = None
        for annotation in table.Annotations:
            if annotation.get_Name() == 'TabularEditor_TableGroup':
                directory = annotation.get_Value()
        if directory is None and partition.get_QueryGroup() is not None:
            directory = str(partition.get_QueryGroup().get_Name())

        if source_type in ('PolicyRangePartitionSource', 'PolicyRange'):
            # Incremental-refresh partition: the M query comes from the table's refresh policy.
            query = partition.get_Source().Partition.Table.get_RefreshPolicy().get_SourceExpression()
        elif source_type in ('M', 'Table'):
            query = partition.get_Source().get_Expression()
        else:
            # Calculated partitions carry no source query; any other source type is unexpected.
            if source_type != 'Calculated':
                print(f"Attention Needed for {partition.get_Name()},parent is {source_type}")
            continue

        rows.append({
            'Table Name': table.get_Name(),
            'Partition Name': partition.get_Name(),
            'Query': query,
            'Query Group': directory,
            'Partition Refresh Time': refreshed_time,
        })

    # Explicit columns keep dropna() valid even when the model produced no rows.
    df_partitions_temp = pd.DataFrame(
        rows, columns=['Table Name', 'Partition Name', 'Query', 'Query Group', 'Partition Refresh Time']
    ).dropna(subset=['Query'])
    if df_partitions_temp.empty:
        print(f"No partitions with a source query found for {model}")
        continue

    df_partitions_temp['Source Schema'] = schema
    df_partitions_temp['Model'] = dataset_name
    df_partitions_temp['Source Table'] = [
        extract_schema_item(query, table_name, partition_name)
        for query, table_name, partition_name in zip(
            df_partitions_temp['Query'],
            df_partitions_temp['Table Name'],
            df_partitions_temp['Partition Name'],
        )
    ]
    # Parse the refresh-time strings, shift them by the KSA offset, and re-render
    # with a space before AM/PM.
    df_partitions_temp['Partition Refresh Time'] = (
        pd.to_datetime(df_partitions_temp['Partition Refresh Time'], format='%m/%d/%Y %I:%M:%S%p') + KSA
    ).dt.strftime('%m/%d/%Y %I:%M:%S %p')

    partition_frames.append(df_partitions_temp)

df_partitions = (
    pd.concat(partition_frames, ignore_index=True).reindex(columns=PARTITION_COLUMNS)
    if partition_frames
    else pd.DataFrame(columns=PARTITION_COLUMNS)
)
        





In [0]:
# Keep only partitions whose M query resolved to a source item, then remove duplicate rows.
# drop_duplicates returns a new frame, so its result must be assigned; the original
# statement discarded it and no duplicates were ever removed.
df_new = (
    df_partitions
    .dropna(subset=['Source Table'])
    .drop_duplicates(ignore_index=True)
)
df_new.head(10)

In [0]:
# Persist the POC I output. Spaces in the pandas column names are replaced with
# underscores before the Delta write, and the existing table and schema are overwritten.
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
spark_df = spark.createDataFrame(df_new)
spark_df_new = spark_df.toDF(*(name.replace(" ", "_") for name in spark_df.columns))
(
    spark_df_new.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("lh_dataanalytics_stage.w_sm_wh_vw_lineage")
)

<font size="5">**POC II : MODEL LEVEL VIEWS & BASE TABLES LINEAGE AUTOMATION**</font>

In [0]:
# Load the warehouse dependency map and the per-model source tables (POC I output)
# from the relative "Tables/" paths (presumably the notebook's default lakehouse),
# then bring both into pandas for row-wise processing.
df_spark = spark.read.format("delta").load("Tables/sql_dependency_map")
df_model_spark = spark.read.format("delta").load("Tables/w_sm_wh_vw_lineage")

df = df_spark.toPandas()
df_model = df_model_spark.toPandas()

In [0]:
## Build the dependency adjacency map from sql_dependency_map. Every object is named with the
## two-level "schema.object" namespace: {"schema.parent": ["schema.child", ...]}.

def build_parent_child_dict(dependency_df):
    """Map each "schema.parent" name to its distinct "schema.child" names, in first-seen order.

    Args:
        dependency_df: Frame with columns 'parent_schema', 'parent_name',
            'child_schema_name' and 'child_name'.

    Returns:
        dict[str, list[str]]. Every list is non-empty.

    Rows with any of the four values missing are skipped. Self-references
    (child == parent) are reported and skipped, because a recursive walk of the
    map would never terminate on them. Repeated (parent, child) pairs are kept once;
    previously the membership check tested the parent instead of the child, so
    duplicates were appended and produced duplicate lineage paths.
    """
    relations = {}
    columns = ['parent_schema', 'parent_name', 'child_schema_name', 'child_name']
    for parent_schema, parent_name, child_schema, child_name in dependency_df[columns].itertuples(index=False, name=None):
        if pd.isna(child_name) or pd.isna(parent_name) or pd.isna(parent_schema) or pd.isna(child_schema):
            continue
        parent = parent_schema + "." + parent_name
        child = child_schema + "." + child_name

        if child == parent:
            print(f"something wrong about {parent} : {child}. Here is dict if it helps {relations.get(parent, [])}")
            continue

        children = relations.setdefault(parent, [])
        if child not in children:
            children.append(child)
    return relations


parent_child_dict = build_parent_child_dict(df)


In [0]:
# Finds the Root -> Leaf node paths for a root node, given the parent -> children relations.

# Shared memo passed in by callers: node -> list of path strings for that node.
buff_dict = {}


def get_root_leaf_paths(parent_node, parent_child_rel, buffer_dict=None):
    """Return every root-to-leaf path reachable from ``parent_node``.

    Each path is a string ``"node <- child <- ... <- leaf"``.

    Args:
        parent_node: Starting node name.
        parent_child_rel: dict mapping a node to the list of its children.
        buffer_dict: Optional memo (node -> paths) shared across calls. It is read
            and written here; previously the global ``buff_dict`` was used
            regardless of this argument. A fresh memo is used when omitted.

    Returns:
        list[str]. A node with no children (absent from ``parent_child_rel`` or
        mapped to an empty list) is a leaf and yields ``[parent_node]``. Memoised
        lists are returned as-is, so callers must not mutate them.

    Note: a cycle in ``parent_child_rel`` recurses until RecursionError.
    """
    if buffer_dict is None:
        buffer_dict = {}
    if parent_node in buffer_dict:
        return buffer_dict[parent_node]

    children = parent_child_rel.get(parent_node)
    if not children:
        return [parent_node]

    paths = [
        parent_node + " <- " + child_path
        for child in children
        for child_path in get_root_leaf_paths(child, parent_child_rel, buffer_dict)
    ]
    buffer_dict[parent_node] = paths
    return paths


root_level_paths = {}


In [0]:
# Keep the model-level columns needed for lineage. Each view/table used by a model is the
# root of its lineage tree, named "schema.table"; its root-to-leaf paths go in 'Paths'.
df_model_tables = (
    df_model.loc[:, ['Source_Schema', 'Source_Table', 'Model']]
    .assign(Root_Node=lambda frame: frame['Source_Schema'] + "." + frame['Source_Table'])
)
df_model_tables['Paths'] = df_model_tables['Root_Node'].apply(
    lambda root: get_root_leaf_paths(root, parent_child_dict, buff_dict)
)

In [0]:
# De-normalise the lineage: one row per (root, node) pair, where Flattened_Path is the
# path prefix from the root down to that node. The root itself gets no row of its own.
# Paths are built with " <- ", so nodes are stripped after splitting on "<-". Previously
# Leaf_Node kept stray leading/trailing spaces that differed by position in the path.
flattened_rows = []
# Every row with the same root carries the same Paths, so each distinct root is
# flattened once (the later drop_duplicates would discard the repeats anyway).
unique_roots = df_model_tables.drop_duplicates(subset='Root_Node')
for root_node, paths in unique_roots[['Root_Node', 'Paths']].itertuples(index=False, name=None):
    for path in paths:
        nodes = [node.strip() for node in path.split("<-")]
        for depth in range(2, len(nodes) + 1):
            flattened_rows.append({
                'Root_Node': root_node,
                'Leaf_Node': nodes[depth - 1],
                'Flattened_Path': " <- ".join(nodes[:depth]),
            })

# Explicit columns keep the later merge on 'Root_Node' valid even when no row was produced.
df_flat = pd.DataFrame(flattened_rows, columns=['Root_Node', 'Leaf_Node', 'Flattened_Path'])


In [0]:
# Remove repeated (root, leaf, path) rows, then attach them to every model row that uses
# the same root. The inner join drops roots that have no flattened rows.
df_flat = df_flat.drop_duplicates(ignore_index=True)
df_final = df_model_tables.merge(df_flat, how='inner', on='Root_Node')

In [0]:
# Persist the consolidated lineage. Spaces in column names become underscores, and the
# existing table and schema are overwritten.
spark_df = spark.createDataFrame(df_final)
spark_df_final = spark_df.toDF(*map(lambda name: name.replace(" ", "_"), spark_df.columns))
(
    spark_df_final.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("lh_dataanalytics_stage.model_vw_bt_f")
)

In [0]:
# Stop the notebook run and report 'Success' as its exit value.
EXIT_STATUS = 'Success'
mssparkutils.notebook.exit(EXIT_STATUS)