## Install Libraries using pip
These installs must be run every time you use a Colab notebook, because the Colab Python runtime is not persistent.  Once all of the pip installs have finished, execute the os._exit cell to restart the runtime so the upgraded packages are loaded.  You do not need to restart between individual pip installs, even if pip prompts you to.

In [None]:
# Upgrade the Snowflake Python connector; it provides `snowflake.connector`,
# which this notebook uses for its direct (cursor-based) connections.
!pip install --upgrade snowflake-connector-python

In [None]:
# Upgrade pandas; query results are read into DataFrames with pd.read_sql.
!pip install --upgrade pandas

In [None]:
# Upgrade the Snowflake SQLAlchemy dialect; it provides `snowflake.sqlalchemy.URL`,
# which is used to build the SQLAlchemy engine.
!pip install --upgrade snowflake-sqlalchemy

In [None]:
# Hard-stop the Python process so Colab restarts the runtime and the packages
# upgraded above are imported fresh. os._exit ends the process immediately,
# without running cleanup handlers, so run this cell only after the installs.
import os
os._exit(0)

## Import Libraries

In [None]:
import os
import base64
import pandas as pd
from snowflake.sqlalchemy import URL
import snowflake.connector
import sqlalchemy as sa
from sys import exit
from sqlalchemy import text
import time

## Enter Connection Information
Please enter the connection information below, along with the parameters needed to run the file load.  Please note that the password MUST be Base64 encoded.  Base64 is only an encoding, not encryption, so still treat this notebook as containing a secret.



In [None]:
# Snowflake Credentials
# NOTE: sourcePassword must be the Base64 encoding of the real password; it is
# decoded before the connections are opened.
sourceUser = ''
sourceAccount = ''
sourcePassword = ''
sourceDatabase = ''
sourceSchema = ''       # schema whose table stages (@schema.%table) are listed and loaded
sourceWarehouse = ''

# Parameters
db_name = ''            # database whose information_schema is searched for tables to load
schema_name = ''        # schema searched for tables to load (upper-cased in the query)
stage_name = ''         # not used by the base code; available for customizations
format_name = ''        # named file format passed to COPY INTO ... FILE_FORMAT
warehouse_pattern = ''  # prefix matched by SHOW WAREHOUSES LIKE '<pattern>%'

# Execution Mode (options: 'FAST_MODE', 'ECONOMY_MODE')
# Economy mode requires 50% more files than a warehouse size can handle before
# moving up to the next size.  This can actually be pretty efficient in
# situations where files are varying sizes and some threads finish before
# others.  It also tries to avoid under-utilization on larger warehouse sizes.
exec_mode = 'FAST_MODE'

# Fail fast on a typo: the load loop treats any value other than 'FAST_MODE'
# as economy sizing, so a misspelled mode would otherwise go unnoticed.
if exec_mode not in ('FAST_MODE', 'ECONOMY_MODE'):
    raise ValueError("exec_mode must be 'FAST_MODE' or 'ECONOMY_MODE', got " + repr(exec_mode))

## Functions and Connections
Execute the following cell, which defines the helper functions used by this notebook and creates the connections to Snowflake.  The output of the cell shows the current user, role, database, schema, warehouse, and Snowflake version you are connected with.  Please verify that these are correct before moving on to the next step.

In [None]:
def getEngine(snowUser=None, snowPassword=None, snowAccount=None, snowDatabase=None, snowSchema=None, snowWarehouse=None):
    """Create a SQLAlchemy engine for Snowflake from Base64-encoded credentials.

    Args:
        snowUser: Snowflake user name.
        snowPassword: The user's password, Base64-encoded (decoded here).
        snowAccount: Snowflake account identifier.
        snowDatabase: Default database for the engine's sessions.
        snowSchema: Default schema for the engine's sessions.
        snowWarehouse: Default warehouse for the engine's sessions.

    Returns:
        The engine returned by ``sa.create_engine`` for a
        ``snowflake.sqlalchemy.URL`` built from the arguments.

    Raises:
        ValueError: if ``snowPassword`` is missing, is not valid Base64, or does
            not decode to UTF-8 text.
        Exception: any other error while building the engine is printed and
            re-raised.
    """
    if snowPassword is None:
        raise ValueError('snowPassword argument is required')

    # Errors are printed and propagated rather than returned as a string,
    # because callers hand the return value straight on as an engine.
    try:
        # validate=True rejects a password that was pasted without being
        # Base64-encoded, instead of silently dropping the invalid characters.
        # UTF-8 is a superset of ASCII, so every previously valid password
        # still decodes identically.
        password = base64.b64decode(snowPassword.encode('ascii'), validate=True).decode('utf-8')

        return sa.create_engine(URL(
            user=snowUser,
            password=password,
            account=snowAccount,
            database=snowDatabase,
            schema=snowSchema,
            warehouse=snowWarehouse
        ))

    except Exception as e:
        print('ERROR (getEngine): ' + str(e))
        raise

def execSQLDataFrame(SnowEngine, sqlQuery):
    """Run a SQL query through a SQLAlchemy engine and return the result.

    Args:
        SnowEngine: SQLAlchemy engine/connectable passed to ``pd.read_sql``.
        sqlQuery: SQL text; it is wrapped in ``sqlalchemy.text`` before running.

    Returns:
        A pandas DataFrame holding the query's result set.

    Raises:
        ValueError: if ``SnowEngine`` or ``sqlQuery`` is None.
        Exception: any error from running the query is printed and re-raised.
    """
    # Argument checks raise directly; the previous exit(InvalidArgsCode)
    # referenced a name that is never defined, so it failed with NameError.
    if SnowEngine is None:
        raise ValueError('SnowEngine argument is required')

    if sqlQuery is None:
        raise ValueError('sqlQuery argument is required')

    try:
        return pd.read_sql(text(sqlQuery), SnowEngine)

    except Exception as e:
        print('ERROR (execSQLDataFrame): ' + str(e))
        raise

def execDBQuery(sqlQuery, con):
    """Execute one statement synchronously on a connector connection.

    A fresh cursor is opened for the statement and is always closed afterwards,
    even if execution fails.

    Args:
        sqlQuery: SQL text to execute.
        con: A ``snowflake.connector`` connection (anything with ``cursor()``).

    Returns:
        The ``sfqid`` (Snowflake query ID) of the executed statement. This
        notebook feeds it to ``result_scan`` to read the statement's output.

    Raises:
        ValueError: if ``sqlQuery`` or ``con`` is None.
        Exception: any error from executing the statement is printed and re-raised.
    """
    if sqlQuery is None:
        raise ValueError('sqlQuery argument is required')

    if con is None:
        raise ValueError('con argument is required')

    try:
        cur = con.cursor()
        try:
            return cur.execute(sqlQuery).sfqid
        finally:
            # Release the cursor on both the success and the error path.
            cur.close()

    except Exception as e:
        print('ERROR (execDBQuery): ' + str(e))
        raise

def execDBQueryAsync(sqlQuery, con, snowWarehouse, suspendDelay=5):
    """Submit a statement on a given warehouse, then request that warehouse suspend.

    On one cursor of ``con`` this runs, in order:
      1. ``USE WAREHOUSE <snowWarehouse>;``
      2. ``sqlQuery`` with the connector's private ``_no_results=True`` flag,
         presumably so the call returns without waiting for the statement to
         finish (verify against the installed connector version).
      3. after ``suspendDelay`` seconds, ``ALTER WAREHOUSE <snowWarehouse> SUSPEND;``.

    The load loop only picks warehouses whose state is SUSPENDED, so step 3 is
    what returns the warehouse to the pool. This presumably relies on Snowflake
    letting the in-flight statement complete before the suspend takes effect.
    Confirm this against the ALTER WAREHOUSE documentation.

    Args:
        sqlQuery: SQL text to submit.
        con: A ``snowflake.connector`` connection; its session warehouse is
            switched to ``snowWarehouse``.
        snowWarehouse: Warehouse name, concatenated unquoted into the SQL.
        suspendDelay: Seconds to wait between submitting and suspending
            (default 5).

    Raises:
        ValueError: if ``sqlQuery``, ``snowWarehouse`` or ``con`` is None.
        Exception: any connector error is printed and re-raised.
    """
    if sqlQuery is None:
        raise ValueError('sqlQuery argument is required')

    if snowWarehouse is None:
        raise ValueError('snowWarehouse argument is required')

    if con is None:
        raise ValueError('con argument is required')

    try:
        cura = con.cursor()
        try:
            cura.execute("""USE WAREHOUSE """ + snowWarehouse + """;""")
            cura.execute(sqlQuery, _no_results=True)

            time.sleep(suspendDelay)
            cura.execute("""ALTER WAREHOUSE """ + snowWarehouse + """ SUSPEND;""")
        finally:
            # The cursor is no longer needed once all three statements are sent.
            cura.close()

    except Exception as e:
        print('ERROR (execDBQueryAsync): ' + str(e))
        raise


# Decode the Base64-encoded password entered in the connection cell.
# validate=True rejects a password pasted without encoding, instead of silently
# dropping its non-Base64 characters; UTF-8 is a superset of ASCII.
password = base64.b64decode(sourcePassword.encode('ascii'), validate=True).decode('utf-8')


def _openConnection():
    """Open a snowflake.connector connection using the source* settings."""
    return snowflake.connector.connect(
        user=sourceUser,
        password=password,
        account=sourceAccount,
        warehouse=sourceWarehouse,
        database=sourceDatabase,
        # `schema` is the connector's keyword for the session schema;
        # unqualified statements such as COPY INTO resolve against it.
        schema=sourceSchema
    )


# `con` runs the synchronous statements (SHOW WAREHOUSES, LIST, ALTER WAREHOUSE).
# `cona` is handed to execDBQueryAsync for the COPY statements. It is kept
# separate, presumably so the USE WAREHOUSE issued on it does not change the
# session warehouse of `con`.
con = _openConnection()
cona = _openConnection()

SnowEngine = getEngine(snowUser=sourceUser, snowPassword=sourcePassword, snowAccount=sourceAccount, snowDatabase=sourceDatabase, snowSchema=sourceSchema, snowWarehouse=sourceWarehouse)

# Show who/where we are connected as, so the user can verify it before loading.
sqlQuery = """SELECT CURRENT_USER() as user, CURRENT_ROLE() as role, CURRENT_DATABASE() as db, CURRENT_SCHEMA() as schema, CURRENT_WAREHOUSE() as wh, CURRENT_VERSION() as ver;"""
print(execSQLDataFrame(SnowEngine, sqlQuery))


## Tables to Load
Modify the SQL statement below to specify the tables that you wish to load during the execution of this script.

In [None]:
# Table-discovery query: every table in `schema_name` (upper-cased before the
# comparison) of database `db_name` whose name starts with DATA_LOAD_TEST.
# Edit the WHERE clause to choose different tables.
# NOTE: `_` is a single-character wildcard in LIKE, so the underscores in the
# pattern match any character, not only a literal underscore.
sqlQuery = f"""
    SELECT * 
    FROM {db_name}.information_schema.tables
    WHERE table_schema = '{schema_name.upper()}'
      AND table_name LIKE 'DATA_LOAD_TEST%'
    """

# One row per table to load; the main cell reads each row's table_name.
dfTables = execSQLDataFrame(SnowEngine, sqlQuery)

dfTables

## Main Execution
The following cell is the main execution loop.  It begins executing COPY INTO statements across all available warehouses, (mostly) in parallel.  The more warehouses are available, the faster the tables will be loaded.

### Before Executing!!!
Several places in the following script should be reviewed, because they are likely to be specific to each customer, schema, data file, etc.  Look for the CUSTOMIZATION BEGIN/END comment blocks in the cell below for details.  Do not modify any code outside those comment blocks unless you know what you're doing!

In [None]:
# Snowflake warehouse sizes, smallest to largest, in the order the loader
# steps through them as the file count grows.
_WH_SIZES = ['XSMALL', 'SMALL', 'MEDIUM', 'LARGE', 'XLARGE', 'XXLARGE', 'XXXLARGE', 'X4LARGE']


def _timestamp():
    """Return the current local time as HH:MM:SS for progress messages."""
    return time.strftime("%H:%M:%S", time.localtime())


def _warehouseSizeFor(fileCnt, mode):
    """Return the warehouse size to use for loading ``fileCnt`` files.

    FAST_MODE keeps XSMALL up to 8 files and doubles the limit at each size
    step (16, 32, 64, 128, 256, 512); above 512 files it uses X4LARGE.
    Any other mode (ECONOMY_MODE) uses limits 50% higher
    (12, 24, 48, 96, 192, 384, 768).
    """
    firstLimit = 8 if mode == 'FAST_MODE' else 12
    for step, size in enumerate(_WH_SIZES[:-1]):
        if fileCnt <= firstLimit * 2 ** step:
            return size
    return _WH_SIZES[-1]


for _, row in dfTables.iterrows():
    tableName = row['table_name']

    # Poll until a matching warehouse is SUSPENDED. execDBQueryAsync suspends
    # each warehouse after submitting its COPY, so SUSPENDED is treated as free.
    dfWH = pd.DataFrame()

    while dfWH.shape[0] == 0:
        print(_timestamp() + ': Looking for available warehouses ... ')

# ----------------- WAREHOUSE SEARCH CUSTOMIZATION BEGIN ---------------------

        WHQuery = """
            SHOW WAREHOUSES LIKE '""" + warehouse_pattern + """%';"""

# ----------------- WAREHOUSE SEARCH CUSTOMIZATION END -----------------------

        qid = execDBQuery(sqlQuery=WHQuery, con=con)

        WHQuery = """
            SELECT "name" as wh_name, "state" as state
            FROM table(result_scan('""" + qid + """'))
            WHERE state = 'SUSPENDED';
            """
        dfWH = execSQLDataFrame(SnowEngine, WHQuery)

        if dfWH.shape[0] == 0:
            time.sleep(10)

    whName = dfWH.wh_name.iloc[0]

# ----------------- FIND FILES CUSTOMIZATION BEGIN ---------------------------
# Base Code assumes that the files are located in the root of internal table
# stages.  This code does not use the stage_name parameter that is set earlier,
# but it could.  Modify this code to find the files that are to be loaded.
# NOTE(review): tables were discovered in `schema_name`, but their stages are
# read from `sourceSchema`; this assumes both name the same schema - confirm.

    sqlQuery = """LIST @""" + sourceSchema + """.%""" + tableName + """;"""

# ----------------- FIND FILES CUSTOMIZATION END -----------------------------

    qid = execDBQuery(sqlQuery=sqlQuery, con=con)

# ----------------- FILE FILTER CUSTOMIZATION BEGIN --------------------------
# Base Code assumes that every file in the stage will be loaded. If this needs
# to be customized, leverage the SQL block below.

    cntQuery = """
        SELECT *
        FROM table(result_scan('""" + qid + """'));
        """
# ----------------- FILE FILTER CUSTOMIZATION END ----------------------------

    dfFiles = execSQLDataFrame(SnowEngine, cntQuery)
    fileCnt = dfFiles.shape[0]

    # Nothing staged: skip the table rather than resuming a warehouse for a
    # COPY that would load no files.
    if fileCnt == 0:
        print(_timestamp() + ': No files found for table: ' + tableName + ' - skipping')
        continue

    whSize = _warehouseSizeFor(fileCnt, exec_mode)

    sqlQuery = """
    ALTER WAREHOUSE """ + whName + """
    SET WAREHOUSE_SIZE = """ + whSize + """;"""
    execDBQuery(sqlQuery=sqlQuery, con=con)
    print(_timestamp() + ': Warehouse set to: ' + whSize)

    sqlQuery = """
    ALTER WAREHOUSE """ + whName + """
    RESUME;"""
    execDBQuery(sqlQuery=sqlQuery, con=con)
    print(_timestamp() + ': Warehouse ' + whName + ' Resumed')

    # Presumably gives the RESUME time to take effect before the COPY is sent.
    time.sleep(5)

# ----------------- COPY INTO CUSTOMIZATION BEGIN ----------------------------
# Base Code assumes that every file in the stage will be loaded. If additional
# parameters need to be added to the COPY statement, or the file selection
# filter needs to be customized, leverage the SQL block below.
# The target is schema-qualified, so the load does not depend on the session's
# current schema. It is the same table whose stage is read in the FROM clause.

    sqlQuery = """
        COPY INTO """ + sourceSchema + """.""" + tableName + """
        FROM @""" + sourceSchema.lower() + """.%""" + tableName.lower() + """/
        PATTERN = '.*[.]csv[.]gz'
        FILE_FORMAT = (format_name = """ + format_name + """);
        """
# ----------------- COPY INTO CUSTOMIZATION END ------------------------------

    execDBQueryAsync(sqlQuery=sqlQuery, con=cona, snowWarehouse=whName)
    print(_timestamp() + ': Copy Into for table: ' + tableName)
