<a href="https://colab.research.google.com/github/HokieCrazy/Snowflake-Data-Generator-Notebook/blob/master/Dynamic_Data_Generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install Libraries using pip
This must be done when using Colab Notebooks, since the instance of Python is not a permanent one.  Once all of the pip installs are done, the `os._exit(00)` line should be executed to reset the kernel for you.  You will receive an error message, but this is expected.


In [0]:
!pip install --upgrade snowflake-connector-python

In [0]:
!pip install --upgrade pandas

In [0]:
!pip install --upgrade snowflake-sqlalchemy

In [0]:
# Terminate the interpreter process immediately (exit status 0) so that the
# notebook kernel is restarted and the packages installed above are the ones
# that get imported.  The "kernel died" style error that follows is expected.
import os
os._exit(00)

## Import Libraries

In [0]:
import os
import base64
import pandas as pd
from snowflake.sqlalchemy import URL
import sqlalchemy as sa
from sys import exit
from sqlalchemy import text

## Set your Snowflake Environments
This Notebook allows you to read table statistics from a Source account, database, and schema.  Once the statistics are read, the data can be generated in a separate Target account, database, and schema.

##### *Note: This does not mean the Notebook is copying any data from Source to Target.*

##### *Note: The sourcePassword and targetPassword entered into the page should be the base64-encoded password, not the plain text password.  Base64 is an encoding, not encryption; it only keeps the password from being readable at a glance.*


In [0]:
# Source System to gather Statistics from
# Fill in every value before running the cells below.  The password must be
# the base64-encoded form of the real password (it is decoded in getEngine).
sourceUser = ''
sourceAccount = ''
sourcePassword = ''
sourceDatabase = ''
sourceSchema = ''
sourceWarehouse = ''

# Target System to generate data to
# Same fields as the source; the generated INSERT statements (and, when a
# table is missing, its DDL) are executed against this connection.
targetUser = ''
targetAccount = ''
targetPassword = ''
targetDatabase = ''
targetSchema = ''
targetWarehouse = ''


## Compile Functions

In [0]:
def getEngine(snowUser=None, snowPassword=None, snowAccount=None, snowDatabase=None, snowSchema=None, snowWarehouse=None):
    """Create a SQLAlchemy engine for a Snowflake connection.

    Args:
        snowUser: Snowflake login name.
        snowPassword: The password, base64 encoded (NOT the plain text).
        snowAccount: Snowflake account identifier.
        snowDatabase: Default database for the session.
        snowSchema: Default schema for the session.
        snowWarehouse: Warehouse used to run the queries.

    Returns:
        The engine produced by ``sa.create_engine`` for the given settings.

    Raises:
        ValueError: If ``snowPassword`` is missing or cannot be base64
            decoded into text.  Errors raised while building the engine
            itself propagate unchanged.
    """
    if snowPassword is None:
        raise ValueError('ERROR: snowPassword is required (base64 encoded)')

    # Decode the password up front and fail loudly.  Returning an error
    # string here would hand callers something that looks like a value but
    # cannot be used as an engine.
    try:
        password = base64.b64decode(snowPassword).decode('utf-8')
    except ValueError as e:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        raise ValueError('ERROR: snowPassword is not valid base64 text: ' + str(e)) from e

    return sa.create_engine(URL(
        user=snowUser,
        password=password,
        account=snowAccount,
        database=snowDatabase,
        schema=snowSchema,
        warehouse=snowWarehouse
    ))

def execSQLDataFrame(SnowEngine, sqlQuery):
    """Run a SQL statement and return its result set as a DataFrame.

    Args:
        SnowEngine: Engine/connection handed to ``pd.read_sql``.
        sqlQuery: SQL text to execute.

    Returns:
        The DataFrame produced by ``pd.read_sql`` for the statement.

    Raises:
        ValueError: If either argument is ``None``.
        Exception: Any error raised while executing the statement is printed
            and then re-raised unchanged.
    """
    # Validate before touching the database.  Raising keeps the notebook
    # kernel alive, unlike sys.exit, and gives the caller a clear message.
    if SnowEngine is None:
        raise ValueError('SnowEngine argument is required')
    if sqlQuery is None:
        raise ValueError('sqlQuery argument is required')

    try:
        return pd.read_sql(text(sqlQuery), SnowEngine)
    except Exception as e:
        # Print for visibility in the notebook output, then let it propagate.
        print('ERROR (execSQLDataFrame) ' + str(e))
        raise

def execColStats(dfColumns, rows):
    """Collect per-column statistics for one source table.

    One aggregate query is executed per row of ``dfColumns`` through the
    module-level ``execSQLDataFrame`` helper and ``SourceEngine`` connection,
    and the single-row results are stacked into one DataFrame.

    Args:
        dfColumns: Column metadata with at least ``table_name``,
            ``column_name`` and ``data_type``; ``character_maximum_length``
            is read for TEXT columns, ``numeric_precision`` and
            ``numeric_scale`` for NUMBER columns.
        rows: Table row count as a string; emitted as ``total_cardinality``.

    Returns:
        DataFrame with columns table_name, column_name, data_type,
        data_type_length, data_type_precision, min_length, max_length,
        cardinality, total_cardinality and total_null, one row per analysed
        column.  An empty DataFrame is returned when nothing was analysed.
    """
    dateTypes = ('DATE', 'TIMESTAMP', 'TIMESTAMP_LTZ', 'TIMESTAMP_NTZ')
    statFrames = []

    for _, col in dfColumns.iterrows():
        dataType = col['data_type']

        # VARIANT columns have no statistics query; skip them instead of
        # silently re-running the previous column's statement.
        if dataType == 'VARIANT':
            continue

        tableName = str(col['table_name'])
        columnName = str(col['column_name'])

        # Defaults; the type-specific branches override what applies.
        typeLength = 'NULL'
        typePrecision = 'NULL'

        # "measure" is the per-row expression whose MIN/MAX is reported:
        # string length, the numeric value itself, or the array size.
        if dataType == 'TEXT':
            typeLength = str(col['character_maximum_length'])
            measure = 'LENGTH(' + columnName + ')'
        elif dataType == 'FLOAT':
            measure = '(' + columnName + ')'
        elif dataType == 'NUMBER':
            typeLength = str(col['numeric_precision'])
            typePrecision = str(col['numeric_scale'])
            measure = '(' + columnName + ')'
        elif dataType in dateTypes:
            measure = None  # no min/max is collected for date-like columns
        elif dataType == 'ARRAY':
            measure = 'ARRAY_SIZE(' + columnName + ')'
        else:
            measure = 'LENGTH(' + columnName + ')'

        if measure is None:
            minExpr = 'NULL'
            maxExpr = 'NULL'
        else:
            minExpr = 'MIN(' + measure + ')'
            maxExpr = 'MAX(' + measure + ')'

        sqlQuery = f"""
                SELECT '{tableName}' as table_name,
                        '{columnName}' as column_name,
                        '{dataType}' as data_type,
                        {typeLength}::NUMBER as data_type_length,
                        {typePrecision}::NUMBER as data_type_precision,
                        {minExpr}::NUMBER as min_length,
                        {maxExpr}::NUMBER as max_length,
                        COUNT(DISTINCT {columnName})::NUMBER as cardinality,
                        {rows}::NUMBER as total_cardinality,
                        SUM(CASE WHEN {columnName} IS NULL THEN 1 ELSE 0 END)::NUMBER as total_null
                FROM {tableName};"""

        statFrames.append(execSQLDataFrame(SourceEngine, sqlQuery))

    if not statFrames:
        return pd.DataFrame()

    # DataFrame.append no longer exists in pandas 2.x; concatenate once.
    return pd.concat(statFrames, ignore_index=True)

def execGenDataSQL(dfStats):
    """Build the INSERT ... SELECT statement that generates random data.

    Args:
        dfStats: Column statistics for ONE table, one row per column, with
            the columns table_name, column_name, data_type, data_type_length,
            data_type_precision, min_length, max_length, cardinality,
            total_cardinality and total_null.

    Returns:
        SQL text that inserts ``total_cardinality`` generated rows into the
        table named in the first row of ``dfStats``.

    Raises:
        ValueError: If ``dfStats`` is ``None`` or has no rows.
    """
    if dfStats is None or dfStats.shape[0] == 0:
        raise ValueError('dfStats must contain at least one row of column statistics')

    seed_1 = 10000  # base seed value, incremented by one for each column

    tableName = dfStats.table_name.iloc[0]
    totalRows = dfStats.total_cardinality.iloc[0]
    timestampTypes = ('TIMESTAMP', 'TIMESTAMP_LTZ', 'TIMESTAMP_NTZ')

    columnNames = []
    formulas = []

    for position, (_, col) in enumerate(dfStats.iterrows()):
        # Every column gets its own seed; sharing one seed would make all
        # random() calls in a row return the same value and correlate the
        # generated columns with each other.
        seed = str(seed_1 + position)
        dataType = col['data_type'].upper()
        cardinality = col['cardinality']

        # fb is the value expression, fe the cast appended to it.
        if dataType == 'DATE':
            fb = 'dateadd(day, -uniform(1, ' + str(cardinality) + ', random(' + seed + ')), date_trunc(day, current_date)) '
            fe = '::date '
        elif dataType in timestampTypes:
            fb = '(date_part(epoch_second, current_date) - (uniform(1, ' + str(cardinality) + ', random(' + seed + ')))) '
            fe = '::timestamp '
        elif dataType in ('VARCHAR', 'TEXT'):
            # Length and generator value deliberately share the seed so the
            # number of distinct strings stays bounded by the cardinality.
            fb = 'randstr(uniform(' + str(col['min_length']) + ',' + str(col['max_length']) + ', random(' + seed + ')),uniform(1,' + str(cardinality) + ',random(' + seed + '))) '
            fe = '::varchar(' + str(int(col['data_type_length'])) + ') '
        elif dataType == 'NUMBER':
            if cardinality == totalRows and int(col['data_type_precision']) == 0:
                # Every value is distinct and integral: emit a sequence.
                fb = '(seq8()+1)'
            else:
                fb = 'uniform(1,' + str(cardinality) + ' , random(' + seed + ')) '
            fe = '::number(' + str(int(col['data_type_length'])) + ',' + str(int(col['data_type_precision'])) + ') '
        elif dataType == 'FLOAT':
            fb = "uniform(1,ceil(" + str(int(cardinality)/4) + ") , random(" + seed + ")) || '.' || uniform(1,9999,random(" + seed + ")) "
            fe = '::float'
        else:
            # Unsupported types are generated as typed NULLs.
            fb = 'NULL '
            fe = '::' + dataType

        if int(cardinality) == 0:
            # No distinct values in the source: the column is entirely NULL.
            formula = 'null' + fe
        elif col['total_null'] == 0:
            formula = fb + fe
        else:
            # Reproduce the source NULL ratio with a separately seeded draw.
            nullSeed = str(seed_1 + position + 10000)
            formula = '(case when uniform(1,1000,random(' + nullSeed + ')) <= (' + str(int(col['total_null'])) + '/' + str(totalRows) + ')*1000 then null else ' + fb + ' end) ' + fe

        columnNames.append(str(col['column_name']))
        formulas.append(formula)

    return (
        "\n        INSERT INTO " + tableName + "(" + ", ".join(columnNames) + ")"
        + "\n        SELECT " + ",\n            ".join(formulas)
        + "\n        FROM table(generator(rowcount => " + str(totalRows) + "));"
    )

## Create Source and Target SQLAlchemy Engines

In [0]:
# Engine used to read metadata and statistics from the source account.
SourceEngine = getEngine(
    snowUser=sourceUser,
    snowPassword=sourcePassword,
    snowAccount=sourceAccount,
    snowDatabase=sourceDatabase,
    snowSchema=sourceSchema,
    snowWarehouse=sourceWarehouse,
)

# Engine used to create tables and insert generated rows on the target account.
TargetEngine = getEngine(
    snowUser=targetUser,
    snowPassword=targetPassword,
    snowAccount=targetAccount,
    snowDatabase=targetDatabase,
    snowSchema=targetSchema,
    snowWarehouse=targetWarehouse,
)

## List Tables to Analyze from Source

In [0]:
# Ask the source connection for every table in the configured source schema.
sqlQuery = "\n    SHOW TABLES IN {0};\n    ".format(sourceSchema)
dfTables = execSQLDataFrame(SourceEngine, sqlQuery)

# Bare expression as the last statement of the cell, so the frame is displayed.
dfTables

## Main Logic
This section will iterate through the list of Tables, get key statistics on each table, generate the necessary SQL to create data on the target, and then insert that randomly generated data into the target table.

##### *Note: This section detects whether each table already exists on the target system.  If it does, the generated rows are inserted into the existing table (no truncate).  If it does not, the table is first created from the source table's DDL and then populated.*


##### *Also Note: If the target table definition is different, you may see errors or odd behavior.  If the source table has constraints, it is possible that the CREATE or REPLACE TABLE statement will fail.*


In [0]:
# For every source table: read its column metadata, collect statistics, build
# the data-generation INSERT, make sure the table exists on the target, then
# run the INSERT there.
for i, row in dfTables.iterrows():
    tableName = str(row['name'])

    # VARIANT and ARRAY columns are excluded from analysis and generation.
    sqlQuery = """
        SELECT *
        FROM information_schema.columns
        WHERE table_name = '""" + tableName + """'
        AND table_schema = '""" + sourceSchema + """'
        AND data_type NOT IN ('VARIANT','ARRAY')
        ORDER BY ordinal_position;
        """
    dfColumns = execSQLDataFrame(SourceEngine, sqlQuery)

    # Nothing to generate for a table without analysable columns.
    if dfColumns.shape[0] == 0:
        continue

    dfStats = execColStats(dfColumns, str(row['rows']))
    sqlQuery = execGenDataSQL(dfStats)

    # Check the target SCHEMA as well as the database: a same-named table in
    # a different schema must not count as "already exists", otherwise the
    # INSERT below runs against a table that is not there.
    sqlCheckQuery = (
        "SELECT table_name FROM information_schema.tables"
        " WHERE table_name = '" + tableName + "'"
        " AND table_schema = '" + targetSchema + "'"
        " AND table_catalog = '" + targetDatabase + "';"
    )
    target_check = execSQLDataFrame(TargetEngine, sqlCheckQuery)

    if target_check.shape[0] == 0:
        # Table is missing on the target: copy its DDL over from the source.
        target_ddl_sql = """SELECT get_ddl('table','""" + tableName + """') as ctas;"""
        target_ddl = execSQLDataFrame(SourceEngine, target_ddl_sql)
        execSQLDataFrame(TargetEngine, target_ddl.ctas[0])

    # Insert the generated rows (no truncate) and echo the statement used.
    output = execSQLDataFrame(TargetEngine, sqlQuery)
    print(sqlQuery)

## Optional Main Logic
This section will iterate through the list of Tables, get key statistics on each table, generate the necessary SQL to create data on the target, and then print the resulting SQL to the Notebook, so it can be executed manually on Snowflake.

In [0]:
# Dry-run variant: build the data-generation SQL for every source table and
# print it instead of executing it on the target.
for i, row in dfTables.iterrows():
    # Column metadata for this table, minus the VARIANT/ARRAY columns.
    sqlQuery = "".join([
        "\n        SELECT *",
        "\n        FROM information_schema.columns",
        "\n        WHERE table_name = '", str(dfTables.name[i]), "'",
        "\n        AND table_schema = '", sourceSchema, "'",
        "\n        AND data_type NOT IN ('VARIANT','ARRAY')",
        "\n        ORDER BY ordinal_position;",
        "\n        ",
    ])
    dfColumns = execSQLDataFrame(SourceEngine, sqlQuery)

    # Skip tables for which no columns came back.
    if dfColumns.shape[0] == 0:
        continue

    dfStats = execColStats(dfColumns, str(dfTables.rows[i]))
    sqlQuery = execGenDataSQL(dfStats)

    print(sqlQuery)