In [1]:
import csv
import os

import boto3
import pandas as pd
import psycopg2
from dotenv import load_dotenv
from psycopg2 import sql
# Notebook-relative locations: exported CSVs go to `folder_path`; the AWS /
# Redshift settings live in an env file under `env_path`.
folder_path = r"../Resources/data/"
env_path = r"../Resources/env/"

# Load settings from the env file into the process environment, then read the
# AWS region used for the Redshift COPY.
load_dotenv(os.path.join(env_path, 'AwsCfg.env'))
default_region = os.getenv("region")

# Redshift target table and the S3 bucket that stages its CSV input.
table_name = 'GroceryProducts'
bucket_name = 'posgresdata'
s3 = boto3.client('s3')

In [None]:
# Export each source query from the local PostgreSQL database to a CSV file in
# `folder_path`, then upload that file to the S3 staging bucket, using the file
# name as the object key.
# NOTE(review): the bucket is created by a later cell; on a fresh account run
# that cell first or `upload_file` will fail.

# Output CSV file names (also used as the S3 object keys).
csv_file_path_train_data = 'GroceryProducts.csv'
csv_file_path_product_data = 'ProductsData.csv'
csv_file_path_customer_data = 'CustomersData.csv'
csv_file_path_reviews_data = 'ReviewsData.csv'

# Source queries. The training query LEFT JOINs products to reviews, so
# products without reviews still appear (with NULL review columns).
train_data_query = (
    'select r.customer_fk as customer_id,p.product_pk as product_id,'
    'p.product_title,p.product_category,r.star_rating,r.review_date '
    'from core.products p left join core.reviews r on p.product_pk = r.product_fk '
    'order by p.product_pk ASC'
)
product_query = 'select * from core.products order by product_pk asc'
customer_query = 'select * from core.customers order by customer_pk asc'
review_query = 'select * from core.reviews order by review_pk asc'

# (query, output file) pairs to export.
queries = [
    (train_data_query, csv_file_path_train_data),
    (product_query, csv_file_path_product_data),
    (customer_query, csv_file_path_customer_data),
    # This used to re-run `product_query`, so ReviewsData.csv held product rows.
    (review_query, csv_file_path_reviews_data),
]

conn = psycopg2.connect(
    host="localhost",
    database="DA2",
    user="postgres",
    # TODO: put the password in an env file and drop this hardcoded fallback.
    password=os.getenv("pg_password", "123456"),
    port='5432',
)
try:
    # The cursor context manager closes the cursor; the outer `finally`
    # guarantees the connection is closed even if an export/upload fails.
    with conn.cursor() as cursor:
        for query, csv_filename in queries:
            cursor.execute(query)

            # Write a header row from the result's column names, then stream the rows.
            csv_file_path = os.path.join(folder_path, csv_filename)
            with open(csv_file_path, "w", newline="", encoding="utf-8") as csv_file:
                csv_writer = csv.writer(csv_file)
                csv_writer.writerow([desc[0] for desc in cursor.description])
                csv_writer.writerows(cursor)

            # Kept as a module-level name for compatibility; after the loop it
            # refers to the LAST exported file, not the training data.
            s3_file_path = 's3://' + bucket_name + '/' + csv_filename
            s3.upload_file(csv_file_path, bucket_name, csv_filename)
            print('Upload successful')
finally:
    conn.close()


In [41]:
# Create the S3 staging bucket used by the export and Redshift COPY cells.
# This cell reuses the `s3` client and `bucket_name` defined in the first cell.

# Place the bucket in the env file's `region` (falling back to the previously
# hardcoded 'ap-southeast-1'). The Redshift COPY passes `default_region` as the
# bucket's REGION, so the two must agree.
bucket_region = default_region or 'ap-southeast-1'
create_bucket_kwargs = {'Bucket': bucket_name}
# us-east-1 is S3's default location and rejects an explicit LocationConstraint.
if bucket_region != 'us-east-1':
    create_bucket_kwargs['CreateBucketConfiguration'] = {
        'LocationConstraint': bucket_region
    }

try:
    response = s3.create_bucket(**create_bucket_kwargs)
    print('Bucket created:', response)
except s3.exceptions.BucketAlreadyOwnedByYou:
    # Lets the cell be re-run (e.g. Restart & Run All) once the bucket exists.
    print('Bucket already exists:', bucket_name)


Bucket created: {'ResponseMetadata': {'RequestId': 'HG1WT9ABFF9X8E6C', 'HostId': 'qx+Vd/eIb63/P9tG92pUUiK3/zW59wEz6xgQvtjAN8Y8nPTSsiyavvLvEfGwiPt1RsjR3UDEyADyKnuAyxUhlA==', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'qx+Vd/eIb63/P9tG92pUUiK3/zW59wEz6xgQvtjAN8Y8nPTSsiyavvLvEfGwiPt1RsjR3UDEyADyKnuAyxUhlA==', 'x-amz-request-id': 'HG1WT9ABFF9X8E6C', 'date': 'Mon, 14 Aug 2023 13:18:49 GMT', 'location': 'http://posgresdata.s3.amazonaws.com/', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'Location': 'http://posgresdata.s3.amazonaws.com/'}


In [43]:
# Rebuild the Redshift table `public.<table_name>` and bulk-load it with COPY
# from the training-data CSV staged in S3. Connection settings and the IAM role
# are read from the env file loaded in the first cell.
dbname = os.getenv("dbname")
user = os.getenv("user")
password = os.getenv("password")
host = os.getenv("host")
port = os.getenv("port")
role = os.getenv("role")

# COPY must read the file whose columns match this table. It previously used
# `s3_file_path` leaked from the export loop, which points at the LAST uploaded
# file (ReviewsData.csv), not GroceryProducts.csv.
train_data_s3_path = f"s3://{bucket_name}/{csv_file_path_train_data}"

# Bound before `try` so `finally` works even when `connect` itself fails.
redshift_conn = None
try:
    # Keyword arguments avoid hand-quoting the DSN (a quote in the password
    # used to break it); psycopg2 omits None-valued settings instead of
    # sending the literal string 'None'.
    redshift_conn = psycopg2.connect(
        dbname=dbname, user=user, password=password, host=host, port=port
    )
    print("Connected to Redshift successfully!")
    redshift_cursor = redshift_conn.cursor()

    # All three statements use the same `public.` qualification.
    drop_query = f"""DROP TABLE IF EXISTS public.{table_name};"""

    create_query = f"""
        CREATE TABLE IF NOT EXISTS public.{table_name} (
            customer_id int,
            product_id int,
            product_title varchar(512),
            product_category varchar(256),
            star_rating int,
            review_date date
        )
        SORTKEY (product_id);"""

    # IGNOREHEADER 1 skips the header row written by the export cell.
    copy_query = f"""COPY public.{table_name} (customer_id, product_id, product_title, product_category, star_rating, review_date) 
    FROM '{train_data_s3_path}' IAM_ROLE '{role}' FORMAT AS CSV DELIMITER ',' QUOTE '"' IGNOREHEADER 1 REGION AS '{default_region}'"""

    redshift_cursor.execute(drop_query)
    print("drop query sucessfully")
    redshift_cursor.execute(create_query)
    print("create query sucessfuly")
    redshift_cursor.execute(copy_query)
    print("copy query sucessfuly")
    redshift_conn.commit()

except Exception as e:
    print("Error:", e)
    # Surface the failure instead of letting the cell look successful. Nothing
    # was committed, so closing below discards the DROP/CREATE as well.
    raise

finally:
    if redshift_conn is not None:
        redshift_conn.close()
        print("Connection to Redshift closed.")


Connected to Redshift successfully!
drop query sucessfully
create query sucessfuly
copy query sucessfuly
Connection to Redshift closed.
