In [1]:
# Announce the start of the ingestion run.
print("Data Ingestion has started...")

In [2]:
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from dotenv import load_dotenv
import os
import psycopg2

# Reaching this line means every import above succeeded.
print("All dependencies imported")

In [3]:
# Anonymous (unsigned) S3 client: requests are sent without AWS credentials.
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
bucket_name = "d2b-internal-assessment-bucket"

# List every object under the "orders_data" prefix.
# A single list call returns at most 1,000 keys, so walk all result pages with
# a paginator instead of reading only the first page.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name, Prefix="orders_data"):
    # A page with no matching objects has no 'Contents' entry.
    for obj in page.get('Contents', []):
        print("File:", obj['Key'])

File: orders_data/
File: orders_data/analytics_export/faitusie8037/agg_public_holiday.csv
File: orders_data/analytics_export/faitusie8037/agg_shipments.csv
File: orders_data/analytics_export/faitusie8037/best_performing_product.csv
File: orders_data/analytics_export/murtodun9658/agg_public_holiday.csv
File: orders_data/analytics_export/murtodun9658/agg_shipments.csv
File: orders_data/analytics_export/murtodun9658/best_performing_product.csv
File: orders_data/analytics_export/murtodun9658/late_shipments.csv
File: orders_data/analytics_export/murtodun9658/undelivered_shipments.csv
File: orders_data/analytics_export/salimuti8986/agg_public_holiday.csv
File: orders_data/analytics_export/salimuti8986/agg_shipments.csv
File: orders_data/analytics_export/salimuti8986/best_performing_product.csv
File: orders_data/orders.csv
File: orders_data/reviews.csv
File: orders_data/shipment_deliveries.csv


In [4]:
# Download each raw CSV from the bucket's "orders_data/" prefix into the
# current working directory, keeping the same file name locally.
raw_files = ("orders.csv", "reviews.csv", "shipment_deliveries.csv")
for file_name in raw_files:
    s3.download_file(bucket_name, f"orders_data/{file_name}", file_name)

print('All raw files downloaded')

In [5]:
# Load orders.csv into fact_orders.
# Identifier and quantity columns are read as integers, price columns as
# floats, and order_date is parsed as a datetime column.
orders_types = {
    **dict.fromkeys(['order_id', 'customer_id', 'product_id', 'quantity'], int),
    **dict.fromkeys(['unit_price', 'total_price'], float),
}

fact_orders = pd.read_csv(
    'orders.csv',
    parse_dates=['order_date'],
    dtype=orders_types,
)


In [6]:
# Load reviews.csv into fact_reviews, reading the review and product_id
# columns as integers.
reviews_types = dict(review=int, product_id=int)

fact_reviews = pd.read_csv(
    'reviews.csv',
    dtype=reviews_types,
)

In [7]:
# Load shipment_deliveries.csv into fact_shipment_deliveries.
# The two id columns are read as integers and both date columns are parsed
# as datetimes.
shipments_types = dict.fromkeys(['shipment_id', 'order_id'], int)

fact_shipment_deliveries = pd.read_csv(
    'shipment_deliveries.csv',
    parse_dates=['shipment_date', 'delivery_date'],
    dtype=shipments_types,
)

In [8]:
# Load environment variables from the .env file
load_dotenv("/workspaces/Data2Bots-Assessment/2.env")

# Read the PostgreSQL connection settings from the environment
PG_USERNAME = os.getenv("PG_USERNAME")
PG_PASS = os.getenv("PG_PASS")
PG_HOST = os.getenv("PG_HOST")
PG_DB = os.getenv("PG_DB")

# os.getenv returns None for an unset variable. Stop here with a clear message
# instead of letting None be formatted into the connection URL further down.
missing_settings = [
    setting_name
    for setting_name, setting_value in (
        ("PG_USERNAME", PG_USERNAME),
        ("PG_PASS", PG_PASS),
        ("PG_HOST", PG_HOST),
        ("PG_DB", PG_DB),
    )
    if setting_value is None
]
if missing_settings:
    raise RuntimeError(
        "Missing environment variable(s): " + ", ".join(missing_settings)
    )


postgres
postgres
75.119.135.61
d2b_accessment


In [11]:
# Create PostgreSQL Connection
# The URL is assembled from its components rather than by string formatting,
# so special characters in the username or password (such as "@", ":" or "/")
# are escaped instead of corrupting the connection string.
# NOTE(review): 5732 is not PostgreSQL's default port (5432) - confirm it is
# the port this server really listens on.
PG_PORT = 5732
connection_url = sqlalchemy.engine.URL.create(
    drivername='postgresql',
    username=PG_USERNAME,
    password=PG_PASS,
    host=PG_HOST,
    port=PG_PORT,
    database=PG_DB,
)
engine = create_engine(connection_url)


In [12]:
# Open a connection to verify the database is reachable, and release it again
# right away so the check does not leave an open connection behind.
with engine.connect():
    pass

print('Successfully connected to Database')

<sqlalchemy.engine.base.Connection at 0x7fc39aef08b0>

In [68]:
# Create fact_orders table in Database
# Writing a zero-row slice of the frame sets up the table's columns without
# inserting any data; if_exists='replace' recreates the table when it exists.
# NOTE(review): index=False is not passed here, unlike the fact_reviews and
# fact_shipment_deliveries cells, so the DataFrame index is written as an extra
# column - confirm this is intended.
empty_orders = fact_orders.head(n=0)
empty_orders.to_sql(
    name='fact_orders',
    schema='joshodey2178_staging',
    con=engine,
    if_exists='replace',
)

print('fact_orders table created')

1000

In [None]:
# Load data into fact_orders table
# Appends every row of fact_orders to the existing staging table.
# NOTE(review): index=False is not passed here, unlike the fact_reviews and
# fact_shipment_deliveries cells, so the DataFrame index is written as an extra
# column - confirm this is intended.
fact_orders.to_sql(
    name='fact_orders',
    schema='joshodey2178_staging',
    con=engine,
    if_exists='append',
)

print('fact_orders table successfully loaded')

In [69]:
# Create fact_reviews table in Database
# Writing a zero-row slice of the frame sets up the table's columns without
# inserting any data; the DataFrame index is not written as a column.
empty_reviews = fact_reviews.head(n=0)
empty_reviews.to_sql(
    name='fact_reviews',
    schema='joshodey2178_staging',
    con=engine,
    index=False,
    if_exists='replace',
)

print('fact_reviews table created')

0

In [71]:
# Load data into fact_reviews table
# Appends every row of fact_reviews to the existing staging table; the
# DataFrame index is not written as a column.
fact_reviews.to_sql(
    name='fact_reviews',
    schema='joshodey2178_staging',
    con=engine,
    index=False,
    if_exists='append',
)

print('fact_reviews table successfully loaded')

236

In [72]:
# Create fact_shipment_deliveries table
# Writing a zero-row slice of the frame sets up the table's columns without
# inserting any data; the DataFrame index is not written as a column.
empty_shipments = fact_shipment_deliveries.head(n=0)
empty_shipments.to_sql(
    name='fact_shipment_deliveries',
    schema='joshodey2178_staging',
    con=engine,
    index=False,
    if_exists='replace',
)

print('fact_shipment_deliveries table created')


0

In [73]:
# Load data into fact_shipment_deliveries table
# Appends every row of fact_shipment_deliveries to the existing staging table;
# the DataFrame index is not written as a column.
fact_shipment_deliveries.to_sql(
    name='fact_shipment_deliveries',
    schema='joshodey2178_staging',
    con=engine,
    index=False,
    if_exists='append',
)

print('fact_shipment_deliveries successfully loaded')


1000