In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm

from sql_connector import *

In [2]:
# Translation table for cleanCode, built once at import instead of on every call:
# newlines, carriage returns and tabs become spaces; square brackets are dropped.
_SQL_CLEANUP_TABLE = str.maketrans({
    '\n': ' ',
    '\t': ' ',
    '\r': ' ',
    '[': '',
    ']': '',
})


def cleanCode(code):
    """
    Normalise SQL source text for simple token matching.

    Lower-cases the text, replaces every newline, carriage return and tab with a
    single space (one space per character, runs are not collapsed) and removes
    square brackets, e.g. ``[dbo].[Orders]`` -> ``dbo.orders``.

    Parameters
    ----------
    code : str or None
        Raw source text. None is passed through unchanged.

    Returns
    -------
    str or None
        The normalised text, or None if `code` was None.
    """
    if code is None:
        return None
    return code.lower().translate(_SQL_CLEANUP_TABLE)

In [3]:
def getProcedureCode(stored_procedure_name):
    """
    Fetch the source of a stored procedure and normalise it with cleanCode.

    Parameters
    ----------
    stored_procedure_name : str
        Name of the form ``database.schema.procedure``. Everything before the first
        dot is the database to connect to; the rest is passed to OBJECT_ID.

    Returns
    -------
    str or None
        The cleaned definition. Presumably None when OBJECT_DEFINITION returns
        NULL (e.g. no access) - NOTE(review): depends on how sql_connector maps NULL.
    """
    # Split on the first dot only. The previous split(db_name)[1][1:] approach broke
    # whenever the database name reappeared later in the name:
    # 'stage.dbo.load_stage' produced 'dbo.load_' instead of 'dbo.load_stage'.
    stored_procedure_db, _, object_name = stored_procedure_name.partition('.')

    # Double single quotes so the name cannot terminate the N'...' literal early.
    escaped_name = object_name.replace("'", "''")

    sql = sql_connector('DNAPROD', stored_procedure_db)
    stored_procedure_code = sql.read_query(
        f"SELECT OBJECT_DEFINITION (OBJECT_ID(N'{escaped_name}'))"
    ).values.flatten()[0]

    return cleanCode(stored_procedure_code)

# Preparing the table for data-lineage documentation

In [73]:
# Get the list of all tables and views from all databases.
# Open a connection here so the cell does not depend on a `sql` object left over
# from another cell (on a fresh kernel `sql` is undefined at this point).
# 'Stage' is the database the stored-procedure cell below also connects to.
sql = sql_connector('DNAPROD', 'Stage')
databases = sql.read_query('SELECT name FROM sys.databases').values.flatten()

table_frames = []
view_frames = []
for db in tqdm(databases):
    sql = sql_connector('DNAPROD', db)

    # get list of tables
    query = "SELECT (SCHEMA_NAME(schema_id) + '.' + name) as TableName FROM sys.tables"
    table_frames.append(sql.read_query(query))

    # get list of views
    query = "SELECT (schema_name(schema_id) + '.' + name) as ViewName FROM sys.views"
    view_frames.append(sql.read_query(query))

# Concatenate once after the loop: concatenating inside the loop is quadratic.
# ignore_index=True replaces the former reset_index(drop=True).
all_server_tables = (
    pd.concat(table_frames, ignore_index=True) if table_frames
    else pd.DataFrame(columns=['TableName'])
)
all_server_views = (
    pd.concat(view_frames, ignore_index=True) if view_frames
    else pd.DataFrame(columns=['ViewName'])
)

# Lower-case names so they match the lower-cased procedure source (see cleanCode).
# NOTE(review): these names are schema.object (no database prefix), while the lineage
# output shown further down contains database.schema.object names - confirm which
# form the lineage-extraction cell is meant to match.
all_server_tables['TableName'] = all_server_tables.TableName.str.lower()
all_server_views['ViewName'] = all_server_views.ViewName.str.lower()

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 25/25 [00:31<00:00,  1.26s/it]


In [97]:
# Get the list of all stored procedures (database.schema.name) across every database,
# then fetch each procedure's cleaned source code. The result is cached to JSON
# so later cells can reload it without querying the server again.
sql = sql_connector('DNAPROD', 'Stage')
procedure_frames = []
for db in tqdm(databases):
    # Bracket-quote the database name (doubling any ']') so names containing spaces,
    # dashes or other special characters still form a valid identifier.
    quoted_db = '[' + db.replace(']', ']]') + ']'
    query = f"""
    SELECT 
        (specific_catalog + '.' + specific_schema + '.' + specific_name) as 'procedureName'
    FROM 
        {quoted_db}.INFORMATION_SCHEMA.ROUTINES
    WHERE 
        ROUTINE_TYPE = 'PROCEDURE'
    """
    procedure_frames.append(sql.read_query(query))

# Concatenate once (concat in the loop is quadratic); ignore_index gives a clean 0..n-1 index.
stored_procedures = (
    pd.concat(procedure_frames, ignore_index=True) if procedure_frames
    else pd.DataFrame(columns=['procedureName'])
)
stored_procedures['procedureName'] = stored_procedures.procedureName.str.lower()
# One connection and query per procedure, so this is the slow step - show progress.
stored_procedures['code'] = [
    getProcedureCode(name) for name in tqdm(stored_procedures.procedureName)
]
stored_procedures.to_json('stored_procedures.json')

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 25/25 [00:10<00:00,  2.29it/s]


In [135]:
# Preview the collected procedure names and their cleaned source code.
stored_procedures

Unnamed: 0,procedureName,code
0,master.dbo.sp_allnightlog,create procedure dbo.sp_allnightlog ...
1,master.dbo.sp_allnightlog_setup,create procedure dbo.sp_allnightlog_setup ...
2,master.dbo.sp_blitz,create procedure dbo.sp_blitz @help tin...
3,master.dbo.sp_blitzanalysis,create procedure dbo.sp_blitzanalysis ( @he...
4,master.dbo.sp_blitzbackups,create procedure dbo.sp_blitzbackups @hel...
...,...,...
889,hrg.employee.simplelist,create procedure employee.simplelist as se...
890,hrg.employee.mediumlist,create procedure employee.mediumlist as ...
891,hrg.employee.leaders,--get list of all employees create proc...
892,hrg.employee.icims,--get data from unified table create proc...


In [4]:
# Reload the procedure list cached by the extraction cell above instead of querying the server again.
stored_procedures = pd.read_json(path_or_buf='stored_procedures.json')

In [5]:
# Extract (input_table, procedure_name, output_table) edges from each procedure's
# cleaned source code using simple token heuristics:
#   * output = first word after the first 'into ' in the script;
#   * inputs = words that directly follow a 'from' or 'join' token.
# Only names present in the server's table/view lists are kept.
# Requires all_server_tables / all_server_views from the table-listing cell above;
# unlike stored_procedures they are not cached to disk, so rerun that cell after a restart.
# NOTE: the demo cells further down call the procedure column 'procedure', not 'procedure_name'.

# Build the lookup once as a set: `in <ndarray>` was a linear scan for every word.
known_objects = set(all_server_tables.TableName) | set(all_server_views.ViewName)

procedures_outputs = []
missed_scripts = []
for procedure_name, script in tqdm(
    zip(stored_procedures.procedureName, stored_procedures.code),
    total=len(stored_procedures),
):
    # A missing definition may be None, or NaN after the JSON round-trip; either way
    # it cannot be parsed, so record it instead of crashing on the substring checks.
    if not isinstance(script, str) or 'into ' not in script or 'from' not in script:
        missed_scripts.append(script)
        continue

    # Text after the first 'into '; guard against nothing following it.
    words_after_into = script.partition('into ')[2].split()
    if not words_after_into:
        missed_scripts.append(script)
        continue
    output_table = words_after_into[0]
    if output_table not in known_objects:
        continue

    # Pair each word with its successor so a trailing 'from'/'join' cannot index past the end.
    script_words = script.split()
    # dict.fromkeys de-duplicates while keeping first-seen order.
    input_tables = dict.fromkeys(
        next_word
        for word, next_word in zip(script_words, script_words[1:])
        if word in ('from', 'join') and next_word in known_objects
    )
    procedures_outputs.extend(
        [input_table, procedure_name, output_table] for input_table in input_tables
    )

procedures_outputs = pd.DataFrame(procedures_outputs, columns = ['input_table', 'procedure_name', 'output_table'])

0it [00:00, ?it/s]


NameError: name 'all_server_tables' is not defined

In [180]:
# Preview the extracted input_table / procedure_name / output_table edges.
procedures_outputs

Unnamed: 0,input_table,procedure_name,output_table
0,msdb.dbo.log_shipping_primaries,msdb.dbo.sp_add_log_shipping_secondary,msdb.dbo.log_shipping_secondaries
1,msdb.dbo.sysmail_profile,msdb.dbo.sysmail_add_profile_sp,msdb.dbo.sysmail_profile
2,msdb.dbo.sysjobs_view,msdb.dbo.sp_sqlagent_log_jobhistory,msdb.dbo.sysjobhistory
3,msdb.dbo.sysjobsteps,msdb.dbo.sp_sqlagent_log_jobhistory,msdb.dbo.sysjobhistory
4,msdb.dbo.sysoperators,msdb.dbo.sp_sqlagent_log_jobhistory,msdb.dbo.sysjobhistory
...,...,...,...
125,stage.rws_salesforce.order,hrg.commission.addmanualorderallocation,hrg.commission.manualorderallocation
126,stage.rws_salesforce.order_item,hrg.commission.addmanualorderallocation,hrg.commission.manualorderallocation
127,stage.rws_salesforce.vw_account,hrg.commission.addmanualorderallocation,hrg.commission.manualorderallocation
128,stage.rws_salesforce.budgetrates,hrg.commission.addmanualorderallocation,hrg.commission.manualorderallocation


# Algorithm for creating data lineage (v1: expands one level upstream per cell run)

In [77]:
# Hand-made lineage for exercising the v1 algorithm. Each entry is
# (procedure, table it writes, tables it reads in order):
#   table_00 <- proc_00 <- table_10, table_11
#   table_10 <- proc_10 <- table_20, table_21
#   table_11 <- proc_11 <- table_22, table_23
#   table_21 <- proc_20 <- table_30, table_31
procedures_outputs = pd.DataFrame(
    [
        {'input_table': source, 'procedure': proc, 'output_table': target}
        for proc, target, sources in (
            ('proc_00', 'table_00', ('table_10', 'table_11')),
            ('proc_10', 'table_10', ('table_20', 'table_21')),
            ('proc_11', 'table_11', ('table_22', 'table_23')),
            ('proc_20', 'table_21', ('table_30', 'table_31')),
        )
        for source in sources
    ],
    columns=['input_table', 'procedure', 'output_table'],
)

procedures_outputs

Unnamed: 0,input_table,procedure,output_table
0,table_10,proc_00,table_00
1,table_11,proc_00,table_00
2,table_20,proc_10,table_10
3,table_21,proc_10,table_10
4,table_22,proc_11,table_11
5,table_23,proc_11,table_11
6,table_30,proc_20,table_21
7,table_31,proc_20,table_21


In [97]:
# Table whose lineage the level-by-level cells below expand.
final_table = 'table_00'

In [175]:
# Expand the lineage one level upstream per run of this cell. iteration == 0 seeds the
# structures with final_table; later runs append the next level to the existing lists.
iteration = 0
if iteration == 0:
    data_lineage_tables = [[[final_table]]]
    # Procedure that writes final_table, or None when nothing writes it (same placeholder
    # as for source tables below) instead of raising IndexError.
    seed_procedures = procedures_outputs[procedures_outputs.output_table == final_table].procedure.unique().tolist()
    procedure = seed_procedures[0] if seed_procedures else None
    data_lineage_procedures = [[[procedure]]]

# Open a new level in both structures.
data_lineage_tables.append([])
data_lineage_procedures.append([])

# Flatten the previous level with a comprehension, not np.array(...).flatten(): levels become
# ragged once some tables have no inputs (e.g. [[], ['table_30', 'table_31'], [], []]),
# which NumPy cannot turn into a regular array.
previous_level = [name for group in data_lineage_tables[-2] for name in group]
for table in previous_level:
    # Tables read by the procedure(s) that write `table`; one group per parent table.
    input_tables = procedures_outputs[procedures_outputs.output_table == table].input_table.values.tolist()
    data_lineage_tables[-1].append(input_tables)

    # For each input table, the first procedure that writes it, or None if nothing does.
    data_lineage_procedures[-1].append([])
    for input_table in input_tables:
        procedure = procedures_outputs[procedures_outputs.output_table == input_table].procedure.unique().tolist()
        data_lineage_procedures[-1][-1].append(procedure[0] if procedure else None)

In [176]:
# Tables found so far: one list per level, each level grouped by parent table.
data_lineage_tables

[[['table_00']], [['table_10', 'table_11']]]

In [177]:
# Procedure writing each table in the matching slot of data_lineage_tables (None = no writer found).
data_lineage_procedures

[[['proc_00']], [['proc_10', 'proc_11']]]

In [178]:
# Expand the lineage one level upstream per run of this cell. iteration == 0 seeds the
# structures with final_table; later runs append the next level to the existing lists.
iteration = 1
if iteration == 0:
    data_lineage_tables = [[[final_table]]]
    # Procedure that writes final_table, or None when nothing writes it (same placeholder
    # as for source tables below) instead of raising IndexError.
    seed_procedures = procedures_outputs[procedures_outputs.output_table == final_table].procedure.unique().tolist()
    procedure = seed_procedures[0] if seed_procedures else None
    data_lineage_procedures = [[[procedure]]]

# Open a new level in both structures.
data_lineage_tables.append([])
data_lineage_procedures.append([])

# Flatten the previous level with a comprehension, not np.array(...).flatten(): levels become
# ragged once some tables have no inputs (e.g. [[], ['table_30', 'table_31'], [], []]),
# which NumPy cannot turn into a regular array.
previous_level = [name for group in data_lineage_tables[-2] for name in group]
for table in previous_level:
    # Tables read by the procedure(s) that write `table`; one group per parent table.
    input_tables = procedures_outputs[procedures_outputs.output_table == table].input_table.values.tolist()
    data_lineage_tables[-1].append(input_tables)

    # For each input table, the first procedure that writes it, or None if nothing does.
    data_lineage_procedures[-1].append([])
    for input_table in input_tables:
        procedure = procedures_outputs[procedures_outputs.output_table == input_table].procedure.unique().tolist()
        data_lineage_procedures[-1][-1].append(procedure[0] if procedure else None)

In [179]:
# Tables found so far: one list per level, each level grouped by parent table.
data_lineage_tables

[[['table_00']],
 [['table_10', 'table_11']],
 [['table_20', 'table_21'], ['table_22', 'table_23']]]

In [180]:
# Procedure writing each table in the matching slot of data_lineage_tables (None = no writer found).
data_lineage_procedures

[[['proc_00']], [['proc_10', 'proc_11']], [[None, 'proc_20'], [None, None]]]

In [181]:
# Expand the lineage one level upstream per run of this cell. iteration == 0 seeds the
# structures with final_table; later runs append the next level to the existing lists.
iteration = 2
if iteration == 0:
    data_lineage_tables = [[[final_table]]]
    # Procedure that writes final_table, or None when nothing writes it (same placeholder
    # as for source tables below) instead of raising IndexError.
    seed_procedures = procedures_outputs[procedures_outputs.output_table == final_table].procedure.unique().tolist()
    procedure = seed_procedures[0] if seed_procedures else None
    data_lineage_procedures = [[[procedure]]]

# Open a new level in both structures.
data_lineage_tables.append([])
data_lineage_procedures.append([])

# Flatten the previous level with a comprehension, not np.array(...).flatten(): levels become
# ragged once some tables have no inputs (e.g. [[], ['table_30', 'table_31'], [], []]),
# which NumPy cannot turn into a regular array.
previous_level = [name for group in data_lineage_tables[-2] for name in group]
for table in previous_level:
    # Tables read by the procedure(s) that write `table`; one group per parent table.
    input_tables = procedures_outputs[procedures_outputs.output_table == table].input_table.values.tolist()
    data_lineage_tables[-1].append(input_tables)

    # For each input table, the first procedure that writes it, or None if nothing does.
    data_lineage_procedures[-1].append([])
    for input_table in input_tables:
        procedure = procedures_outputs[procedures_outputs.output_table == input_table].procedure.unique().tolist()
        data_lineage_procedures[-1][-1].append(procedure[0] if procedure else None)

In [182]:
# Tables found so far: one list per level, each level grouped by parent table.
data_lineage_tables

[[['table_00']],
 [['table_10', 'table_11']],
 [['table_20', 'table_21'], ['table_22', 'table_23']],
 [[], ['table_30', 'table_31'], [], []]]

In [183]:
# Procedure writing each table in the matching slot of data_lineage_tables (None = no writer found).
data_lineage_procedures

[[['proc_00']],
 [['proc_10', 'proc_11']],
 [[None, 'proc_20'], [None, None]],
 [[], [None, None], [], []]]

# Algorithm for creating data lineage, v2 (recursive)

In [22]:
# Hand-made lineage for exercising create_data_lineage. Each entry is
# (procedure, table it writes, tables it reads in order):
#   table_00 <- proc_00 <- table_10, table_11
#   table_10 <- proc_10 <- table_20, table_21
#   table_11 <- proc_11 <- table_22, table_23
procedures_outputs = pd.DataFrame(
    [
        {'input_table': source, 'procedure': proc, 'output_table': target}
        for proc, target, sources in (
            ('proc_00', 'table_00', ('table_10', 'table_11')),
            ('proc_10', 'table_10', ('table_20', 'table_21')),
            ('proc_11', 'table_11', ('table_22', 'table_23')),
        )
        for source in sources
    ],
    columns=['input_table', 'procedure', 'output_table'],
)

procedures_outputs

Unnamed: 0,input_table,procedure,output_table
0,table_10,proc_00,table_00
1,table_11,proc_00,table_00
2,table_20,proc_10,table_10
3,table_21,proc_10,table_10
4,table_22,proc_11,table_11
5,table_23,proc_11,table_11


In [23]:
# Table whose lineage create_data_lineage builds below.
table = 'table_00'

In [20]:
def create_data_lineage(procedures_outputs, table, procedure_col='procedure', _ancestors=frozenset()):
    """
    Build a nested dictionary describing how `table` was produced.

    Parameters
    ----------
    procedures_outputs : pandas.DataFrame
        One row per (input table, procedure, output table) edge, with columns
        ``input_table``, ``output_table`` and the procedure column named by
        `procedure_col`.
    table : str
        Table whose lineage we want.
    procedure_col : str, default 'procedure'
        Name of the procedure column. The extraction cell earlier in this notebook
        produces 'procedure_name', so pass that when using its output.
    _ancestors : frozenset
        Internal: tables already on the current recursion path, used to stop on cycles.

    Returns
    -------
    dict
        ``{table: None}`` when no procedure writes `table`, or when `table` is already on
        the current path (a cycle, e.g. a procedure that reads and writes the same table).
        Otherwise ``{table: {procedure: [lineage of each input table, ...], ...}}`` with one
        key per procedure that writes `table`, in order of first appearance.
    """
    producers = procedures_outputs[procedures_outputs.output_table == table]
    if producers.empty or table in _ancestors:
        return {table: None}

    path = _ancestors | {table}
    lineage = {}
    # Group each input under the procedure that reads it. The previous version labelled
    # every input with the first procedure only, mixing up tables written by several procedures.
    for procedure, input_table in zip(producers[procedure_col], producers.input_table):
        lineage.setdefault(procedure, []).append(
            create_data_lineage(procedures_outputs, input_table, procedure_col, path)
        )
    return {table: lineage}

In [26]:
# Build the nested lineage of `table` from the demo procedures_outputs defined above.
create_data_lineage(procedures_outputs, table)

{'table_00': {'proc_00': [{'table_10': {'proc_10': [{'table_20': None},
      {'table_21': None}]}},
   {'table_11': {'proc_11': [{'table_22': None}, {'table_23': None}]}}]}}

In [28]:
# The previous cell's result, re-indented by hand to show the nesting:
# {table: {procedure: [lineage of each input table]}}, with None marking a table
# that no procedure writes.
{'table_00': {'proc_00': [
    {'table_10': {'proc_10': [
        {'table_20': None},
        {'table_21': None}
    ]}},
    {'table_11': {'proc_11': [
        {'table_22': None}, 
        {'table_23': None}
    ]}}
]}}