# Automating the DDL Scripts

1. Create ETL Table file
1. Create Stages and Pipes file
1. Create Foundation Table file
1. Create afterMigrateError file
1. Create Add Format file

Data we need from the excel mappings document (Table Info)
| Item | Example |
|-|-|
|Table Name |DW_MBR_PCP|
|Column Name |DW_MBR_PCP_REC_ID|
|Column Data Type |TEXT(1000)|
|Column Nullability |NULL or NOT NULL|
|Key Info|Primary, Unique, ~~Foreign~~|
|Comment|'Key for the table'|

---
Data created manually (General Snowflake Info)
| Item | Example |
|-|-|
|Database |ECT_DEV_ELIGIBILITY_DB|
|Schema/Prefix |ETL or FOUNDATION|
|Environment |DEV, STG, PRD|
|Format| JSON or AVRO|
|Version| 'R', 'V1', 'V2'|
|Flyway History Table| flyway_schema_history|

In [118]:
#imports
import pandas as pd
import os

## Member PCP (CDB) Data

### Acquire Data

In [119]:
# Load the column-level mapping sheet from the Excel mapping document.
mapping_workbook = 'data/excel_mapping_docs/MEMBER_PCP_MAPPING_DOC.xlsx'
mapping_sheet = 'Member_pcp_dd'
excel_data_df = pd.read_excel(mapping_workbook, sheet_name=mapping_sheet)

In [120]:
# Names of the mapping-sheet columns that the DDL generators below read.
t_name = 'TABLE_NAME'              # Snowflake table names
column_name = 'COLUMN_NAME'        # Snowflake column names
datatype = 'DATATYPE'              # Column datatype
key_info = 'INDEX_TYPE'            # Info regarding Primary, Foreign and Unique Keys
nullability = 'NULL_OPTION'        # Info on column nullability
column_comment = 'COLUMN_COMMENT'  # Column comment

# Keep only the columns that are needed to generate the DDL.
required_columns = [t_name, column_name, datatype, key_info, nullability, column_comment]
df = excel_data_df.loc[:, required_columns]

df.head()

Unnamed: 0,TABLE_NAME,COLUMN_NAME,DATATYPE,INDEX_TYPE,NULL_OPTION,COLUMN_COMMENT
0,DW_MBR_PCP,DW_MBR_PCP_REC_ID,STRING(1000),PRIMARY KEY,Not Null,Key for the table . Usually composite combinat...
1,DW_MBR_PCP,DW_SYS_REF_CD,STRING(1000),UNIQUE KEY,Null,DW Source System Reference Code
2,DW_MBR_PCP,MBR_PCP_ID,STRING(1000),UNIQUE KEY,Null,Key for the table . Usually composite combinat...
3,DW_MBR_PCP,MBRSHP_SRC_ID,STRING(1000),,Null,Membership Individual Identifier
4,DW_MBR_PCP,INDV_SRC_ID,STRING(1000),,Null,Individual Identifier


### Data Cleaning

In [121]:
# Normalise every datatype to upper case so the values are uniform.
upper_datatypes = df[datatype].str.upper()
df[datatype] = upper_datatypes

In [122]:
# Tally how often each datatype occurs in the mapping.
datatype_counts = df[datatype].value_counts()
datatype_counts

STRING(1000)     71
INTEGER          27
TIMESTAMP(3)     16
VARCHAR2(10)     13
VARIANT          10
VARCHAR(10)       7
CHAR(1)           5
DATE              4
STRING(100)       2
VARCHAR2(20)      2
VARCHAR2(100)     1
Name: DATATYPE, dtype: int64

In [123]:
# The DDL generators embed each comment in a double-quoted COMMENT "..." clause, so a
# double quote inside a comment would break the generated SQL. Replace every double
# quote with a single quote.
#
# Two calls from the earlier version of this cell were removed without changing the result:
#   * .str.replace("'", "\'")  - in Python "\'" is the same one-character string as "'",
#     so this replaced a single quote with itself.
#   * a second double-quote replacement - '"' and "\"" are the same string, so the call
#     was an exact repeat of the one kept below.
sample_index = 121  # row whose comment contains both quote styles; printed before and after as a check

print(df[column_comment][sample_index])

# regex=False: treat the pattern as a literal character regardless of the pandas default.
df[column_comment] = df[column_comment].str.replace('"', "'", regex=False)

print(df[column_comment][sample_index])
# [ord(c) for c in df[column_comment][121][-1]]


Provider identifier type code , Only Populated when Key Type is 'Provider ID' OR "Native Provider ID"
Provider identifier type code , Only Populated when Key Type is 'Provider ID' OR 'Native Provider ID'


In [124]:
# Change TIMESTAMP to CURRENT_TIMESTAMP() - Possible future solution for value needed by Compaction logic
# df = df.replace('TIMESTAMP(3)','CURRENT_TIMESTAMP()')
# df.head(15)

#### Check dataframe datatypes against snowflake datatypes (Deprecated: Trust the excel file instead for now)

In [125]:
# snowflake_dtypes = ['NUMBER','DECIMAL','NUMERIC','INT','INTEGER','BIGINT','SMALLINT','TINYINT','BYTEINT','FLOAT','FLOAT4','FLOAT8','DOUBLE','DOUBLE PRECISION', 'REAL','VARCHAR','CHAR','CHARACTER','STRING','TEXT','BINARY','VARBINARY','BOOLEAN','DATE','DATETIME','TIME','TIMESTAMP','TIMESTAMP_LTZ','TIMESTAMP_NTZ', 'TIMESTAMP_TZ','VARIANT', 'OBJECT','ARRAY','GEOGRAPHY']

In [126]:
# df_dtypes = df[datatype].value_counts().keys().tolist()
# df_dtypes

In [127]:
# TODO: functionality to test the numbers after each datatype too.
# for datatype in df_dtypes:
#     datatype = datatype.split("(")[0]
#     if datatype not in snowflake_dtypes:
#         raise Exception(f'ERROR - "{datatype}" is not an acceptable data type in Snowflake /nSee here for a list of acceptable data types: https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html')

### Create or Replace Table output

In [128]:
def CreateUniqueKeyOutput(uk):
    '''
    Returns the UNIQUE clause plus the closing parenthesis of a CREATE TABLE statement.

        Parameters: 
            uk (List): A List of unique key column names in the current table.

        Returns:
            uk_output (str): A string for use in an SQL (CREATE TABLE) query. It always
                ends with the table-closing text '\\n),'. When uk is empty only that
                closing text is returned, so the caller's statement is still closed.
    '''
    if len(uk) == 0:
        # Previously this path fell through to `return uk_output` with the name unbound
        # and raised UnboundLocalError.
        return '\n),'

    return ',\n\tUNIQUE(' + ','.join(uk) + ')\n),'

In [129]:
def CreateForeignKeyOutput(fk): # Deprecated: Foreign Keys don't matter in Snowflake.
    '''
    Returns a FOREIGN KEY clause (referencing DW_MBR_PCP) followed by the table-closing text.

        Parameters:
            fk (List): A List of foreign key column names in the current table.

        Returns:
            fk_output (str): A string for use in an SQL (CREATE TABLE) query, or an
                empty string when fk is empty.
    '''
    if len(fk) == 0:
        # Previously this path raised UnboundLocalError because fk_output was never assigned.
        return ''

    # The same column list is used on both sides of REFERENCES. Joining avoids the
    # trailing comma that used to be left inside "FOREIGN KEY (A,B,)".
    key_list = ','.join(fk)
    return f',\n\t FOREIGN KEY ({key_list}) REFERENCES DW_MBR_PCP ({key_list})\n),'


In [130]:
def CreateETLTables(df, schema):
    '''
    Returns a string of CREATE OR REPLACE TABLE SQL statements, one per table in df.

        Parameters: 
            df (Dataframe): A Dataframe with data needed for table creation. It is
                grouped by the table-name column; each row describes one column.
            schema (str): Schema name used to qualify every table ({schema}.{table}).

        Returns:
            output (str): The SQL statements, each terminated with ';'. An empty
                string when df has no rows.
    '''
    statements = []

    # One statement per distinct table name in the data.
    for table_name, table in df.groupby(t_name):

        column_lines = []
        # Key lists are reset for every table so a table without a primary key can
        # neither fail on an undefined name nor inherit the previous table's key.
        pk_columns = []
        uk = [] # Unique Keys list
        # fk = [] # Foreign Keys list

        for _, row in table.iterrows():
            column_lines.append(
                f'\n\t{row[column_name]} {row[datatype]} {row[nullability]} COMMENT "{row[column_comment]}"'
            )

            # str() so that an empty key cell (NaN) can be searched safely.
            key_type = str(row[key_info])
            if 'PRIMARY KEY' in key_type:
                pk_columns.append(row[column_name])
            if 'UNIQUE KEY' in key_type:
                uk.append(row[column_name])

        statement = f'\nCREATE OR REPLACE TABLE {schema}.{table_name} \n('
        statement += ','.join(column_lines)

        if pk_columns:
            # All primary-key columns go into one clause (composite keys are kept whole).
            statement += ',\n\tPRIMARY KEY (' + ','.join(str(c) for c in pk_columns) + ')'

        # CreateUniqueKeyOutput also emits the table-closing text; when there are no
        # unique keys the table is closed here instead of calling it with an empty list.
        closing = CreateUniqueKeyOutput(uk) if uk else '\n),'

        # output += CreateForeignKeyOutput(fk)

        # Terminate each statement with ';' (the closing text ends with ',', which
        # would otherwise join consecutive CREATE statements with a comma).
        statement += closing.rstrip(',') + ';\n'
        statements.append(statement)

    return ''.join(statements)

In [131]:
def TablesCreationWithTimestamp(df, schema=None):
    '''
    Builds, prints and returns CREATE OR REPLACE TABLE statements with named constraints.

        Parameters:
            df (Dataframe): A Dataframe with data needed for table creation. It is
                grouped by the table-name column; each row describes one column.
            schema (str, optional): Schema name used to qualify every table. When
                omitted, the notebook-level variable `schema` is used, which is what
                this function read before the parameter existed.

        Returns:
            output (str): The SQL statements, each terminated with ';'.

        Raises:
            NameError: If schema is omitted and no notebook-level `schema` is defined.
    '''
    if schema is None:
        try:
            schema = globals()['schema']
        except KeyError:
            raise NameError("name 'schema' is not defined; pass schema=... explicitly") from None

    statements = []

    # Run the loop for each table
    for table_name, table in df.groupby(t_name):

        # Per-table state for the edge cases (CONSTRAINTS: PRIMARY KEYS & UNIQUE KEYS)
        column_lines = []
        pk_columns = []
        uk = []

        for _, row in table.iterrows():
            # A variant that emitted "TIMESTAMP DEFAULT CURRENT_TIMESTAMP()" for
            # TIMESTAMP(3) columns, plus a trailing "SYSTEMTIMESTAMP TIMESTAMP" column,
            # was prototyped here but never enabled; every column uses the mapped datatype.
            column_lines.append(
                f'\n\t {row[column_name]} {row[datatype]} {row[nullability]} COMMENT "{row[column_comment]}"'
            )

            if row[key_info] == 'PRIMARY KEY':
                pk_columns.append(row[column_name])
            if row[key_info] == 'UNIQUE KEY':
                uk.append(row[column_name])

        statement = f'\nCREATE OR REPLACE TABLE {schema}.{table_name} \n('
        statement += ','.join(column_lines)

        if pk_columns:
            # One named constraint covering all primary-key columns, so the
            # {table_name}_PK constraint name is never repeated within a table.
            pk_list = ','.join(str(c) for c in pk_columns)
            statement += f',\n\t CONSTRAINT {table_name}_PK PRIMARY KEY ({pk_list})'

        if uk:
            # Columns are comma separated; they used to be concatenated with no separator.
            uk_list = ','.join(str(c) for c in uk)
            statement += f',\n\t CONSTRAINT {table_name}_AK1 UNIQUE ({uk_list})'

        # Close the table and terminate the statement.
        statement += '\n);\n'
        statements.append(statement)

    output = ''.join(statements)
    print(output)

    return output

### Create Pipe and Stage Files

In [132]:
def CreateStageAndPipe(df, env, db, schema, format):
    '''
    Returns the SQL that creates one stage and one pipe for every table in df.

        Parameters:
            df (Dataframe): A Dataframe with table and column names; grouped by table name.
            env (str): Environment name; lower-cased to build the file format name.
            db (str): Database name used to qualify the pipe and its target table.
            schema (str): Schema name used to qualify the pipe and its target table.
            format (str): File format name part, used as {env}_{format}_format.

        Returns:
            output (str): The concatenated stage and pipe statements for all tables.
    '''
    sections = []

    for table_name, table in df.groupby(t_name):
        columns = [row[column_name] for _, row in table.iterrows()]

        banner = f'\n-----{table_name}---------------------------------------------------------------------------'
        stage_sql = f'\nCREATE OR REPLACE STAGE {table_name}_STAGE \n\t copy_options = (on_error="skip_file");\n\n'
        pipe_head = f'CREATE OR REPLACE PIPE {db}.{schema}.{table_name}_PIPE AS \nCOPY INTO {db}.{schema}.{table_name}('

        # Target column list and the matching "$1: column" projections, comma separated.
        target_list = ','.join(f'\n\t{col}' for col in columns)
        select_list = ','.join(f'\n\tt.$1: {col}' for col in columns)

        # NOTE(review): the stage created above is named {table_name}_STAGE, but the COPY
        # below reads from @{table_name} - confirm which stage name is intended.
        pipe_tail = f'\nFROM @{table_name} t) \nfile_format = (format_name = {env.lower()}_{format}_format);\n'

        sections.append(
            banner + stage_sql + pipe_head + target_list + ')\nFROM (SELECT' + select_list + pipe_tail
        )

    return ''.join(sections)

### Create Foundation Table files

In [133]:
# OUTPUT:
# CREATE TABLE FOUNDATION.DW_MBR_PCP LIKE ETL.DW_MBR_PCP
# ...
 
def CreateFoundationTables(df):
    '''
    Returns one "CREATE TABLE FOUNDATION.<table> LIKE ETL.<table>;" line per table in df.

        Parameters:
            df (Dataframe): A Dataframe containing the table-name column.

        Returns:
            output (str): The statements, each followed by a newline.
    '''
    like_statements = [
        f'CREATE TABLE FOUNDATION.{table_name} LIKE ETL.{table_name};\n'
        for table_name, _ in df.groupby(t_name)
    ]
    return ''.join(like_statements)

### Create afterMigrateError.sql files

In [134]:
# OUTPUT:
# DELETE FROM "ECT_DEV_ELIGIBILITY_DB"."ETL"."flyway_schema_history_memberpcp" WHERE "success" = 0;

def CreateAfterMigrateError(db, schema, flyway_history):
    '''
    Returns the DELETE statement that removes failed rows from a Flyway history table.

        Parameters:
            db (str): Database name.
            schema (str): Schema name.
            flyway_history (str): Name of the Flyway schema history table.

        Returns:
            (str): A DELETE statement on "db"."schema"."flyway_history" for rows where
                "success" = 0, terminated with ';' as in the documented example output.
    '''
    return f'DELETE FROM "{db}"."{schema}"."{flyway_history}" WHERE "success" = 0;'

### Create Format files

In [135]:
# OUTPUT:
# create or replace file format prd_avro_format
#   type = avro;

def CreateFormatFile(env, format):
    '''
    Returns the statement that creates the {env}_{format}_format file format.

        Parameters:
            env (str): Environment name; lower-cased in the file format name.
            format (str): File format type (e.g. json or avro), used both in the
                file format name and as its type.

        Returns:
            (str): A "create or replace file format" statement, terminated with ';'
                as in the documented example output.
    '''
    return f'create or replace file format {env.lower()}_{format}_format\n\ttype = {format};'

### Check directories for storing files, create new directories if needed

In [136]:
from os import path  # kept so the name `path` stays available to any cell that uses it

# Output directories: one per environment under ddls/.
paths = ['ddls', 'ddls/dev', 'ddls/stg', 'ddls/prd']

# Create directories for files
for p in paths:
    # exist_ok=True makes the cell safe to re-run; makedirs also creates a missing parent.
    os.makedirs(p, exist_ok=True)

### Output to file for each environment (dev, stg, prd)

In [137]:
# Create some variables that are not found in the excel doc.
envs = ['DEV', 'STG', 'PRD'] # Files need to be created for each of these envs.
schema = 'ETL'
format = 'json' # For ADD {format} FORMAT.sql file.
version = "V1" # For flyway versioning.
flyway_history = "flyway_schema_history_member_pcp" #Check which flyway schema history your project should be using.

for env in envs:

    # NOTE(review): the examples elsewhere in this notebook show the database name with a
    # _DB suffix (ECT_DEV_ELIGIBILITY_DB) - confirm whether the suffix is needed here.
    db = f'ECT_{env}_ELIGIBILITY'

    # The output directories are lower case (ddls/dev, ddls/stg, ddls/prd). Using the
    # upper-case env name in the path fails on case-sensitive filesystems.
    env_dir = os.path.join('ddls', env.lower())
    os.makedirs(env_dir, exist_ok=True)

    # File name -> generated SQL for this environment.
    sql_files = {
        f'{version}__CREATE_ETL_TABLES.sql': CreateETLTables(df, schema),
        f'{version}__CREATE_STAGE_AND_PIPES.sql': CreateStageAndPipe(df, env, db, schema, format),
        f'{version}__CREATE_FOUNDATION_TABLES.sql': CreateFoundationTables(df),
        'afterMigrateError.sql': CreateAfterMigrateError(db, schema, flyway_history),
        f'{version}__ADD {format.upper()} FORMAT.sql': CreateFormatFile(env, format),
    }

    for file_name, sql in sql_files.items():
        with open(os.path.join(env_dir, file_name), 'w') as f:
            f.write(sql)