# PLSQL Data Lineage tool

With this simple code, you can visualize your PLSQL code to help you understand the flow of data in your project.
#### What you need:

libraries:
- graphviz (conda install -c anaconda graphviz)<br>
- pydot (conda install -c anaconda pydot)<br>
<br>
- your plsql code in 'plsql.txt' file in your root directory. <br>
- (optional) A list of the tables used in your PL/SQL code, in a 'tables.txt' file - needed only if your code does not name the schema explicitly <br>

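Before running the cells below, first run the cells further down that define the helper functions (`find_table`, `find_beggining`) and the lists (`schema`, `tables`, `sql_start`).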

In [1]:
from IPython.display import Image, display
import pydot
import os

from tqdm.notebook import tqdm
import time

ModuleNotFoundError: No module named 'pydot'

In [None]:
# Open the PL/SQL source. The handle is deliberately left open: the parsing
# cell keeps calling f.readline() on it and a later cell calls f.close().
f = open('plsql.txt', encoding="utf8") # open your PL/SQL code
line = f.readline() # read the first line; the parsing loop continues from here

In [None]:
# Directed graph that collects one node per table and one edge per
# source-table -> target-table dependency found by the parsing loop.
G = pydot.Dot(graph_type="digraph")
# rankdir='LR' is the Graphviz attribute for a left-to-right layout.
G.set_graph_defaults(compound='true',
                                              #splines='ortho',
                                              rankdir='LR')
# see https://graphviz.org/doc/info/attrs.html#d:shape

In [None]:
def _fill_colour(table_name):
    """Pick the node fill colour from the schema name contained in table_name."""
    if 'OWNER_DWH' in table_name:
        return "green"
    if 'OWNER_INT' in table_name:
        return "azure"
    if 'DM_REPORTING' in table_name:
        return "darkturquoise"
    return "yellow"


def _add_table_node(table_name):
    """Add a filled node for table_name, coloured by schema, to the graph G."""
    G.add_node(pydot.Node(table_name, style="filled",
                          fillcolor=_fill_colour(table_name)))


def _record_lineage(text, target):
    """Add the tables found in text to G and return the current target table.

    When text contains the beginning of a command (find_beggining(text) == 1)
    the first table found becomes the new target and gets a node.  Otherwise
    every table found is added as a source node with an edge source -> target,
    provided a target has already been seen.
    """
    found = find_table(text)
    if not isinstance(found, list):
        return target

    if find_beggining(text) == 1:
        if found:
            target = found[0]
            _add_table_node(target)
    elif target:
        for source in found:
            _add_table_node(source)
            G.add_edge(pydot.Edge(source, target))
    return target


def _strip_dash_comment(text):
    """Return text without a trailing `--` line comment."""
    dash_pos = text.find('--')
    return text[:dash_pos] if dash_pos >= 0 else text


def _next_line():
    """Read the next line from f, right-stripped and upper-cased."""
    return f.readline().rstrip().upper()


# Table currently being written to; empty until the first command is seen.
child_tab = ''

while line:
    # Strip trailing spaces/newline and normalise case: the schema, table and
    # keyword lists are all upper case.
    line = line.rstrip().upper()

    # 0. Get rid of EXECUTE IMMEDIATE (truncate or CTAS - not supported):
    #    drop that line and carry on with the following one.
    if 'EXECUTE IMMEDIATE' in line:
        line = _next_line()

    # 1.a Get rid of `--` comments.  A `--` that sits inside a `/* ... */`
    #     comment must not cut the line, otherwise the closing `*/` is lost
    #     and the code that follows would be swallowed as comment text.
    dash_pos = line.find('--')
    open_pos = line.find('/*')
    if dash_pos >= 0 and (open_pos < 0 or dash_pos < open_pos):
        line = line[:dash_pos]
        open_pos = -1

    # 1.b Get rid of `/* ... */` comments, which may span several lines.
    if open_pos >= 0:
        # The code in front of the comment is processed first.
        child_tab = _record_lineage(line[:open_pos], child_tab)

        comment = line
        close_pos = comment.find('*/', open_pos + 2)
        while close_pos < 0:
            comment = f.readline()
            if not comment:
                # End of file inside an unterminated comment: stop searching
                # instead of looping forever on empty reads.
                break
            close_pos = comment.find('*/')

        if close_pos >= 0:
            # Continue with whatever code follows the closing `*/`.
            line = _strip_dash_comment(comment[close_pos + 2:].rstrip().upper())
        else:
            line = ''

    # 2. Handle the beginning of the procedure or package: the header line is
    #    skipped and the next line is processed instead.
    if 'CREATE OR REPLACE' in line:
        line = _next_line()

    # 3. Process the PL/SQL code.
    child_tab = _record_lineage(line, child_tab)

    # Go to the next line; an empty string (end of file) ends the loop.
    line = f.readline()

In [None]:
# Release the handle on plsql.txt that was opened before the parsing loop.
f.close()

In [None]:
# Render the lineage graph to PNG bytes with pydot and show it inline.
im = Image(G.create_png())
display(im)

List of schemas in which to look for tables

In [None]:
# Schema prefixes that find_table() searches for in each line of code.
# NOTE(review): some entries end with '.' and others do not, so the latter
# match as bare substrings - confirm this is intended.
schema = ['AP_SALES.', 'AP_RISK.', 'OWNER_DWH.', 'OWNER_INT.', 'DM_REPORTING.', 'OWNER_STG.', 'STG_OSA', 'DW_WRK', 'AP_MARK', 'AP_COLL'] 

Import the list of tables to look for from the DB catalog (optional), for example with a query like this:

select *
  from dba_dependencies t
 where owner = 'AP_RISK'
 AND NAME = 'FS_DENNI_REPORT_NEW'
 AND REFERENCED_TYPE = 'TABLE'
 ;
 

In [None]:
# Load the optional list of table names (one per line) that find_table()
# falls back to when a line contains none of the schema prefixes.
tables = []
try:
    with open('tables.txt') as tables_file:
        for entry in tables_file:
            name = entry.rstrip().upper()
            # Skip blank lines: an empty name is a substring of every line
            # and would be reported as a table.
            if name:
                tables.append(name)
except FileNotFoundError:
    # tables.txt is optional; without it only schema-qualified names are
    # detected, so `tables` simply stays empty.
    pass

Find table based on defined schema or list of tables

In [None]:
def find_table(text):
    """Return the list of table names found in one line of code.

    Schema-qualified names are looked up first: for every prefix in the
    module-level list `schema`, each occurrence in `text` is taken up to the
    first space or parenthesis, trailing ',' / ';' are dropped, and names
    containing 'NEXTVAL' (sequences) are ignored.  Only when none is found
    are the names in the module-level list `tables` searched; such a name
    counts only when it is followed by a space or by the end of the line.

    `text` is compared as-is, so it must use the same case as the entries of
    `schema` and `tables`.  The result is always a list, possibly empty, with
    no repeated names.
    """
    list_tab = []

    for prefix in schema:
        if not prefix:
            # An empty prefix would match at every position.
            continue
        # Start at the first hit (column 0 included) and walk through every
        # further hit so that two tables of one schema are both reported.
        start = text.find(prefix)
        while start >= 0:
            candidate = text[start:]
            # Cut at the first space, then ')' and then '(' still present.
            for stop in (' ', ')', '('):
                cut = candidate.find(stop)
                if cut > 0:
                    candidate = candidate[:cut]
            # "FROM A.X, A.Y;" must not yield "A.X," or "A.Y;".
            candidate = candidate.rstrip(',;')
            # Eliminate sequences.
            if 'NEXTVAL' not in candidate and candidate not in list_tab:
                list_tab.append(candidate)
            start = text.find(prefix, start + len(prefix))

    if list_tab:
        return list_tab

    # Look for the provided tables.
    for name in tables:
        if not name:
            # An empty name is a substring of every line.
            continue
        pos = text.find(name)
        while pos >= 0:
            end = pos + len(name)
            # A blank space or the end of the line is required after the name.
            if text[end:end + 1] in (' ', ''):
                list_tab.append(name)
                break
            pos = text.find(name, end)

    return list_tab

Detect the beginning of a command

In [None]:
# Keywords treated as the beginning of a command: a line containing any of
# them makes find_beggining() return 1.
# NOTE(review): 'INTO' is a substring of 'INSERT INTO', so it matches every
# line the first entry matches, plus any other use of INTO - confirm intended.
sql_start = ['INSERT INTO', 'CREATE TABLE', 'MERGE ', 'UPDATE ', 'INTO']

In [None]:
def find_beggining(text):
    """Return 1 if text contains any keyword listed in sql_start, else 0."""
    contains_keyword = any(keyword in text for keyword in sql_start)
    return 1 if contains_keyword else 0