In [3]:
import os
import glob
import pandas as pd
import numpy as np
import pyodbc 
 
import datetime, time 

import multiprocessing
import time

In [4]:
import itertools
import sqlparse

In [5]:
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, DML

In [6]:
def is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() == 'SELECT':
            return True
    return False

In [7]:
def extract_from_part(parsed):
    from_seen = False
    for item in parsed.tokens:
        if item.is_group:
            for x in extract_from_part(item):
                yield x
        if from_seen:
            if is_subselect(item):
                for x in extract_from_part(item):
                    yield x
            elif item.ttype is Keyword and item.value.upper() in ['ORDER', 'GROUP', 'BY', 'HAVING', 'GROUP BY']:
                from_seen = False
                StopIteration
            else:
                yield item
        if item.ttype is Keyword and item.value.upper() == 'FROM':
            from_seen = True

In [8]:
def extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                value = identifier.value.replace('"', '').lower()                
                yield value
        elif isinstance(item, Identifier):
            value = item.value.replace('"', '').lower()
            yield value

In [9]:
def extract_tables(sql):
    # let's handle multiple statements in one sql string
    extracted_tables = []
    statements = list(sqlparse.parse(sql))
    for statement in statements:
        if statement.get_type() != 'UNKNOWN':
            stream = extract_from_part(statement)
            extracted_tables.append(set(list(extract_table_identifiers(stream))))
    return list(itertools.chain(*extracted_tables))

In [10]:
def query_dep_dict(dire,folder):
    #print(dire+'\\'+folder+"\\*.sql")
    sqlFiles=glob.glob(dire+'\\'+folder+"\\*.sql")
    #print(sqlFiles)
    tmpdict={}
    for file in sqlFiles:
        #print(file)
        FileName=os.path.basename(file)
        TableName=FileName.replace('.sql','')
        #print(FileName)
        #Open and read the file as a single buffer
        fd = open(file, 'r')
        sql = fd.read()
        fd.close()
        sql=sql.replace('`','')
        #print(sql)
        tables=extract_tables(sql)
        #print(tables)
        tab = [x.split(" ",1)[0] for x in tables  if '.' in x]
        tab.sort()
        #print(tab)
        #tmpdict[folder+'.'+FileName+' Query Depends On']=tab
        #tmpdict[folder+'.'+TableName+' Table Depends On']=tab
        tmpdict[folder+'.'+TableName]=tab
    #print(tmpdict)
    return tmpdict

In [11]:
def formatData(t,s):
    if not isinstance(t,dict) and not isinstance(t,list):
        print("\t"*s+str(t))
    else:
        for key in t:
            print("\t"*s+str(key))
            if not isinstance(t,list):
                formatData(t[key],s+1)

In [12]:
def dependency_table():
    newdepdict={}
    folders=['tmp','final']
    for f in folders:
        depdict=query_dep_dict("D:\SephoraTest\\",f)
        newdepdict.update(depdict)
    #print(newdepdict)
    
    tmpdepdict={}
    for key, value in newdepdict.items():
        for v in value:
            if 'tmp.' in v:
                tmpdepdict[key]=value
    
    rawdepdict = { k : newdepdict[k] for k in set(newdepdict) - set(tmpdepdict) }
    
    tmpdepdf=pd.DataFrame.from_dict(tmpdepdict,orient='index')
    
    rawdepdf=pd.DataFrame.from_dict(rawdepdict,orient='index')
    rawdepdf.insert(0, 'SeqNum', range(1, len(rawdepdf)+1))
    maxSeqNum=rawdepdf['SeqNum'].max() # Max Sequence Number
    
    tmpdepdf.insert(0, 'SeqNum', range(maxSeqNum+1, maxSeqNum+len(tmpdepdf)+1))
    
    df=rawdepdf.append(tmpdepdf).fillna(np.nan)
    
    return df,rawdepdf,tmpdepdf

In [13]:
dependencyChart,rawfolderdeptab,tmpfolderdeptab=dependency_table()
dependencyChart

Unnamed: 0,SeqNum,0,1,2,3
tmp.product_images,1,raw.pictures,raw.products,,
tmp.product_categories,2,raw.categories,raw.products,,
tmp.item_purchase_prices,3,raw.purchase_items,raw.purchase_line_items,,
tmp.variant_images,4,raw.pictures,raw.variants,,
tmp.inventory_items,5,raw.inventory_items,,,
tmp.products,6,raw.products,tmp.product_categories,tmp.product_images,
tmp.variants,7,raw.variants,tmp.inventory_items,tmp.item_purchase_prices,tmp.variant_images
final.products,8,tmp.products,tmp.variants,,


In [14]:
# if __name__ == '__main__':
#     newdepdict={}
#     folders=['tmp','final']
#     for f in folders:
#         depdict=query_dep_dict("D:\SephoraTest\\",f)
#         newdepdict.update(depdict)
#     print(newdepdict)
#         #formatData(depdict,0)

In [15]:
# tmpdepdict={}
# for key, value in newdepdict.items():
#     for v in value:
#         if 'tmp.' in v:
#             tmpdepdict[key]=value

In [16]:
# rawdepdict = { k : newdepdict[k] for k in set(newdepdict) - set(tmpdepdict) }

In [17]:
# tmpdepdf=pd.DataFrame.from_dict(tmpdepdict,orient='index')

In [18]:
# rawdepdf=pd.DataFrame.from_dict(rawdepdict,orient='index')
# rawdepdf.insert(0, 'SeqNum', range(1, len(rawdepdf)+1))
# maxSeqNum=rawdepdf['SeqNum'].max() # Max Sequence Number

In [19]:
# tmpdepdf.insert(0, 'SeqNum', range(maxSeqNum+1, maxSeqNum+len(tmpdepdf)+1))

In [20]:
# df=rawdepdf.append(tmpdepdf).fillna(np.nan)

In [21]:
rawfolderdeptab

Unnamed: 0,SeqNum,0,1
tmp.product_images,1,raw.pictures,raw.products
tmp.product_categories,2,raw.categories,raw.products
tmp.item_purchase_prices,3,raw.purchase_items,raw.purchase_line_items
tmp.variant_images,4,raw.pictures,raw.variants
tmp.inventory_items,5,raw.inventory_items,


In [22]:
rawfolderdeptab.index

Index(['tmp.product_images', 'tmp.product_categories',
       'tmp.item_purchase_prices', 'tmp.variant_images',
       'tmp.inventory_items'],
      dtype='object')

In [23]:
def run_sql_in_sequence(df):
    Basedir="D:\SephoraTest\\"
    for Ind in df.index:
        Ind=Ind.replace('.','\\')
        sqlFileName=Basedir+Ind+'.sql'
        #print(Ind)
        print ("Connecting via ODBC")
        conn= pyodbc.connect('DSN=dsn', autocommit=True)
        print ("Connected!\n")
        start = time.time()
        with open(sqlFileName,'r') as inserts:
            sqlScript = inserts.read()
            #for statement in sqlScript.split(';'):
            print(sqlScript)
            with conn.cursor() as cur:
                cur.execute(sqlScript)
        conn.close()
        end = time.time() 
        print('Time taken for executing the script '+sqlFileName+':'+str((end - start)*1000) + ' ms')

In [24]:
run_sql_in_sequence(dependencyChart)

Connecting via ODBC
Connected!

SELECT
    pr.id As productId,
    ARRAY_AGG(
        p.url
    ) AS urls
FROM `raw.products` pr
LEFT JOIN `raw.pictures` p ON (p.id, 'product') = (p.external_id, p.type)
GROUP BY 1
Time taken for executing the script D:\SephoraTest\tmp\product_images.sql:2.000093460083008 ms
Connecting via ODBC
Connected!

SELECT
    p.id As productId,
    ARRAY_AGG(
        STRUCT(
            c.category_name AS categoryName
        )
    ) AS categories
FROM `raw.products` p
LEFT JOIN `raw.categories` c ON  p.category_id = c.id
GROUP BY 1
Time taken for executing the script D:\SephoraTest\tmp\product_categories.sql:0.0 ms
Connecting via ODBC
Connected!

SELECT
    l.sku_id AS sku_id,
    i.batch_id,
    ARRAY_AGG(
        STRUCT(
            i.purchase_price,
            i.currency
        )
    ) AS purchase_prices
FROM
    `raw.purchase_line_items` l
    INNER JOIN `raw.purchase_items` i ON l.batch_purchase_id = i.batch_id
GROUP BY 1, 2
Time taken for executing the 

In [29]:
def execute_sql_stmt(sqlScript):
    print ("Connecting via ODBC")
    conn= pyodbc.connect('DSN=dsn', autocommit=True)
    print ("Connected!\n")
    with conn.cursor() as cur:
        cur.execute(sqlScript)

def run_sql_in_parallel():
    Basedir="D:\SephoraTest\\"
    sqlsList=[]
    for Ind in rawfolderdeptab.index:
        Ind=Ind.replace('.','\\')
        sqlFileName=Basedir+Ind+'.sql'
        with open(sqlFileName,'r') as inserts:
            sqlScript = inserts.read()
            sqlsList.append(sqlScript)
            #print(sqlScript)
    # multiprocessing pool object
    pool = multiprocessing.Pool()
  
    # pool object with number of element
    pool = multiprocessing.Pool(processes=5)
    
    # map the function to the list and pass
    # function and input list as arguments
    sqlparallelrun = pool.map(execute_sql_stmt, sqlsList)

In [None]:
run_sql_in_parallel()