## Loading files to S3

In [1]:
# Imports
import logging
import os
from urllib.parse import quote

from botocore.exceptions import ClientError

from aws_snowflake_config import aws_s3_connection, snowflake_connection

In [2]:
# Unpack the pair returned by the project helper into the notebook-level
# globals that upload_file_to_s3 reads when it uploads each file.
s3_client, bucket_name = aws_s3_connection()

In [3]:
# Get a list of all the files with the extension in the directory_path
def get_files_in_directory(directory_path, extension):
    """Return the names of entries in ``directory_path`` ending with ``extension``.

    Only bare entry names are returned (not joined with ``directory_path``),
    in the order ``os.listdir`` yields them. An empty list means no match.
    """
    return [
        entry
        for entry in os.listdir(directory_path)
        if entry.endswith(extension)
    ]

In [4]:
# Function  to upload the file of type extension to S3 bucket
def upload_file_to_s3(file_path, type, extension):
    """Upload every file with the given extension from a local directory to S3.

    Relies on the notebook-level ``s3_client`` and ``bucket_name``.

    Args:
        file_path: Local directory to scan (a directory, despite the name).
        type: Key prefix inside the bucket, e.g. ``'CSV_Data/'``. It is
            concatenated directly with the file name, so include the trailing
            slash. The name shadows the ``type`` builtin but is kept so
            existing callers keep working.
        extension: File-name suffix used to select files, e.g. ``'.csv'``.

    Returns:
        None. Progress is printed; failures are logged or printed, never raised.
    """
    try:
        # Fetching all files in the directory with the given extension
        list_of_files = get_files_in_directory(file_path, extension)
        print("\n",type.rstrip("/"),"files:",)
        print(list_of_files)

        if not list_of_files:
            print(f'No files found in the {file_path} directory.')
            return

        for file in list_of_files:
            file_full_path = os.path.join(file_path, file)    # file path in local directory
            key_value = type + file   # file path in S3 bucket

            try:
                # upload_file returns nothing useful, so the result is not kept.
                s3_client.upload_file(file_full_path, bucket_name, key_value)
            except Exception as e:
                # Handle failures per file: an upload can fail with errors that
                # are not ClientError (e.g. boto3's S3UploadFailedError, or an
                # OSError for an unreadable local file). Catching only
                # ClientError here would let those abort the rest of the batch.
                logging.error(
                    "Failed to upload %s to s3://%s/%s: %s",
                    file_full_path, bucket_name, key_value, e,
                )
                continue

            print(f'File uploaded successfully: {file_full_path} -> s3://{bucket_name}/{key_value}')
    except Exception as e:
        # Reached when the directory itself cannot be listed or reported on.
        print(f'Error uploading file {file_path}: {e}')

# Each entry: (local directory, S3 key prefix, file extension).
for local_dir, key_prefix, suffix in (
    ('../web-scraping-and-dataset/', 'CSV_Data/', '.csv'),
    ('../pdf-extractions/pypdf', 'PyPDF/', '.txt'),
    ('../pdf-extractions/grobid', 'Grobid/', '.txt'),
):
    upload_file_to_s3(local_dir, key_prefix, suffix)



 CSV_Data files:
['scraped_data.csv']
File uploaded successfully: ../web-scraping-and-dataset/scraped_data.csv -> s3://bigdata-group3-assignment2/CSV_Data/scraped_data.csv

 PyPDF files:
['PyPDF_RR_2024_l1_combined.txt', 'PyPDF_RR_2024_l2_combined.txt', 'PyPDF_RR_2024_l3_combined.txt']
File uploaded successfully: ../pdf-extractions/pypdf\PyPDF_RR_2024_l1_combined.txt -> s3://bigdata-group3-assignment2/PyPDF/PyPDF_RR_2024_l1_combined.txt
File uploaded successfully: ../pdf-extractions/pypdf\PyPDF_RR_2024_l2_combined.txt -> s3://bigdata-group3-assignment2/PyPDF/PyPDF_RR_2024_l2_combined.txt
File uploaded successfully: ../pdf-extractions/pypdf\PyPDF_RR_2024_l3_combined.txt -> s3://bigdata-group3-assignment2/PyPDF/PyPDF_RR_2024_l3_combined.txt

 Grobid files:
['Grobid_RR_2024_l1_combined.txt', 'Grobid_RR_2024_l2_combined.txt', 'Grobid_RR_2024_l3_combined.txt']
File uploaded successfully: ../pdf-extractions/grobid\Grobid_RR_2024_l1_combined.txt -> s3://bigdata-group3-assignment2/Grobid/Grob

## Loading metadata of text files to Snowflake

In [None]:
from sqlalchemy import create_engine, text
import datetime as dt
import mimetypes
import pandas as pd

### Defining functions to create Snowflake objects

In [21]:
def create_internal_stage(engine, stage_name):
    """Create (or replace) the Snowflake stage called ``stage_name``.

    Executes a ``CREATE OR REPLACE STAGE`` statement on a connection opened
    from ``engine`` and prints a confirmation afterwards. Any existing stage
    with the same name is replaced.
    """
    stage_ddl = text(f"""
    CREATE OR REPLACE STAGE {stage_name};
    """)
    with engine.connect() as conn:
        conn.execute(stage_ddl)
    print("Stage created")

In [22]:
def put_data_into_stage(engine, csv_file_path, stage_name):
    """Upload the local file at ``csv_file_path`` into the stage ``stage_name``.

    Issues a ``PUT file://<path> @<stage>`` statement on a connection opened
    from ``engine`` and prints a confirmation afterwards.
    """
    put_statement = text(f"""
    PUT file://{csv_file_path} @{stage_name};
    """)
    with engine.connect() as conn:
        conn.execute(put_statement)

    print("File added to stage")

In [23]:
def create_table_with_csv_structure(engine, table_name):
    """Create (or replace) the metadata table ``table_name``.

    The table has six columns: ``file_name``, ``language``, ``version``,
    ``encoding``, ``file_size`` and ``s3_url``. ``version`` and ``file_size``
    are ``NUMBER``; the rest are ``VARCHAR``. Any existing table with the
    same name is replaced. A confirmation is printed afterwards.
    """
    table_ddl = text(f"""
    CREATE OR REPLACE TABLE {table_name} (
        file_name VARCHAR,
        language VARCHAR,
        version NUMBER,
        encoding VARCHAR,
        file_size NUMBER,
        s3_url VARCHAR
    );
    """)
    with engine.connect() as conn:
        conn.execute(table_ddl)

    print(f"Table {table_name} created")

In [24]:
def load_data_from_stage_to_table(engine, table_name, stage_name):
    """Copy staged CSV data from ``stage_name`` into ``table_name``.

    Runs ``COPY INTO`` with a CSV file format that skips one header row and
    ``ON_ERROR = 'CONTINUE'``, then prints a confirmation. The confirmation
    is printed whenever the statement itself executes without raising.
    """
    copy_statement = text(f"""
    COPY INTO {table_name} FROM @{stage_name} FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1) ON_ERROR = 'CONTINUE';
    """)
    with engine.connect() as conn:
        conn.execute(copy_statement)

    print(f"Data Loaded from {stage_name} into table {table_name}")

### Loading metadata of Grobid to Snowflake

Steps to upload metadata to a Snowflake table:  
1. Create an engine for the Snowflake connection  
2. Create an internal stage in Snowflake  
3. Create a table that matches the CSV file structure  
4. Add the file to the stage  
5. Load data from the stage into the table using the COPY INTO command

In [25]:
# Creating SQLAlchemy connection and engine for snowflake
user, password, account, warehouse, database, schema = snowflake_connection()

# Percent-encode the credentials before placing them in the connection URL:
# characters such as '@', ':', '/', '%' or '#' in a user name or password
# would otherwise be parsed as URL structure and break the connection.
# NOTE(review): assumes snowflake_connection() returns raw (not already
# URL-encoded) credentials - confirm against that helper.
safe_user = quote(user, safe='')
safe_password = quote(password, safe='')
sql_engine = create_engine(
    f'snowflake://{safe_user}:{safe_password}@{account}/'
    f'?warehouse={warehouse}&database={database}&schema={schema}'
)

# creating internal stage
create_internal_stage(sql_engine, 'grobid_metadata_stage')

# Adding file to stage
put_data_into_stage(sql_engine, './metadata-grobid.csv', 'grobid_metadata_stage')

# creating table into snowflake
create_table_with_csv_structure(sql_engine, 'grobid_metadata')

# loading data from stage to table
load_data_from_stage_to_table(sql_engine, 'grobid_metadata','grobid_metadata_stage')

Stage created
File added to stage
Table grobid_metadata created
Data Loaded from grobid_metadata_stage into table grobid_metadata
