In [None]:
from datetime import datetime, timedelta
import time

import os
import shutil
import subprocess

import re

import sqlparse

import json
import pandas as pd

from google.cloud import storage
from google.cloud import bigquery

import sqlalchemy

import logging

In [None]:
def read_contents_file(file_path, encoding_scheme='cp1252', read_lines=False):
    """Read a text file and return its contents.

    Args:
        file_path: Path of the file to read.
        encoding_scheme: Encoding used to decode the file (defaults to
            ``cp1252``).
        read_lines: When True, return the result of ``readlines()`` (a list
            of lines with their line endings); otherwise return the whole
            file as a single string.

    Returns:
        A list of strings when ``read_lines`` is True, else one string.
    """
    # The context manager closes the handle even if decoding fails part-way;
    # a bare open() would leave it to the garbage collector.
    with open(file_path, 'r', encoding=encoding_scheme) as inputfile:
        if read_lines:
            return inputfile.readlines()
        return inputfile.read()

In [None]:
def write_file_local(path,file_data,is_list=False):
    """Write text to a local file, replacing any existing content.

    Args:
        path: Destination file path.
        file_data: Either a string, or (when ``is_list`` is True) an iterable
            of strings that is joined with newline characters.
        is_list: Set to True when ``file_data`` is a sequence of lines.
    """
    with open(path, 'w') as out_handle:
        # The payload is built after the file is opened, so the order of side
        # effects is unchanged.
        out_handle.write('\n'.join(file_data) if is_list else file_data)

In [None]:
def get_unique_index(database, table, user_name, password, host_name):
    """Fetch the unique-index column names of a Teradata table.

    A trailing ``_mv`` and then a trailing ``_old`` are stripped from
    ``table`` before the lookup, and ``dbc.indicesV`` is queried for rows
    with ``uniqueflag = 'Y'``, ordered by ``columnposition``.

    Args:
        database: Value matched against ``databasename``.
        table: Value matched against ``tablename`` (after suffix stripping).
        user_name: Teradata user.
        password: Teradata password.
        host_name: Teradata host.

    Returns:
        The DataFrame produced by ``pd.read_sql``. If the connection or the
        query fails, an empty DataFrame with a single ``ColumnName`` column
        is returned so callers can rely on ``.empty``.
    """
    if table.endswith('_mv'):
        table = table[:-3]
        print(table)
    if table.endswith('_old'):
        table = table[:-4]
        print(table)

    # NOTE(review): database/table are interpolated straight into the SQL
    # text. They come from parsed SQL scripts; switch to bound parameters if
    # those scripts can ever be untrusted.
    query = f"SELECT columnname FROM dbc.indicesV where databasename = '{database}' and tablename = '{table}'   and uniqueflag = 'Y' order by columnposition"
    try:
        # Engine creation is inside the try so a bad host or driver is handled
        # the same way as a failing query.
        td_engine = sqlalchemy.create_engine('teradatasql://' + host_name + '/?user=' + user_name + '&password=' + password + '&logmech=KRB5')
        return pd.read_sql(query, td_engine)
    except Exception as e1:
        # Previously the error was swallowed and the function then failed
        # with UnboundLocalError on the return statement.
        print("Exception occured when getting unique index")
        logging.error(f"Unique index lookup failed for {database}.{table}: {e1}")
        return pd.DataFrame(columns=['ColumnName'])

In [None]:
def is_comment(line):
    """Return True if ``line``, ignoring surrounding whitespace, starts with ``--`` or ``/*``."""
    stripped = line.strip()
    return stripped.startswith(("--", "/*"))

In [None]:
def _extract_target_table(stmnt):
    """Return ``(schema, table)`` in upper case for the INTO target of a statement, or None.

    None is returned when the statement has no ``INTO <name>`` clause or when
    the name is not a two- or three-part dotted name.
    """
    match = re.search(r"\bINTO\s+([^\s(;]+)", stmnt.upper())
    if match is None:
        return None
    # Backticks are quoting only; drop them so the parts are bare identifiers.
    name_parts = [part.strip('`') for part in match.group(1).split(".")]
    if len(name_parts) not in (2, 3) or not all(name_parts):
        return None
    return name_parts[-2], name_parts[-1]


def gen_duplicate_checks(convertedsql_path, postdupcheckssql_path, user_name, password, host_name):
    """Add a duplicate-key check after INSERT/MERGE statements of converted SQL files.

    Every ``.sql`` file in ``convertedsql_path`` is split on ``;``. Statements
    containing ``TABLE=``, ``JOB=``, ``FOR SESSION`` or the word ``EOF``, or
    starting with ``#`` or ``locking``, are prefixed with ``--``. For each
    INSERT/MERGE statement outside a comment, the unique-index columns of the
    target table are looked up with ``get_unique_index``. When columns are
    found, the statement is wrapped in ``BEGIN TRANSACTION`` plus a
    ``DUP_COUNT`` query that rolls back and raises if duplicates exist. The
    result is formatted with ``sqlparse`` and written to
    ``postdupcheckssql_path`` under the lower-cased file name.

    Reads the module-level ``project_id`` to build the BigQuery table name.

    Args:
        convertedsql_path: Folder holding the converted ``.sql`` files.
        postdupcheckssql_path: Output folder; removed and recreated on each run.
        user_name: Teradata user passed to ``get_unique_index``.
        password: Teradata password passed to ``get_unique_index``.
        host_name: Teradata host passed to ``get_unique_index``.
    """
    print("Duplicate Checks Generation has begun")

    if os.path.exists(postdupcheckssql_path):
        shutil.rmtree(postdupcheckssql_path, ignore_errors=True)
    os.makedirs(postdupcheckssql_path)
    print("Created folder : {}".format(postdupcheckssql_path))

    regexp_EOF = re.compile(r"\bEOF\b",re.IGNORECASE | re.DOTALL)

    file_list = [filename for filename in os.listdir(convertedsql_path) if filename.endswith(".sql")]
    for filename in file_list:
        # Only the output file name is lower-cased. Lower-casing the whole
        # path breaks on case-sensitive filesystems when a folder name
        # contains upper-case characters.
        source_file_path = os.path.join(convertedsql_path, filename)
        converted_sql = read_contents_file(source_file_path)

        modified_stmnt = "DECLARE DUP_COUNT INT64;\n"
        in_comment_block = False

        for stmnt in converted_sql.split(';'):
            if stmnt.strip():
                stmnt = stmnt.strip() + ';\n'

            stmnt_upper = stmnt.upper()
            if 'TABLE=' in stmnt_upper \
                or 'JOB=' in stmnt_upper \
                or stmnt.strip().startswith('#') \
                or stmnt.strip().startswith('locking') \
                or regexp_EOF.search(stmnt) is not None \
                or 'FOR SESSION' in stmnt_upper:
                    stmnt = "--" + stmnt.strip() + ';\n'

            # A statement that opens with "/*" starts a comment block; the
            # block ends once "*/" is found anywhere after position 0.
            if stmnt.startswith("/*"):
                in_comment_block = True
            if stmnt.find("*/") > 0:
                in_comment_block = False

            if in_comment_block or is_comment(stmnt):
                modified_stmnt += '\n' + stmnt + '\n'
                continue

            target = None
            stmnt_upper = stmnt.upper()
            if "INSERT " in stmnt_upper or "MERGE " in stmnt_upper:
                target = _extract_target_table(stmnt)
                if target is None:
                    # Previously this case raised (missing INTO, or a
                    # one-part name on first use) or silently reused the
                    # schema/table of an earlier statement.
                    logging.warning(f"Duplicate check skipped in {filename}: target table not recognised")

            if target is None:
                modified_stmnt += stmnt
                continue

            tdschema, tdtable = target
            uniquecolumn_df = get_unique_index(tdschema, tdtable, user_name, password, host_name)
            if uniquecolumn_df.empty:
                modified_stmnt += stmnt
                continue

            key_columns = ','.join(str(column).lower() for column in uniquecolumn_df['ColumnName'])
            fullyqualifiedbqtablename = ('`' + project_id + '`.' + tdschema + '.' + tdtable).lower()
            dup_query = (
                "SET DUP_COUNT = (\nselect count(*)\nfrom (\nselect\n" + key_columns
                + "\nfrom " + fullyqualifiedbqtablename + " group by " + key_columns
                + "\nhaving count(*) > 1\n)\n);\n"
                + "IF DUP_COUNT <> 0 THEN ROLLBACK TRANSACTION; "
                + "RAISE USING MESSAGE = concat('Duplicates are not allowed in the table "
                + fullyqualifiedbqtablename + "'); ELSE COMMIT TRANSACTION; END IF;\n"
            )
            modified_stmnt += "BEGIN TRANSACTION;\n" + stmnt + dup_query

        formattedsql = modified_stmnt.strip()
        formattedsql = sqlparse.format(formattedsql, reindent=True, keyword_case='upper')

        copy_file_path = os.path.join(postdupcheckssql_path, filename.lower())
        write_file_local(copy_file_path,formattedsql)

    print("Duplicate Checks Generation is completed")

In [None]:
def postprocess_bqsqls(postdupcheckssql_path, postprocess_sql_path):
    """Apply the post-processing find/replace rules to every ``.sql`` file.

    The literal rules in ``post_process_find_replace_list`` and the
    case-insensitive regex rules in ``post_process_regex_find_replace_list``
    (both module-level) are applied once before and once after
    ``sqlparse.format``. Leading/trailing spaces and semicolons are then
    stripped and the result is written to ``postprocess_sql_path`` under the
    lower-cased file name.

    Args:
        postdupcheckssql_path: Folder holding the input ``.sql`` files.
        postprocess_sql_path: Output folder; removed and recreated on each run.
    """
    print("Post Processing has begun")

    if os.path.exists(postprocess_sql_path):
        shutil.rmtree(postprocess_sql_path, ignore_errors=True)
    os.makedirs(postprocess_sql_path)
    print("Created folder : {}".format(postprocess_sql_path))

    # Compile the regex rules once instead of twice per file.
    compiled_rules = [
        (re.compile(rule["search"], re.IGNORECASE), rule["replace"])
        for rule in post_process_regex_find_replace_list
    ]

    def _apply_rules(sql_text):
        # Literal replacements first, then regex replacements, in list order.
        for rule in post_process_find_replace_list:
            sql_text = sql_text.replace(rule["search"], rule["replace"])
        for pattern, replacement in compiled_rules:
            sql_text = pattern.sub(replacement, sql_text)
        return sql_text

    file_list = [filename for filename in os.listdir(postdupcheckssql_path) if filename.endswith(".sql")]
    for filename in file_list:
        # Only the output file name is lower-cased. Lower-casing the whole
        # path breaks on case-sensitive filesystems when a folder name
        # contains upper-case characters.
        source_file_path = os.path.join(postdupcheckssql_path, filename)
        converted_sql = _apply_rules(read_contents_file(source_file_path))

        formattedsql = sqlparse.format(converted_sql.strip(), reindent=True, keyword_case='upper')

        # Second pass: runs the same rules over the sqlparse-formatted text.
        formattedsql = _apply_rules(formattedsql).strip(' ;')

        copy_file_path = os.path.join(postprocess_sql_path, filename.lower())
        write_file_local(copy_file_path,formattedsql)

    print("Post Processing is completed")

In [None]:
def execute_sql(postprocess_sql_path):
    """Run every ``.sql`` file of a folder against BigQuery and log the outcome.

    Each file has the ``execution_time_find_replace_list`` literal
    replacements applied and is then submitted as one query in location
    ``US`` under the module-level ``project_id``. Success and failure are
    both logged and printed; a failing file does not stop the loop.

    Args:
        postprocess_sql_path: Folder holding the ``.sql`` files to execute.
    """
    print(f"executing SQL files in  {postprocess_sql_path}" )
    client = bigquery.Client(project=project_id)

    for file_name in os.listdir(postprocess_sql_path):
        if not file_name.endswith('.sql'):
            continue

        sql_query = read_contents_file(os.path.join(postprocess_sql_path, file_name))
        for rule in execution_time_find_replace_list:
            sql_query = sql_query.replace(rule["search"], rule['replace'])

        try:
            query_job = client.query(sql_query, project=project_id, location='US')
            query_job.result()
            success_message = f"SQL file {file_name} executed successfully."
            logging.info(success_message)
            print(success_message)
        except Exception as e:
            failure_message = f"Error executing SQL file {file_name}: {e}"
            logging.error(failure_message)
            print(failure_message)

In [None]:
def print_execution_results(logfilename):
    """Summarise the per-file execution results recorded in a log file.

    Lines containing ``"SQL file "`` followed by a file name are counted.
    Those with ``"executed successfully"`` are successes. Everything else is
    a failure and is printed; failures are additionally classified as missing
    objects (``"Not found"``) or duplicate-data issues (``"Duplicates"`` or
    ``"at most one source row"``). The summary is printed and also written
    to the log via ``logging.info``.

    Args:
        logfilename: Path of the log file to scan.
    """
    total_lines = 0
    success_lines = 0
    failure_lines = 0
    missing_dbobjects = 0
    dup_data_issues = 0

    try:
        with open(logfilename, "r") as file:
            print("=========Errors================")
            for line in file:
                # Match "SQL file " with the trailing space. The summary lines
                # logged below read "SQL files ..." and would otherwise be
                # counted as executions when the same log file is scanned
                # again on a later run.
                if "SQL file " not in line:
                    continue
                total_lines += 1
                if "executed successfully" in line:
                    success_lines += 1
                    continue

                failure_lines += 1
                print(line.strip())
                if "Not found" in line:
                    missing_dbobjects += 1
                elif "Duplicates" in line or "at most one source row" in line:
                    dup_data_issues += 1

        summary = [
            "=========execution_results================",
            f"Total SQL files executed: {total_lines}",
            f"SQL files executed successfully: {success_lines}",
            f"SQL files failed with errors: {failure_lines}",
            f" --SQL files with Missing db objects: {missing_dbobjects}",
            f" --SQL files with Dup Data Issues: {dup_data_issues}",
        ]
        for entry in summary:
            print(entry)
        for entry in summary:
            logging.info(entry)

    except FileNotFoundError:
        print(f"Error: File '{logfilename}' not found.")
    except Exception as e:
        print("An error occurred:", str(e))

In [None]:
def productionize_sqls(postprocessqls, prodready_bqsqls):
    """Template the post-processed SQL files and sort them into per-source folders.

    For every ``.sql`` file in ``postprocessqls`` the regex rules in
    ``production_regex_find_replace_list`` (case-insensitive, DOTALL) and then
    the literal rules in ``production_parms_find_replace_list`` are applied.
    The result is formatted with ``sqlparse`` and written to
    ``<prodready_bqsqls>/<source>/<lower-cased file name>``. ``<source>`` is
    looked up in ``config/script_source_map.csv`` by matching the lower-cased
    file stem against ``script_name``; it is empty when there is no match.

    Args:
        postprocessqls: Folder holding the post-processed ``.sql`` files.
        prodready_bqsqls: Output root folder; removed and recreated on each run.
    """
    if os.path.exists(prodready_bqsqls):
        shutil.rmtree(prodready_bqsqls, ignore_errors=True)
    os.makedirs(prodready_bqsqls)
    print("Created folder : {}".format(prodready_bqsqls))

    # os.path.join replaces the "config\script_source_map.csv" literal, whose
    # "\s" is an invalid escape sequence.
    df_job_source = pd.read_csv(os.path.join("config", "script_source_map.csv"), index_col=None)

    # Compile the regex rules once instead of once per file.
    compiled_rules = [
        (re.compile(rule["search"], re.IGNORECASE | re.DOTALL), rule["replace"])
        for rule in production_regex_find_replace_list
    ]

    for file in os.listdir(postprocessqls):
        if not file.endswith('.sql'):
            continue

        source_file_path = os.path.join(postprocessqls, file)
        post_process_sql = read_contents_file(source_file_path)

        filename = file.replace(".sql","").lower()

        df_job_source_match = df_job_source[df_job_source["script_name"]==filename]
        if df_job_source_match.empty:
            source_per_job_name = ""
        else:
            source_value = df_job_source_match["source"].iloc[0]
            # An empty CSV cell is read as NaN; treat it like "no source"
            # rather than failing in os.path.join.
            source_per_job_name = "" if pd.isna(source_value) else str(source_value)

        for pattern, replacement in compiled_rules:
            post_process_sql = pattern.sub(replacement, post_process_sql)

        for rule in production_parms_find_replace_list:
            post_process_sql = post_process_sql.replace(rule["search"], rule["replace"])

        formattedsql = post_process_sql.strip()
        formattedtext = sqlparse.format(formattedsql, reindent=True, keyword_case='upper', strip_comments=False)

        # Create and write to the same directory. Only the file name is
        # lower-cased, so the write path matches the created folder on
        # case-sensitive filesystems too.
        target_folder = os.path.join(prodready_bqsqls, source_per_job_name)
        os.makedirs(target_folder, exist_ok=True)
        copy_file_path = os.path.join(target_folder, file.lower())
        write_file_local(copy_file_path, formattedtext)


In [None]:
def upload_to_dags(prodready_bqsqls):
    """Copy the ``.sql`` files under a folder to the DAG bucket with ``gsutil``.

    Builds a ``gsutil -m cp -r <folder>\\**\\*.sql gs://<dag_bucket_path>``
    command (``dag_bucket_path`` is module-level) and runs it through the
    shell. A failing command is reported by printing; no exception is raised.

    Args:
        prodready_bqsqls: Root folder of the production-ready SQL files.
    """
    # The backslashes are written as "\\" so the command text is unchanged but
    # no longer relies on the invalid escape sequence "\*".
    # NOTE(review): the command is a shell string built from config values;
    # keep those values trusted.
    command = 'gsutil -m cp -r ' + prodready_bqsqls + '\\**\\*.sql gs://' + dag_bucket_path

    try:
        completed_process = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        print(completed_process.stdout)
        print("Uploaded Dags to {}".format(dag_bucket_path))
    except subprocess.CalledProcessError as e:
        print("Error:", e)
        print(e.output)
        # stderr is captured separately by the PIPE above and was previously
        # never shown.
        print(e.stderr)

In [None]:
# Start-of-run timestamp; parameterize_sql_scripts() prints the elapsed time against it.
dt1 = datetime.now()
# run_time = (dt1).strftime('%Y%m%d_%H%M')
# NOTE(review): run_time is pinned, so every run reuses the same log file and
# output folders. Re-enable the line above for a per-run value.
run_time = "20240909_0643"

with open('config/lob_config.json') as json_lob_config:
    config = json.load(json_lob_config)

lob = config['lob']
lob_abbr = config['lob_abbr'] # lobname in BQMS Script

lob_lower = lob.strip().lower()
lob_upper = lob.strip().upper()
lob_abbr_lower = lob_abbr.strip().lower()
lob_abbr_upper = lob_abbr.strip().upper()

# Paths are built with os.path.join. The earlier f"{a}\{b}" form relied on the
# invalid escape sequence "\{".
log_folder = config['log_folder']
log_path_folder = os.path.join(log_folder, lob_abbr_lower)
os.makedirs(log_path_folder, exist_ok=True)

logfilename = lob_abbr_lower + '_param_execution_' + run_time + '.log'
logfilenamepath = os.path.join(log_path_folder, logfilename)
logging.getLogger().setLevel(logging.INFO)
logging.basicConfig(filename=logfilenamepath, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Input folder of converted SQL, and the per-run output folder of each stage.
conversion_target_folder = config['conversion_target_folder']
conversion_target_path_folder = os.path.join(conversion_target_folder, lob_abbr_lower)

post_dupchecks_bqsqls  = config['post_dupchecks_bqsqls']
post_dupchecks_bqsqls_path = os.path.join(post_dupchecks_bqsqls, lob_abbr_lower, run_time)

postprocessed_bqsqlsfiles = config['postprocessqls']
postprocessed_bqsqlsfiles_path = os.path.join(postprocessed_bqsqlsfiles, lob_abbr_lower, run_time)

prodready_bqsqls = config['prodready_bqsqls']
prodready_bqsqls_path = os.path.join(prodready_bqsqls, lob_abbr_lower, run_time)

project_id = config['project_id']
fns_project_id = project_id
fns_dataset = config['fns_dataset']
dag_bucket_path = config['dag_bucket_path']

# Teradata connection settings.
# NOTE(review): the password is read in plain text from the JSON config;
# consider an environment variable or secret store.
user_name = config['user_name']
password = config['password']
host_name = config['host_name']

In [None]:
    # # CR Replacements

    # post_process_find_replace_list = [
    #     {'search':'!=','replace':'<>'},
    #     {'search':"','AS",'replace':"',' AS"},
    #     {'search':'syslib.length','replace':'length'},
    #     {'search':'syslib.isnumeric','replace':'`hca-hin-dev-cur-ops`.bqutil_fns.isnumeric'},
    # ]

    # post_process_regex_find_replace_list = [
    #     {'search':r"bqutil\.fn\.cw_td_strtok\([ ]*([.|a-z|_|0-9]+)[ ]*,[ ]*'(.)'[ ]*,[ ]*([0-9]+)\)",
    #     'replace':r"SPLIT(\1, '\2')[ORDINAL(\3)]"},
    #     {'search':r"td_sysfnlib\.to_number\((.*?)\)", 'replace':r"safe_cast(\1 as numeric) "},
    #     {'search':r"bqutil\.fn\.", 'replace':r"`hca-hin-dev-cur-ops`.bqutil_fns."}
    # ]

    # execution_time_find_replace_list = [
    # #    {'search':'`hca-hin-dev-cur-parallon`.edw_pub_views.','replace':'`hca-hin-dev-cur-parallon`.edw_pub_views.'},
    # ]

    # production_parms_find_replace_list = [
    #     {'search':'`hca-hin-dev-cur-ops`.edwcr_staging.','replace':'{{ params.param_cr_stage_dataset_name }}.'},        
    #     {'search':'`hca-hin-dev-cur-ops`.edwcr.','replace':'{{ params.param_cr_core_dataset_name }}.'},
    #     {'search':'`hca-hin-dev-cur-ops`.edwcr_base_views.','replace':'{{ params.param_cr_base_views_dataset_name }}.'},
    #     {'search':'`hca-hin-dev-cur-ops`.bqutil_fns.','replace':'{{ params.param_cr_bqutil_fns_dataset_name }}.'},
    #     {'search':'`hca-hin-dev-cur-ops`.auth_base_views.','replace':'{{ params.param_cr_auth_base_views_dataset_name }}.'},
    #     {'search':'`hca-hin-dev-cur-ops`.edwcr_views.','replace':'{{ params.param_cr_views_dataset_name }}.'},
    #     {'search':'`hca-hin-dev-cur-ops`.edw_pub_views.clinical_facility','replace':'{{ params.param_cr_auth_base_views_dataset_name }}.clinical_facility'}
    # ]

    # production_regex_find_replace_list = [
    #     {'search':r"DATE\(([a-z|0-9|_|.]+)\)\s*=\s*current_date\('US\/Central'\)\s*", 
    #     'replace':r"\1 >= tableload_start_time - INTERVAL 1 MINUTE "},
    #     {'search':r"([a-z|0-9|_]+.dw_last_update_date_time)\s*[<>=]\s*\(\s*SELECT\s*MAX\(etl_job_run\.job_start_date_time\).*FROM.*etl_job_run.*WHERE.*=.*'\s*\)", 
    #     'replace':r"\1 = current_date('US/Central') "},
    #     {'search':r"\(\s*SELECT\s*MAX\(etl_job_run\.job_start_date_time\).*FROM.*etl_job_run.*WHERE.*=.*'\s*\)", 
    #     'replace':r" tableload_start_time - INTERVAL 1 MINUTE "}
    # ]

In [None]:
# IM Replacements
#
# Rule lists consumed by the pipeline functions. Each rule is a dict with a
# 'search' and a 'replace' entry, and the rules of a list are applied in list
# order, so reordering them can change the result.

# Literal (str.replace) rules applied by postprocess_bqsqls, before and after
# sqlparse formatting.
post_process_find_replace_list = [
    {'search':'!=','replace':'<>'},
    {'search':"','AS",'replace':"',' AS"},
    {'search':'syslib.length','replace':'length'},
    {'search':'syslib.isnumeric','replace':'`hca-hin-dev-cur-pub`.bqutil_fns.isnumeric'},
]

# Regex rules applied by postprocess_bqsqls with re.IGNORECASE, after the
# literal rules above. 'replace' may use back-references (\1, \2, ...).
post_process_regex_find_replace_list = [
    {'search':r"bqutil\.fn\.cw_td_strtok\([ ]*([.|a-z|_|0-9]+)[ ]*,[ ]*'(.)'[ ]*,[ ]*([0-9]+)\)",
    'replace':r"SPLIT(\1, '\2')[ORDINAL(\3)]"},
    {'search':r"td_sysfnlib\.to_number\((.*?)\)", 'replace':r"safe_cast(\1 as numeric) "},
    {'search':r"bqutil\.fn\.", 'replace':r"`hca-hin-dev-cur-pub`.bqutil_fns."},
    {'search':r"edw_pub_views", 'replace':r"`hca-hin-dev-cur-comp`.auth_base_views"},
    {'search':r"edwcdm_base_views", 'replace':r"`hca-hin-dev-cur-comp`.auth_base_views"},
    {'search':r"edwdw_base_views", 'replace':r"`hca-hin-dev-cur-comp`.auth_base_views"}
]

# Literal rules applied by execute_sql to each query just before it is
# submitted; currently empty.
execution_time_find_replace_list = [
#    {'search':'`hca-hin-dev-cur-parallon`.edw_pub_views.','replace':'`hca-hin-dev-cur-parallon`.edw_pub_views.'},
]

# Literal rules applied by productionize_sqls (after its regex rules) that
# swap dataset prefixes for '{{ params.* }}' template placeholders.
production_parms_find_replace_list = [
        {'search':'edwim_staging.','replace':'{{ params.param_im_stage_dataset_name }}.'},        
        {'search':'edwim.','replace':'{{ params.param_im_core_dataset_name }}.'},
        {'search':'edwim_base_views.','replace':'{{ params.param_im_base_views_dataset_name }}.'},
        {'search':'edwim_views.','replace':'{{ params.param_im_views_dataset_name }}.'},
        {'search':'auth_base_views.','replace':'{{ params.param_im_auth_base_views_dataset_name }}.'},
        {'search':'`hca-hin-dev-cur-pub`.bqutil_fns.','replace':'{{ params.param_im_bqutil_fns_dataset_name }}.'}
]

# Regex rules applied by productionize_sqls with re.IGNORECASE | re.DOTALL,
# so '.' also matches newlines and '.*' can span several lines.
production_regex_find_replace_list = [
    {'search':r"DATE\(([a-z|0-9|_|.]+)\)\s*=\s*current_date\('US\/Central'\)\s*", 
    'replace':r"\1 >= tableload_start_time - INTERVAL 1 MINUTE "},
    {'search':r"([a-z|0-9|_]+.dw_last_update_date_time)\s*[<>=]\s*\(\s*SELECT\s*MAX\(etl_job_run\.job_start_date_time\).*FROM.*etl_job_run.*WHERE.*=.*'\s*\)", 
    'replace':r"\1 = current_date('US/Central') "},
    {'search':r"\(\s*SELECT\s*MAX\(etl_job_run\.job_start_date_time\).*FROM.*etl_job_run.*WHERE.*=.*'\s*\)", 
    'replace':r" tableload_start_time - INTERVAL 1 MINUTE "}
]

In [None]:
def parameterize_sql_scripts():
    """Run the pipeline stages in order and print the elapsed time.

    Stages: generate duplicate checks, post-process the SQL, execute it,
    print the execution summary from the log file, then produce the
    templated production SQL. All paths and credentials come from
    module-level configuration values; elapsed time is measured from ``dt1``.
    """
    # Stage 1: add duplicate checks to the converted SQL.
    gen_duplicate_checks(
        conversion_target_path_folder,
        post_dupchecks_bqsqls_path,
        user_name,
        password,
        host_name,
    )

    # Stage 2: apply the post-processing replacement rules.
    postprocess_bqsqls(
        post_dupchecks_bqsqls_path,
        postprocessed_bqsqlsfiles_path,
    )

    # Stage 3: execute the post-processed files.
    execute_sql(postprocessed_bqsqlsfiles_path)

    # Stage 4: summarise successes and failures from the log.
    print_execution_results(logfilenamepath)

    # Stage 5: replace dataset names with template parameters.
    productionize_sqls(
        postprocessed_bqsqlsfiles_path,
        prodready_bqsqls_path,
    )

    # Upload of the templated SQL files is currently disabled.
    # NOTE(review): output_path_folder is not defined in the visible cells.
    # upload_to_dags(output_path_folder)

    elapsed = datetime.now() - dt1
    print(elapsed)

In [None]:
# Entry point: run the whole pipeline once, bracketed by progress messages.
print("Begin of Processing")

parameterize_sql_scripts()

print("End of Processing")