In [11]:
import psycopg2
import pandas as pd
import boto3
from io import StringIO
import json
from sqlalchemy import create_engine
import urllib.parse
import logging

In [12]:
# Configure the root logger to emit records at INFO level and above, so the
# logging.info / logging.error calls in this notebook are shown.
logging.basicConfig(level=logging.INFO)

In [13]:
def credentialFile(file_path):
    """Load a JSON credentials file and return its parsed content.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for the file's content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Pin the encoding: without it open() falls back to the locale's default,
    # which mis-decodes non-ASCII JSON on platforms whose default is not UTF-8.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

In [14]:
def read_csv_from_s3(bucket_name, ETL_file_key, credentials):
    """Download a CSV object from S3 and parse it into a pandas DataFrame.

    Args:
        bucket_name: Name of the S3 bucket that holds the object.
        ETL_file_key: Key of the CSV object inside the bucket.
        credentials: Mapping providing 'aws_access_key_id' and
            'aws_secret_access_key', used to build the S3 client.

    Returns:
        pandas.DataFrame parsed from the object's UTF-8 decoded body.

    Raises:
        Exception: Any error raised while creating the client, fetching the
            object, decoding or parsing it is logged and re-raised unchanged
            (including the client's ``NoSuchKey`` when the key is absent).
    """
    # Bound before the try so the error handler can tell whether the client
    # was created; its exception classes are only reachable through it.
    s3 = None
    try:
        # Create an S3 client
        s3 = boto3.client('s3',
                          aws_access_key_id=credentials['aws_access_key_id'],
                          aws_secret_access_key=credentials['aws_secret_access_key'])

        # Read the CSV file from S3
        response = s3.get_object(Bucket=bucket_name, Key=ETL_file_key)
        csv_content = response['Body'].read().decode('utf-8')

        # Convert the CSV content to a DataFrame
        data = pd.read_csv(StringIO(csv_content))

        logging.info(f"File {ETL_file_key} read from S3 bucket {bucket_name} successfully.")
        return data

    except Exception as e:
        # The handlers previously referenced an undefined name, which replaced
        # the real error with a NameError; report the actual key instead.
        if s3 is not None and isinstance(e, s3.exceptions.NoSuchKey):
            logging.error(f"The file {ETL_file_key} does not exist in the bucket {bucket_name}.")
        else:
            logging.error(f"An error occurred while reading {ETL_file_key} from S3: {e}")
        raise

In [21]:
def copy_to_redshift(table_name, file_key, redshift_credentials, credentials, bucket_name):
    """Load one CSV object from S3 into a Redshift table with a COPY statement.

    Args:
        table_name: Target table; interpolated verbatim into the COPY statement.
        file_key: Key of the CSV object inside the bucket.
        redshift_credentials: Mapping providing 'db', 'user', 'password',
            'host' and 'port' for the database connection.
        credentials: Not used by this function; kept so existing callers
            keep working.
        bucket_name: Name of the S3 bucket that holds the object.

    Raises:
        Exception: Any error raised while connecting or executing the COPY is
            logged and re-raised unchanged.
    """
    # Bound before the try so cleanup can tell what was actually opened; an
    # unconditional close would raise UnboundLocalError when connect() fails
    # and hide the real connection error.
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(
            dbname=redshift_credentials['db'],
            user=redshift_credentials['user'],
            password=redshift_credentials['password'],
            host=redshift_credentials['host'],
            port=redshift_credentials['port']
        )
        cursor = conn.cursor()

        # NOTE(review): table_name, bucket_name and file_key are formatted
        # straight into the SQL text, so only trusted values may be passed.
        # The first header row of the CSV is skipped (IGNOREHEADER 1).
        copy_query = f"""
        COPY {table_name}
        FROM 's3://{bucket_name}/{file_key}'
        iam_role 'iam_role_link'
        CSV  
        IGNOREHEADER 1;
        """

        cursor.execute(copy_query)
        conn.commit()

        logging.info(f"Copied data from S3 to Redshift table {table_name} successfully.")

    except Exception as e:
        logging.error(f"Failed to copy data to Redshift table {table_name}: {e}")
        raise

    finally:
        # Close only what exists, and still close the connection if closing
        # the cursor itself fails.
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()

In [20]:
def main():
    """Read each configured CSV from S3 and COPY it into its Redshift table.

    Credentials are loaded from two JSON files. For every (table, key) pair
    the CSV is first read into a DataFrame and then the same S3 object is
    loaded into Redshift with COPY. Processing stops at the first failure,
    which is logged and not re-raised.

    Returns:
        None.
    """
    bucket_name = 'bucket_name'
    credentials_file = 'credential_file_name'
    redshift_credentials_file = 'redshift_credential_file_name'

    # Load credentials
    credentials = credentialFile(credentials_file)
    redshift_credentials = credentialFile(redshift_credentials_file)

    # File keys for the uploaded dimension and fact tables
    file_keys = {
        'product': 'product_file_name',
        'customer': 'customer_file_name',
        'date': 'date_file_name',
        'location': 'location_file_name',
        'fact': 'fact_file_name'
    }

    dataframes = {}
    # Tracked outside the loop so the error message can name the table that
    # was being processed when the failure happened.
    table_name = None
    try:
        for table_name, file_key in file_keys.items():
            # Read the table's CSV from S3
            dataframes[table_name] = read_csv_from_s3(bucket_name, file_key, credentials)

            # Copy the same S3 object into Redshift
            copy_to_redshift(table_name, file_key, redshift_credentials, credentials, bucket_name)

    except Exception as e:
        # This handler covers both the S3 read and the Redshift COPY, so the
        # message must not claim the failure was a read.
        logging.error(f"Failed to load table {table_name} from S3 into Redshift: {e}")

In [18]:
# Run the load only when this code executes as the main module.
if __name__ == "__main__":
    main()

INFO:root:File product_dimension.csv read from S3 bucket fileuplodtesting successfully.
INFO:root:Copied data from S3 to Redshift table product successfully.
INFO:root:File customer_dimension.csv read from S3 bucket fileuplodtesting successfully.
INFO:root:Copied data from S3 to Redshift table customer successfully.
INFO:root:File date_dimension.csv read from S3 bucket fileuplodtesting successfully.
INFO:root:Copied data from S3 to Redshift table date successfully.
INFO:root:File location_dimension.csv read from S3 bucket fileuplodtesting successfully.
INFO:root:Copied data from S3 to Redshift table location successfully.
INFO:root:File fact_table.csv read from S3 bucket fileuplodtesting successfully.
INFO:root:Copied data from S3 to Redshift table fact successfully.
