In [1]:
import warnings
warnings.filterwarnings("ignore")
from datetime import datetime
import logging
from IPython.display import display, Markdown
from sqlalchemy import MetaData, Table, Column, String, Integer
from snowflake_helper import getSnowflakeEngine
from snowflake.connector import connect

In [2]:
# Configure logging
# Forward slashes are accepted on Windows and POSIX alike and avoid the
# invalid escape sequences a backslash-separated literal would contain.
# The path is relative to the notebook's working directory.
file_name = "../../logs/data-load-log/snowflake-upload.log"
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


# display log message in-line
class NotebookHandler(logging.Handler):
    """Logging handler that renders each formatted record as Markdown cell output."""

    def emit(self, record):
        display(Markdown(self.format(record)))


# Remove handlers left over from a previous run of this cell; otherwise every
# re-run would add another NotebookHandler and each message would be shown
# several times.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# mode="w" sets the log file to empty at start.
handler = logging.FileHandler(file_name, mode="w")
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# No formatter is set on the notebook handler, so only the bare message is shown in-line.
logger.addHandler(NotebookHandler())

In [3]:
# test the URL connection
def testConnection(engine):
    try:
        logger.info('Testing Snowflake connection')
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        logger.info(results[0])
        logger.info('Successfully connected to Snowflake')
        connection.close()
        return True
    except:
        logger.error("Error connecting to Snowflake")
        if connection:
            connection.close()
        return False

In [4]:
def createCFATable(engine, table_name='cfa-web-data'):
    """Drop (if present) and recreate the CFA topics table on ``engine``.

    The table has one Integer column (``Year``) and seven String columns
    holding the topic name, level, summaries, learning outcomes and links.

    Args:
        engine: Engine/bind passed to ``Table.drop`` and ``Table.create``.
        table_name: Name of the table to (re)create. Defaults to
            ``'cfa-web-data'``.

    Returns:
        The table name (str) on success, or False if any step raised an
        exception (the error is logged, not re-raised).
    """
    try:
        logger.info('-------Starting Snowflake table creation-------')
        # Define metadata
        metadata = MetaData()

        # Define table structure
        topics_table = Table(
            table_name,
            metadata,
            Column('Name_of_the_topic', String),
            Column('Year', Integer),
            Column('Level', String),
            Column('Introduction_Summary', String),
            Column('Learning_Outcomes', String),
            Column('Summary', String),
            Column('Link_to_Summary_Page', String),
            Column('Link_to_PDF_File', String)
        )

        # create or replace table in Snowflake
        topics_table.drop(engine, checkfirst=True)  # Drop table if exists
        topics_table.create(engine)
        logger.info('Table created')
        logger.info('-------Ending Snowflake table creation-------')
        return table_name
    except Exception as exc:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # through; the cause is logged so failures can be diagnosed.
        logger.error("Error creating Snowflake Table: %s", exc)
        return False

In [5]:
def uploadDataToSnowflake(engine, table_name):
    """Stage the scraped CFA data file and copy it into ``table_name``.

    Executes four statements on one connection, in order: create the file
    format, create the stage, PUT the local file into the stage, and
    COPY the staged file into the table.

    Args:
        engine: Object whose ``connect()`` returns a context-manager
            connection providing ``execute(sql)``.
        table_name: Target table; it is double-quoted in the COPY statement.

    Returns:
        bool: True if all four statements ran, False if any step raised an
        exception (the error is logged, not re-raised).
    """
    file_format_name = 'csv_file_format'
    field_delimiter = '\t'
    skip_header = 1
    skip_blank_lines = True
    trim_space = True

    file_path = "../../data/scrape-data/cfa-data.csv"
    stage_name = "data_csv_stage"

    # Create or replace file format
    create_file_format_sql = f"""
    CREATE OR REPLACE FILE FORMAT {file_format_name}
    TYPE = 'CSV'
    FIELD_DELIMITER = '{field_delimiter}'
    SKIP_HEADER = {skip_header}
    SKIP_BLANK_LINES = {skip_blank_lines}
    TRIM_SPACE = {trim_space}
    """

    # create or replace Stage
    create_stage = f"""CREATE OR REPLACE STAGE {stage_name} DIRECTORY = ( ENABLE = true );"""

    # Put the local file into the stage
    put_command = f"""PUT 'file://{file_path}' @{stage_name}"""

    # Copy to table
    copy_sql = f"""
        COPY INTO "{table_name}" FROM '@{stage_name}'
        FILE_FORMAT = (FORMAT_NAME = {file_format_name})
        """

    try:
        logger.info('-------Starting Data Upload to Snowflake-------')

        with engine.connect() as connection:
            # execute file format
            connection.execute(create_file_format_sql)
            logger.info('File Format created')

            # execute stage creation
            connection.execute(create_stage)
            logger.info('Stage created')

            # put file in stage
            connection.execute(put_command)
            logger.info('Put file into stage')

            # copy staged file into the table
            connection.execute(copy_sql)
            logger.info('Copied file into table')

        logger.info('-------Ending Data Upload to Snowflake-------')
        return True
    except Exception as exc:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # through; the cause is logged so failures can be diagnosed.
        logger.error("Error uploading data to Snowflake: %s", exc)
        return False

In [6]:
if __name__ == "__main__":
    # Pipeline driver: build the engine, verify connectivity, recreate the
    # table, then upload the data. Each step runs only if the previous one
    # reported success, so a failed connection or table creation no longer
    # leads to an upload attempt.

    # create snowflake engine
    snowflake_database = "ASSIGNMENT_2"
    snowflake_schema = "RR_SCHEMA"
    snowflake_warehouse = "WH_2"
    engine = getSnowflakeEngine(snowflake_database, snowflake_schema, snowflake_warehouse)

    # test connection
    if not testConnection(engine):
        logger.error("Aborting: Snowflake connection test failed")
    else:
        # create table for CFA data (returns the table name, or False on failure)
        table_name = createCFATable(engine)

        if not table_name:
            logger.error("Aborting: table creation failed, nothing uploaded")
        else:
            # upload into the table that was actually created
            uploadDataToSnowflake(engine, table_name)
    

Testing Snowflake connection

8.6.2

Successfully connected to Snowflake

-------Starting Snowflake table creation-------

Table created

-------Ending Snowflake table creation-------

-------Starting Data Upload to Snowflake-------

File Format created

Stage created

Put file into stage

Copied file into table

-------Ending Data Upload to Snowflake-------