### Clean CSVs upload to Snowflake

In [1]:
from sqlalchemy import create_engine, text

##### Functions to create snowflake connection, internal staging, creating table, staging data and loading staged data to Snowflake database

In [2]:
import configparser
config = configparser.ConfigParser()
config.read('configuration.properties')

user = config['SNOWFLAKE']['User']
password = config['SNOWFLAKE']['Password']
account = config['SNOWFLAKE']['Account']
warehouse = config['SNOWFLAKE']['Warehouse']
database = config['SNOWFLAKE']['Database']
schema = config['SNOWFLAKE']['Schema']

# Create a connection string
connection_string = f'snowflake://{user}:{password}@{account}/' \
                        f'?warehouse={warehouse}&database={database}&schema={schema}'

In [3]:
# Function to create stage
def create_internal_stage_for_classes(engine, stage_name):
    create_stage_query = f"""
    CREATE OR REPLACE STAGE {stage_name};
    """
    with engine.connect() as connection:
        connection.execute(text(create_stage_query))
    
    print(f"Stage {stage_name} created successfully")

In [4]:
# Function to create table with specified table and column names
def create_table_for_csv_files(engine, table_name, column_names):
    create_table_query = f"""
    CREATE OR REPLACE TABLE {table_name} (
        {column_names}
    );
    """
    with engine.connect() as connection:
        connection.execute(text(create_table_query))
    
    print(f"Table {table_name} created successfully")    

In [5]:
# Function to put data into internal stage
def put_data_into_stage(engine, csv_file_path, stage_name):
    put_data_query = f"""
    PUT file://{csv_file_path} @{stage_name};
    """
    with engine.connect() as connection:
        connection.execute(text(put_data_query))
    
    print(f"Data Loaded into {stage_name} successfully")

In [6]:
# function to create file format with given file format name and field delimeter
def creating_file_format(engine, ff_name, field_delim):
    create_ff_query = f"""
    CREATE OR REPLACE FILE FORMAT {ff_name}
    TYPE = 'CSV'
    FIELD_DELIMITER = '{field_delim}'
    SKIP_HEADER = 1
    SKIP_BLANK_LINES = True
    EMPTY_FIELD_AS_NULL = true
    TRIM_SPACE = True;
    """
    # print(create_ff_query)
    with engine.connect() as connection:
        connection.execute(text(create_ff_query))
    
    print(f"File format {ff_name} created successfully")

In [7]:
# function to load data from stage to table
def load_data_from_stage_to_table(engine, table_name, stage_name, ff_name):
    copy_into_query = f"""
    COPY INTO {table_name} FROM @{stage_name} FILE_FORMAT = (FORMAT_NAME = {ff_name});
    """
    with engine.connect() as connection:
        connection.execute(text(copy_into_query))
    
    print(f"Data loaded from {stage_name} to {table_name} successfully")

Steps to upload data to Snowflake Database:  
1. Create an engine for Snowflake connection  
2. Create an internal stage in Snowflake  
3. Create a table with reference to CSV structure  
4. Put data into stage  
5. Load data from stage to table using COPY INTO command

In [8]:
# Create an engine for Snowflake Connection
engine = create_engine(connection_string)
print("Engine created for snowflake sqlalchemy")

# declaring stage, file formats and table names
url_stage_name = 'url_class_stage'
pdf_metadata_stage_name = 'pdf_metadata_stage'
pdf_content_stage_name = 'pdf_content_stage'

url_ff = 'url_data_ff' 
pdf_data_ff = 'pdf_data_ff'

url_table = 'URLDATA'
pdf_metadata_table = 'PDFMETADATA'
pdf_content_table = 'PDFCONTENTDATA'

# Creating 3 stages for URLClass, PDF metadata and PDF content
create_internal_stage_for_classes(engine, url_stage_name)
create_internal_stage_for_classes(engine, pdf_metadata_stage_name)
create_internal_stage_for_classes(engine, pdf_content_stage_name)

# Create a table
create_table_for_csv_files(engine, url_table, 'topic_name VARCHAR, year INT, level VARCHAR, introduction VARCHAR,learning_outcome VARCHAR, summary VARCHAR, summary_page_link VARCHAR, pdf_file_link VARCHAR')
create_table_for_csv_files(engine, pdf_metadata_table, 'text VARCHAR, para INT, bboxes VARIANT, pages VARIANT, section_title VARCHAR, section_number VARCHAR, paper_title VARCHAR, file_path VARCHAR')
create_table_for_csv_files(engine, pdf_content_table, 'title VARCHAR, topic_name VARCHAR, year INT, level VARCHAR, learning_outcome VARCHAR')

# create file format
creating_file_format(engine, url_ff, '\\t')
creating_file_format(engine, pdf_data_ff, '|')

# Stage the data
put_data_into_stage(engine, 'data\output\clean_url_class.csv', url_stage_name)
put_data_into_stage(engine, 'data\output\clean_pdf_metadata.csv', pdf_metadata_stage_name)
put_data_into_stage(engine, 'data\output\clean_pdf_content.csv', pdf_content_stage_name)

# Load data from stage to table 
load_data_from_stage_to_table(engine, url_table, url_stage_name, url_ff)
load_data_from_stage_to_table(engine, pdf_metadata_table, pdf_metadata_stage_name, pdf_data_ff)
load_data_from_stage_to_table(engine, pdf_content_table, pdf_content_stage_name, pdf_data_ff)

Engine created for snowflake sqlalchemy
Stage url_class_stage created successfully
Stage pdf_metadata_stage created successfully
Stage pdf_content_stage created successfully
Table URLDATA created successfully
Table PDFMETADATA created successfully
Table PDFCONTENTDATA created successfully
File format url_data_ff created successfully
File format pdf_data_ff created successfully
Data Loaded into url_class_stage successfully
Data Loaded into pdf_metadata_stage successfully
Data Loaded into pdf_content_stage successfully


  connection.execute(text(copy_into_query))


Data loaded from url_class_stage to URLDATA successfully
Data loaded from pdf_metadata_stage to PDFMETADATA successfully
Data loaded from pdf_content_stage to PDFCONTENTDATA successfully
