In [1]:
! pip install boto3



In [3]:
#Uploading validated data from local machine to Amazon S3 bucket
import os
import boto3
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# AWS credentials
aws_access_key_id = os.getenv("access_id")
aws_secret_access_key = os.getenv("secret_key")

# S3 bucket details
s3_bucket_name = "validateddata"  # Replace with your S3 bucket name

# Local CSV file path
local_csv_path = "C:/Users/deepa/Documents/Repo/Assignment3/validated_CFA.csv"

# AWS S3 client
s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

# Upload CSV file to the root of S3 bucket
s3_key = "validated_CFA.csv"
s3_client.upload_file(local_csv_path, s3_bucket_name, s3_key)


In [11]:
# Loading validated data to Snowflake database from Amazon S3 buckeet
import os
from sqlalchemy import create_engine, Column, Integer, String
from dotenv import load_dotenv
import snowflake.connector
import warnings

# Loading the environment variables from .env file
load_dotenv()

# AWS credentials
aws_access_key_id = os.getenv("access_id")
aws_secret_access_key = os.getenv("secret_key")

# Ignore warnings
warnings.filterwarnings("ignore")

# Snowflake engine for creating databases and warehouses
snowflake_engine = create_engine(
    'snowflake://{user}:{password}@{account}/'.format(
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
    )
)

try:
    # Snowflake operations
    snowflake_connection = snowflake_engine.connect()

    # Create databases
    create_raw_database_query = "CREATE OR REPLACE DATABASE RAW;"
    create_dbt_database_query = "CREATE OR REPLACE DATABASE DBT;"
    results = snowflake_connection.execute(create_raw_database_query)
    results = snowflake_connection.execute(create_dbt_database_query)

    # Use RAW database
    results = snowflake_connection.execute("USE DATABASE RAW")

    # Create warehouse
    create_warehouse_query = """CREATE OR REPLACE WAREHOUSE TRANSFORM
        WITH WAREHOUSE_SIZE='X-SMALL'
        AUTO_SUSPEND = 180
        AUTO_RESUME = TRUE
        INITIALLY_SUSPENDED=TRUE;"""
    results = snowflake_connection.execute(create_warehouse_query)

    # Create the RAW.MFA_TABLE
    create_raw_table_query = """CREATE OR REPLACE TABLE MFA_TABLE (
        Name_of_the_topic STRING,
        Year INTEGER,
        Level INTEGER,
        Introduction_Summary STRING,
        Learning_Outcomes STRING,
        Link_to_the_Summary_Page STRING(255),
        Link_to_the_PDF_file STRING(255)
    );"""
    results = snowflake_connection.execute(create_raw_table_query)

    # Create stage
    create_stage_query = f"""CREATE STAGE DUMMY
        URL = 's3://validateddata'
        CREDENTIALS = (
            aws_key_id='{aws_access_key_id}' 
            aws_secret_key='{aws_secret_access_key}'
        )
        FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1);"""
    results = snowflake_connection.execute(create_stage_query)

    # Upload data to stage and copy from stage to table
    copy_stage_to_table_raw_query = f"""COPY INTO MFA_TABLE
        FROM @DUMMY
        FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
        PATTERN = '.*validated_CFA.csv.gz'
        ON_ERROR = 'CONTINUE';"""
    results = snowflake_connection.execute(copy_stage_to_table_raw_query)

finally:
    print("Done")
    snowflake_connection.close()
    snowflake_engine.dispose()


Done
