## Upload files to S3 Bucket

- Write a Python function to upload both the structured data (CSV) and the
extracted text files (from both Grobid and PyPDF) into an AWS S3 bucket.
- Utilize SQLAlchemy to upload the structured metadata from step 2
(Grobid), including the link to the uploaded txt file (from S3), into a
Snowflake database.
- This function should be documented within a Python notebook

### Imports

In [1]:
import os
import warnings
from urllib.parse import quote

warnings.filterwarnings("ignore")

import boto3
from dotenv import load_dotenv
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

In [2]:
# Read the project's .env file into the process environment before any of the os.getenv lookups below run.
load_dotenv('../config/.env',override=True)

True

## PART 1
### Loading env variables for S3 

In [3]:
def loadenv():
    """Read the S3 settings from the process environment.

    Returns:
        A 6-tuple ``(bucket name, PyPDF folder, Grobid folder, access key,
        secret key, region)``. Any variable that is not set comes back as
        ``None``.
    """
    # Order matters: callers unpack the result positionally.
    variable_names = (
        "S3_BUCKET_NAME",
        "S3_PYPDF_FOLDER_NAME",
        "S3_GROBID_FOLDER_NAME",
        "S3_ACCESS_KEY",
        "S3_SECRET_KEY",
        "S3_REGION",
    )
    return tuple(os.getenv(name) for name in variable_names)

In [4]:
# Unpack the S3 settings into module-level names; the upload helpers below read access_key, secret_key and region from here.
s3_bucket_name, s3_pypdf, s3_grobid, access_key, secret_key, region = loadenv()

### Function to upload .txt files (and metadata_output.csv) to a particular folder inside the S3 bucket

In [5]:
def upload_text_files_to_s3_folder(local_path, bucket_name, s3_folder):
    """Upload every ``.txt`` file, plus ``metadata_output.csv``, from a directory to an S3 folder.

    Each matching entry of ``local_path`` is stored under the key
    ``{s3_folder}/{filename}`` in ``bucket_name``. A file whose key is already
    present in the bucket is re-uploaded, so the "Overwriting..." message now
    matches what actually happens (previously such files were skipped).
    A failed upload is printed and does not stop the remaining files.

    Args:
        local_path: Directory whose direct entries are scanned (not recursive).
        bucket_name: Name of the destination S3 bucket.
        s3_folder: Key prefix ("folder") inside the bucket.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    # Credentials and region are the module-level values loaded from the environment.
    s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name = region)

    for filename in os.listdir(local_path):
        # Only extracted text files and the metadata CSV are of interest.
        if not (filename.endswith(".txt") or filename == "metadata_output.csv"):
            continue

        local_file_path = os.path.join(local_path, filename)
        s3_object_key = f"{s3_folder}/{filename}"

        # The existence probe only decides whether to announce an overwrite.
        # Any failure of the probe (missing key, or the probe itself being
        # refused) is treated as "not there yet", as before; the upload below
        # is attempted either way and reports its own errors.
        try:
            s3.head_object(Bucket=bucket_name, Key=s3_object_key)
            print(f"File {filename} already exists in S3. Overwriting...")
        except Exception:
            pass

        try:
            s3.upload_file(local_file_path, bucket_name, s3_object_key)
            print(f"File {filename} uploaded successfully to S3: s3://{bucket_name}/{s3_object_key}")
        except Exception as upload_error:
            print(f"Error uploading file {filename} to S3: {upload_error}")

### Function to upload the scraped-links file and metadata CSV to the root of the S3 bucket

In [6]:
def upload_text_files_to_s3_root(local_path, s3_bucket_name):
    """Upload ``224_links.txt`` and ``metadata_output.csv`` to the bucket root.

    Entries of ``local_path`` with any other name are ignored. Each selected
    file is stored under a key equal to its file name, i.e. directly at the
    root of ``s3_bucket_name``. Upload errors are printed, not raised.

    Args:
        local_path: Directory to scan (direct entries only).
        s3_bucket_name: Name of the destination S3 bucket.

    Returns:
        None.
    """
    # The only two file names this helper is willing to upload.
    wanted_names = ('224_links.txt', 'metadata_output.csv')

    client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name = region)

    for file_name in os.listdir(local_path):
        if file_name not in wanted_names:
            continue

        source_path = os.path.join(local_path, file_name)
        try:
            # The object key is the bare file name, which places it at the bucket root.
            client.upload_file(source_path, s3_bucket_name, file_name)
            print(f"Successfully uploaded {file_name} to S3 bucket {s3_bucket_name}")
        except Exception as e:
            print(f"Error uploading {file_name} to S3: {e}")

### Uploading .txt files generated using PyPDF to S3 

In [7]:
local_path = '../sample_output/PyPDF'

# Send this directory's .txt files (and metadata_output.csv, if one is there) to the PyPDF folder of the bucket
upload_text_files_to_s3_folder(local_path, s3_bucket_name, s3_pypdf)

File PyPDF_RR_2024_l2_combined.txt uploaded successfully to S3: s3://cfa-pdfs/pypdf/PyPDF_RR_2024_l2_combined.txt
File PyPDF_RR_2024_l3_combined.txt uploaded successfully to S3: s3://cfa-pdfs/pypdf/PyPDF_RR_2024_l3_combined.txt
File PyPDF_RR_2024_l1_combined.txt uploaded successfully to S3: s3://cfa-pdfs/pypdf/PyPDF_RR_2024_l1_combined.txt


### Uploading .txt files generated using GROBID to S3 

In [8]:
local_path = '../sample_output/Grobid'

# Send this directory's .txt files (and metadata_output.csv, if one is there) to the Grobid folder of the bucket
upload_text_files_to_s3_folder(local_path, s3_bucket_name, s3_grobid)

File Grobid_RR_2024_l1_combined.txt uploaded successfully to S3: s3://cfa-pdfs/grobid/Grobid_RR_2024_l1_combined.txt
File Grobid_RR_2024_l2_combined.txt uploaded successfully to S3: s3://cfa-pdfs/grobid/Grobid_RR_2024_l2_combined.txt
File Grobid_RR_2024_l3_combined.txt uploaded successfully to S3: s3://cfa-pdfs/grobid/Grobid_RR_2024_l3_combined.txt


### Uploading metadata_output.csv and scraped links to S3

In [9]:
# Only 224_links.txt and metadata_output.csv are picked up from this directory; they are stored at the bucket root.
local_path = '../sample_output/'
upload_text_files_to_s3_root(local_path, s3_bucket_name)

Successfully uploaded metadata_output.csv to S3 bucket cfa-pdfs


## PART 2
### Env Variables for Snowflake 

In [10]:
def loadenv_snowflake():
    """Read the Snowflake settings, plus the S3 settings used for staging, from the environment.

    Returns:
        An 8-tuple ``(user, password, database, account identifier, warehouse,
        S3 bucket name, S3 access key, S3 secret key)``. Any variable that is
        not set comes back as ``None``.
    """
    # Order matters: callers unpack the result positionally.
    variable_names = (
        "SNOWFLAKE_USER",
        "SNOWFLAKE_PASSWORD",
        "SNOWFLAKE_DATABASE",
        "SNOWFLAKE_ACCOUNT_IDENTIFIER",
        "SNOWFLAKE_WAREHOUSE",
        "S3_BUCKET_NAME",
        "S3_ACCESS_KEY",
        "S3_SECRET_KEY",
    )
    return tuple(os.getenv(name) for name in variable_names)

In [11]:
# Unpack the Snowflake and S3 settings into module-level names; wh, db and the S3_META_* values are used by the SQL cells below.
user , password, db, account_identifier, wh, S3_META_BUCKET, S3_META_ACCESS_KEY, S3_META_SECRET_KEY = loadenv_snowflake()


### Connecting to Snowflake 

In [12]:
def connectionToSnow(path='../config/.env',connection_test=False):
    """Open a Snowflake connection using the settings in a .env file.

    The .env file at ``path`` is (re)loaded, an engine is built from the
    user, password and account identifier, and ``select current_version()``
    is run as a probe; the version it returns is printed.

    Args:
        path: Location of the .env file to load.
        connection_test: When true, the connection is closed after the probe
            and nothing is returned.

    Returns:
        The open connection, or ``None`` when ``connection_test`` is true.

    Raises:
        ValueError: If the user, password or account identifier is not set.
        Exception: Whatever connecting or the probe query raises; the
            connection is closed first so it does not leak.
    """
    load_dotenv(path,override=True)
    user, password, _, account_identifier,_,_,_,_ = loadenv_snowflake()

    # Fail with a clear message instead of letting a missing value be
    # formatted into the URL as the literal text "None".
    missing = [
        name
        for name, value in (
            ("SNOWFLAKE_USER", user),
            ("SNOWFLAKE_PASSWORD", password),
            ("SNOWFLAKE_ACCOUNT_IDENTIFIER", account_identifier),
        )
        if not value
    ]
    if missing:
        raise ValueError("Missing Snowflake settings: " + ", ".join(missing))

    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            # Percent-encode the credentials so characters such as '@', ':'
            # or '/' cannot be mistaken for URL delimiters.
            user=quote(user, safe=''),
            password=quote(password, safe=''),
            account_identifier=account_identifier,
        )
    )
    connection = None
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
        if connection_test:
            connection.close()
            return None
        return connection
    except Exception:
        # Do not leave a half-initialised connection open when the probe fails.
        if connection is not None:
            connection.close()
        raise
    finally:
        # NOTE(review): the engine is disposed even when the connection is
        # handed back; later cells keep using that connection, so this appears
        # to be harmless for a checked-out connection - confirm.
        engine.dispose()

In [13]:
# Open the Snowflake connection that every execute(...) call below reuses; the call also prints the server version.
connection = connectionToSnow()

8.6.2


### Utility function to execute statements

In [14]:
def execute(connection,query):
    """Run one statement on ``connection`` and print the outcome.

    Prints ``Done`` when the statement succeeds. When it raises, the error is
    printed with an ``error-->`` prefix and ``Done`` is not printed, so the
    cell output no longer suggests a failed statement went through. The
    exception is still swallowed so the remaining cells keep running.

    Args:
        connection: Object exposing ``execute(query)``.
        query: The statement to run.

    Returns:
        None.
    """
    try:
        connection.execute(query)
    except Exception as e:
        print("error-->",e)
    else:
        print("Done")

### Setting up env in Snowflake

In [15]:
# Select the role, warehouse and database for this Snowflake session, in that order.
session_statements = (
    "USE ROLE {};".format('ACCOUNTADMIN'),
    "USE WAREHOUSE {}".format(wh),
    "USE DATABASE {};".format(db),
)
for session_statement in session_statements:
    execute(connection, session_statement)
        

Done
Done
Done


### Staging the data in S3, External storage

In [16]:
# Create (or replace) the external stage META_S3_STAGE pointing at the S3 bucket, with a CSV file format
# that allows fields to be optionally enclosed in double quotes.
# NOTE(review): the AWS key pair is interpolated directly into the SQL text. If this statement fails,
# the error printed by execute() may echo the statement, secrets included - confirm, and clear the
# cell output before sharing the notebook.
staging_query = """CREATE OR REPLACE STAGE META_S3_STAGE
  URL='s3://{}'
  CREDENTIALS=(AWS_KEY_ID='{}' AWS_SECRET_KEY='{}')
  FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"');""".format(S3_META_BUCKET, S3_META_ACCESS_KEY, S3_META_SECRET_KEY)


execute(connection,staging_query)

Done


### Creating table to consume data in Snowflake

In [17]:
# Create (or replace) the table CFA_META_R that receives the metadata CSV: one file key column plus
# five columns each for the Grobid and PyPDF outputs. Every column is a VARCHAR.
# NOTE(review): the column names are wrapped in backticks, whereas Snowflake's documented identifier
# quoting uses double quotes - confirm this statement is accepted as written and what the resulting
# column names are.
create_table = """CREATE OR REPLACE TABLE CFA_META_R (
    `File_Key` VARCHAR(255),
    `Last_Modified_Grobid` VARCHAR(255),
    `ETag_Grobid` VARCHAR(255),
    `Size_Grobid` VARCHAR(255),
    `S3_Link_Grobid` VARCHAR(500),
    `File_Type_Grobid` VARCHAR(50),
    `Last_Modified_PyPDF` VARCHAR(255),
    `ETag_PyPDF` VARCHAR(255),
    `Size_PyPDF` VARCHAR(255),
    `S3_Link_PyPDF` VARCHAR(500),
    `File_Type_PyPDF` VARCHAR(255)
);"""

execute(connection,create_table)

Done


### Publishing data from S3 to Snowflake 

In [18]:

# Load metadata_output.csv from the META_S3_STAGE stage (addressed as <db>.PUBLIC.META_S3_STAGE)
# into CFA_META_R, skipping the header row.
# NOTE(review): this inline FILE_FORMAT does not repeat the stage's FIELD_OPTIONALLY_ENCLOSED_BY
# setting; if the inline format replaces the stage's rather than merging with it, quoted fields
# would not be unwrapped - confirm against the loaded rows.
put_table = """
COPY INTO "{}"
FROM '@"{}"."PUBLIC"."{}"'
FILES = ('metadata_output.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
)""".format('CFA_META_R',db,'META_S3_STAGE')

execute(connection,put_table)

Done
