In [1]:
import warnings
warnings.filterwarnings("ignore")
from datetime import datetime
from IPython.display import display, Markdown
from sqlalchemy import MetaData, Table, Column, String, Integer
from snowflake_helper import getSnowflakeEngine
from snowflake.connector import connect

In [2]:
# test the URL connection
def testConnection(engine):
    try:
        print('Testing Snowflake connection')
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
        print('Successfully connected to Snowflake')
        connection.close()
        return True
    except:
        print("Error connecting to Snowflake")
        if connection:
            connection.close()
        return False

In [3]:
def createCFATable(engine):
    """Drop and recreate the ``CFA`` table.

    The table is dropped first when it already exists (``checkfirst=True``),
    so any rows previously loaded into it are discarded on every call.

    Args:
        engine: Engine handed to SQLAlchemy's ``Table.drop`` / ``Table.create``.

    Returns:
        str | bool: The table name ``'CFA'`` on success, ``False`` when
        defining, dropping or creating the table raised an exception.
    """
    try:
        print('-------Starting Snowflake table creation-------')
        # Define metadata
        metadata = MetaData()

        # Define table structure
        table_name = 'CFA'
        topics_table = Table(
            table_name,
            metadata,
            Column('topic', String),
            Column('year', Integer),
            Column('level', String),
            Column('introductionSummary', String),
            Column('learningOutcomes', String),
            Column('summary', String),
            Column('summaryPageLink', String),
            Column('pdfFileLink', String)
        )

        # create or replace table in Snowflake
        topics_table.drop(engine, checkfirst=True)  # Drop table if exists
        topics_table.create(engine)
        print('Table created')
        print('-------Ending Snowflake table creation-------')
        return table_name
    except Exception as exc:
        # Exception rather than a bare except: Ctrl-C / SystemExit still
        # propagate, and the underlying cause is reported instead of hidden.
        print("Error creating Snowflake Table")
        print(f"Reason: {exc}")
        return False

In [4]:
def createGrobidTable(engine):
    """Drop and recreate the ``GROBID`` table.

    The table is dropped first when it already exists (``checkfirst=True``),
    so any rows previously loaded into it are discarded on every call.

    Args:
        engine: Engine handed to SQLAlchemy's ``Table.drop`` / ``Table.create``.

    Returns:
        str | bool: The table name ``'GROBID'`` on success, ``False`` when
        defining, dropping or creating the table raised an exception.
    """
    try:
        print('-------Starting Snowflake table creation-------')
        # Define metadata
        metadata = MetaData()

        # Define table structure
        table_name = 'GROBID'
        topics_table = Table(
            table_name,
            metadata,
            Column('topic', String),
            Column('articleName', String),
            Column('year', Integer),
            Column('level', Integer),
            Column('summary', String)
        )

        # create or replace table in Snowflake
        topics_table.drop(engine, checkfirst=True)  # Drop table if exists
        topics_table.create(engine)
        print('Table created')
        print('-------Ending Snowflake table creation-------')
        return table_name
    except Exception as exc:
        # Exception rather than a bare except: Ctrl-C / SystemExit still
        # propagate, and the underlying cause is reported instead of hidden.
        print("Error creating Snowflake Table")
        print(f"Reason: {exc}")
        return False

In [5]:
def createPypdfTable(engine):
    """Drop and recreate the ``PYPDF`` table.

    The table is dropped first when it already exists (``checkfirst=True``),
    so any rows previously loaded into it are discarded on every call.

    Args:
        engine: Engine handed to SQLAlchemy's ``Table.drop`` / ``Table.create``.

    Returns:
        str | bool: The table name ``'PYPDF'`` on success, ``False`` when
        defining, dropping or creating the table raised an exception.
    """
    try:
        print('-------Starting Snowflake table creation-------')
        # Define metadata
        metadata = MetaData()

        # Define table structure
        table_name = 'PYPDF'
        topics_table = Table(
            table_name,
            metadata,
            Column('topic', String),
            Column('articleName', String),
            Column('year', Integer),
            Column('level', Integer),
            Column('summary', String)
        )

        # create or replace table in Snowflake
        topics_table.drop(engine, checkfirst=True)  # Drop table if exists
        topics_table.create(engine)
        print('Table created')
        print('-------Ending Snowflake table creation-------')
        return table_name
    except Exception as exc:
        # Exception rather than a bare except: Ctrl-C / SystemExit still
        # propagate, and the underlying cause is reported instead of hidden.
        print("Error creating Snowflake Table")
        print(f"Reason: {exc}")
        return False

In [6]:
def createMetadataTable(engine):
    """Drop and recreate the ``metadata`` table.

    The table is dropped first when it already exists (``checkfirst=True``),
    so any rows previously loaded into it are discarded on every call.

    Args:
        engine: Engine handed to SQLAlchemy's ``Table.drop`` / ``Table.create``.

    Returns:
        str | bool: The table name ``'metadata'`` on success, ``False`` when
        defining, dropping or creating the table raised an exception.
    """
    try:
        print('-------Starting Snowflake table creation-------')
        # Define metadata
        metadata = MetaData()

        # Define table structure
        table_name = 'metadata'
        topics_table = Table(
            table_name,
            metadata,
            Column('text', String),
            Column('para', Integer),
            Column('bboxes', String),
            Column('pages', String),
            Column('section_title', String),
            Column('section_number', String),
            Column('paper_title', String),
            Column('file_path', String)
        )

        # create or replace table in Snowflake
        topics_table.drop(engine, checkfirst=True)  # Drop table if exists
        topics_table.create(engine)
        print('Table created')
        print('-------Ending Snowflake table creation-------')
        return table_name
    except Exception as exc:
        # Exception rather than a bare except: Ctrl-C / SystemExit still
        # propagate, and the underlying cause is reported instead of hidden.
        print("Error creating Snowflake Table")
        print(f"Reason: {exc}")
        return False

In [7]:
def uploadDataToSnowflake(engine, table_name):
    """Stage the cleaned CFA file and copy it into ``table_name``.

    Runs five statements on one connection, in order: select the
    ``cfa_data`` warehouse, (re)create the named file format
    ``csv_file_format``, (re)create the stage ``data_csv_stage``, PUT the
    local file into the stage, and COPY the staged file into the table.

    The file format is declared as CSV but uses a tab as field delimiter and
    skips one header line. Errors are not caught here; any exception raised
    by a statement propagates to the caller.

    Args:
        engine: Object whose ``connect()`` returns a context-manager
            connection providing ``execute()``.
        table_name: Target table; interpolated inside double quotes into the
            COPY statement.

    Returns:
        None
    """
    file_format_name = 'csv_file_format'
    field_delimiter = '\t'
    skip_header = 1
    skip_blank_lines = True
    trim_space = True

    file_path = "../data/clean-data/cfa-data-clean.csv"
    stage_name = "data_csv_stage"

    # Create or replace file format (the Python booleans are interpolated
    # as the literal text True)
    create_file_format_sql = f"""
    CREATE OR REPLACE FILE FORMAT {file_format_name}
    TYPE = 'CSV'
    FIELD_DELIMITER = '{field_delimiter}'
    SKIP_HEADER = {skip_header}
    SKIP_BLANK_LINES = {skip_blank_lines}
    TRIM_SPACE = {trim_space}
    """

    # Create or replace stage
    create_stage = f"""CREATE OR REPLACE STAGE {stage_name} DIRECTORY = ( ENABLE = true );"""

    # Upload the local file into the stage
    put_command = f"""PUT 'file://{file_path}' @{stage_name}"""

    # Copy the staged file into the table
    copy_sql = f"""
        COPY INTO "{table_name}" FROM '@{stage_name}'
        FILE_FORMAT = (FORMAT_NAME = {file_format_name})
        """
    # Select warehouse
    wh_sql = f"""use warehouse cfa_data"""

    print('-------Starting Data Upload to Snowflake-------')

    with engine.connect() as connection:
        # select the warehouse for this session
        connection.execute(wh_sql)

        # create the file format
        connection.execute(create_file_format_sql)
        print('File Format created')

        # create the stage
        connection.execute(create_stage)
        print('Stage created')

        # put file in stage
        connection.execute(put_command)
        print('Put file into stage')

        # copy staged file into the table
        connection.execute(copy_sql)
        print('Copied file into table')

    print('-------Ending Data Upload to Snowflake-------')

In [8]:
def uploadGrobidToSnowflake(engine, table_name):
    """Stage the cleaned Grobid file and copy it into ``table_name``.

    Executes, on a single connection: warehouse selection, (re)creation of
    the stage ``data_csv_stage_grobid``, a PUT of the local file, and a COPY
    into the table.

    The COPY refers to the named file format ``csv_file_format``, which this
    function does not create; in this file it is created by
    ``uploadDataToSnowflake``. Exceptions are not handled here.

    Args:
        engine: Object whose ``connect()`` returns a context-manager
            connection providing ``execute()``.
        table_name: Target table; interpolated inside double quotes into the
            COPY statement.

    Returns:
        None
    """
    format_name = 'csv_file_format'
    local_csv = "../data/clean-data/pdf-data-grobid-clean.csv"
    stage = "data_csv_stage_grobid"

    # Multi-line statement kept as its own literal so its whitespace is explicit.
    copy_statement = f"""
        COPY INTO "{table_name}" FROM '@{stage}'
        FILE_FORMAT = (FORMAT_NAME = {format_name})
        """

    # (statement, progress message) pairs in execution order; a message of
    # None marks a step that runs silently.
    steps = (
        ("use warehouse cfa_data", None),
        (f"CREATE OR REPLACE STAGE {stage} DIRECTORY = ( ENABLE = true );", 'Stage created'),
        (f"PUT 'file://{local_csv}' @{stage}", 'Put file into stage'),
        (copy_statement, 'Copied file into table'),
    )

    print('-------Starting Data Upload to Snowflake-------')

    with engine.connect() as conn:
        for statement, done_message in steps:
            conn.execute(statement)
            if done_message is not None:
                print(done_message)

    print('-------Ending Data Upload to Snowflake-------')

In [9]:
def uploadPypdfToSnowflake(engine, table_name):
    """Stage the cleaned PyPDF file and copy it into ``table_name``.

    Executes, on a single connection: warehouse selection, (re)creation of
    the stage ``data_csv_stage_pypdf``, a PUT of the local file, and a COPY
    into the table.

    The COPY refers to the named file format ``csv_file_format``, which this
    function does not create; in this file it is created by
    ``uploadDataToSnowflake``. Exceptions are not handled here.

    Args:
        engine: Object whose ``connect()`` returns a context-manager
            connection providing ``execute()``.
        table_name: Target table; interpolated inside double quotes into the
            COPY statement.

    Returns:
        None
    """
    format_name = 'csv_file_format'
    local_csv = "../data/clean-data/pdf-data-pypdf-clean.csv"
    stage = "data_csv_stage_pypdf"

    # Multi-line statement kept as its own literal so its whitespace is explicit.
    copy_statement = f"""
        COPY INTO "{table_name}" FROM '@{stage}'
        FILE_FORMAT = (FORMAT_NAME = {format_name})
        """

    # (statement, progress message) pairs in execution order; a message of
    # None marks a step that runs silently.
    steps = (
        ("use warehouse cfa_data", None),
        (f"CREATE OR REPLACE STAGE {stage} DIRECTORY = ( ENABLE = true );", 'Stage created'),
        (f"PUT 'file://{local_csv}' @{stage}", 'Put file into stage'),
        (copy_statement, 'Copied file into table'),
    )

    print('-------Starting Data Upload to Snowflake-------')

    with engine.connect() as conn:
        for statement, done_message in steps:
            conn.execute(statement)
            if done_message is not None:
                print(done_message)

    print('-------Ending Data Upload to Snowflake-------')

In [18]:
def uploadMetadataToSnowflake(engine, table_name):
    """Stage the cleaned Grobid metadata file and copy it into ``table_name``.

    Executes, on a single connection: warehouse selection, (re)creation of
    the stage ``data_csv_stage_metadata``, a PUT of the local file, and a
    COPY into the table.

    Unlike the sibling upload functions, the table name is interpolated
    into the COPY statement without surrounding double quotes. The COPY
    refers to the named file format ``csv_file_format``, which this function
    does not create; in this file it is created by
    ``uploadDataToSnowflake``. Exceptions are not handled here.

    Args:
        engine: Object whose ``connect()`` returns a context-manager
            connection providing ``execute()``.
        table_name: Target table, interpolated unquoted into the COPY
            statement.

    Returns:
        None
    """
    format_name = 'csv_file_format'
    local_csv = "../data/clean-data/grobid_metadata-clean.csv"
    stage = "data_csv_stage_metadata"

    # Multi-line statement kept as its own literal so its whitespace is explicit.
    copy_statement = f"""
        COPY INTO {table_name} FROM '@{stage}'
        FILE_FORMAT = (FORMAT_NAME = {format_name})
        """

    # (statement, progress message) pairs in execution order; a message of
    # None marks a step that runs silently.
    steps = (
        ("use warehouse cfa_data", None),
        (f"CREATE OR REPLACE STAGE {stage} DIRECTORY = ( ENABLE = true );", 'Stage created'),
        (f"PUT 'file://{local_csv}' @{stage}", 'Put file into stage'),
        (copy_statement, 'Copied file into table'),
    )

    print('-------Starting Data Upload to Snowflake-------')

    with engine.connect() as conn:
        for statement, done_message in steps:
            conn.execute(statement)
            if done_message is not None:
                print(done_message)

    print('-------Ending Data Upload to Snowflake-------')

In [None]:
if __name__ == "__main__":
    # Driver: build the engine, verify connectivity, (re)create the four
    # tables, then load each cleaned file into its table.

    # create snowflake engine
    snowflake_database="ASSIGNMENT3"
    snowflake_schema="cfa_data"
    snowflake_warehouse="cfa_data"
    engine = getSnowflakeEngine(snowflake_database, snowflake_schema, snowflake_warehouse)

    # test connection; stop here rather than issuing DDL through an engine
    # that cannot reach Snowflake
    if not testConnection(engine):
        raise RuntimeError("Could not connect to Snowflake; aborting table creation and upload")

    # create table for CFA data
    table_name = createCFATable(engine)

    # create table for Grobid data
    table_name_gb = createGrobidTable(engine)

    # create metadata table
    table_name_metadata = createMetadataTable(engine)

    # create table for Pypdf data
    table_name_py = createPypdfTable(engine)

    # Each create*Table helper returns False on failure. Without this check
    # that False would be interpolated into the COPY statement as a table name.
    created_tables = {
        "CFA": table_name,
        "Grobid": table_name_gb,
        "metadata": table_name_metadata,
        "Pypdf": table_name_py,
    }
    failed_tables = [label for label, created in created_tables.items() if not created]
    if failed_tables:
        raise RuntimeError(
            "Table creation failed for: " + ", ".join(failed_tables) + "; aborting upload"
        )

    # upload CFA data (this step also creates the csv_file_format file
    # format that the following uploads reference)
    uploadDataToSnowflake(engine, table_name)

    # upload Grobid data
    uploadGrobidToSnowflake(engine, table_name_gb)

    # upload Pypdf data
    uploadPypdfToSnowflake(engine, table_name_py)

    # upload Grobid metadata
    uploadMetadataToSnowflake(engine, table_name_metadata)

Testing Snowflake connection
8.8.4
Successfully connected to Snowflake
-------Starting Snowflake table creation-------
Table created
-------Ending Snowflake table creation-------
-------Starting Snowflake table creation-------
Table created
-------Ending Snowflake table creation-------
-------Starting Snowflake table creation-------
Table created
-------Ending Snowflake table creation-------
-------Starting Snowflake table creation-------
Table created
-------Ending Snowflake table creation-------
-------Starting Data Upload to Snowflake-------
File Format created
Stage created
