# In [2]:
import pandas as pd
from sqlalchemy import create_engine , text
from sqlalchemy.engine import URL
import boto3
import json
from datetime import date
import datetime

# In [3]:
def extract(s3, filename, bucketname):
    """Fetch one object from S3 and return its content parsed as JSON.

    Args:
        s3: Client object exposing ``get_object(Bucket=..., Key=...)``.
        filename: Key of the object inside the bucket.
        bucketname: Name of the bucket that holds the object.

    Returns:
        The Python value produced by parsing the UTF-8 decoded object body
        as JSON.
    """
    response = s3.get_object(Key=filename, Bucket=bucketname)
    raw_bytes = response['Body'].read()          # whole body is pulled into memory
    return json.loads(raw_bytes.decode('utf-8'))

def _latest_per_key(df, key):
    """Keep one row per ``key`` (the one with the highest ``order_id``) and drop ``order_id``."""
    return (
        df.sort_values(by=[key, 'order_id'], ascending=[True, False])
          .drop_duplicates(key, keep='first')
          .reset_index(drop=True)
          .drop(columns='order_id')
    )


def transform(data):
    """Flatten nested order records into orders, products and customers frames.

    Args:
        data: Iterable of order dicts. Each order is read for the keys
            ``order_id``, ``order_date``, ``total_amount``, ``customer``
            (a dict with ``customer_id``, ``name``, ``email``, ``address``)
            and ``products`` (a list of dicts with ``product_id``,
            ``quantity``, ``name``, ``category``, ``price``).

    Returns:
        A tuple ``(df_orders, df_products, df_customers)``:

        * ``df_orders`` has one row per (order, product) pair with columns
          ``order_id, order_date, total_amount, customer_id, product_id,
          quantity``.
        * ``df_products`` has one row per ``product_id`` with columns
          ``product_id, name, category, price``; when a product appears in
          several orders the row from the highest ``order_id`` is kept.
        * ``df_customers`` has one row per ``customer_id`` with columns
          ``customer_id, name, email, address``, deduplicated the same way.

        Empty input (or orders without products) yields empty frames that
        still carry these columns.

    Raises:
        KeyError: If an order, customer or product lacks one of the keys
            listed above.
    """
    # Explicit column lists keep the frames well-formed even when no rows
    # were collected; without them the sort below fails on a column-less frame.
    order_columns = ['order_id', 'order_date', 'total_amount',
                     'customer_id', 'product_id', 'quantity']
    product_columns = ['order_id', 'product_id', 'name', 'category', 'price']
    customer_columns = ['order_id', 'customer_id', 'name', 'email', 'address']

    orders_data = []
    products_data = []
    customers_data = []
    for order in data:
        customer = order['customer']
        customers_data.append({
            "order_id": order['order_id'],
            "customer_id": customer['customer_id'],
            "name": customer['name'],
            "email": customer['email'],
            "address": customer['address'],
        })

        # An order with several products becomes several fact rows.
        for product in order['products']:
            orders_data.append({
                "order_id": order['order_id'],
                "order_date": order['order_date'],
                "total_amount": order['total_amount'],
                "customer_id": customer['customer_id'],
                "product_id": product['product_id'],
                "quantity": product['quantity'],
            })
            products_data.append({
                "order_id": order['order_id'],
                "product_id": product['product_id'],
                "name": product['name'],
                "category": product['category'],
                "price": product['price'],
            })

    df_orders = pd.DataFrame(orders_data, columns=order_columns)
    df_products = _latest_per_key(
        pd.DataFrame(products_data, columns=product_columns), 'product_id')
    df_customers = _latest_per_key(
        pd.DataFrame(customers_data, columns=customer_columns), 'customer_id')

    return df_orders, df_products, df_customers

def load(df_orders, df_products, df_customers, engine):
    """Stage the dimension frames, refresh the dimensions, then append the facts.

    The work happens in three steps:

    1. ``df_customers`` and ``df_products`` replace the staging tables
       ``customers_stg`` and ``products_stg``.
    2. Inside one transaction, ``products1`` is maintained as a Slowly
       Changing Dimension Type 2 (changed current rows are closed and a new
       version is inserted) and ``customers1`` as Type 1 (attributes are
       overwritten in place, unknown customers are inserted).
    3. Both dimension tables are read back, the order rows are joined to
       them to pick up ``product_sk`` / ``customer_sk``, and the result is
       appended to ``orders1``.

    The SQL uses MySQL syntax (``UPDATE ... JOIN``, ``<=>``, ``CURDATE()``).

    Args:
        df_orders: Frame with at least ``order_id``, ``order_date``,
            ``total_amount``, ``customer_id``, ``product_id``, ``quantity``.
        df_products: Frame written as-is to ``products_stg``; the SQL reads
            its ``product_id``, ``name``, ``category`` and ``price`` columns.
        df_customers: Frame written as-is to ``customers_stg``; the SQL reads
            its ``customer_id``, ``name``, ``email`` and ``address`` columns.
        engine: SQLAlchemy engine connected to the warehouse database.

    Returns:
        None.
    """
    # Load the dimensions from the current file into staging tables.
    df_customers.to_sql(name="customers_stg", con=engine, index=False, if_exists="replace")
    df_products.to_sql(name="products_stg", con=engine, index=False, if_exists="replace")

    # SCD Type 2 for products: close the current version ('9999-12-31' marks
    # "current") when any tracked attribute differs from staging. `<=>` is the
    # NULL-safe equality operator, so NULL vs NULL counts as unchanged.
    update_query_products = text('''UPDATE products1 p 
                                    JOIN products_stg s ON p.product_id = s.product_id
                                    SET p.end_date = CURDATE() - INTERVAL 1 DAY WHERE p.end_date = '9999-12-31' 
                                    AND NOT (p.price<=>s.price AND p.name<=>s.name AND p.category<=>s.category)''')

    # ...then insert a fresh current version for products that are new or
    # whose current version was just closed by the update above.
    insert_query_products = text('''INSERT INTO products1 (product_id, name, category, price, start_date, end_date)
                                    SELECT s.product_id, s.name, s.category, s.price, CURDATE(), '9999-12-31' FROM products_stg s
                                    LEFT JOIN products1 p ON p.product_id = s.product_id AND p.end_date = '9999-12-31'                  
                                    WHERE p.product_id IS NULL OR NOT (p.price<=>s.price AND p.name<=>s.name AND p.category<=>s.category)''')

    # SCD Type 1 for customers: overwrite attributes of known customers.
    update_query_customers = text('''update customers1 
             inner join  customers_stg on customers1.customer_id=customers_stg.customer_id
             set customers1.email = customers_stg.email , customers1.name = customers_stg.name
             ,customers1.address = customers_stg.address''')

    # Insert customers not seen before. Columns are named explicitly so the
    # insert does not depend on the column order of the staging table.
    insert_query_customers = text('''insert into customers1 (customer_id,name,email,address)
                     select customer_id, name, email, address from customers_stg
                     where customer_id not in (select customer_id from customers1)''')

    # Run all four statements in one transaction: committed together on
    # success, rolled back together if any of them raises.
    with engine.begin() as conn:
        conn.execute(update_query_products)
        conn.execute(insert_query_products)
        conn.execute(update_query_customers)
        conn.execute(insert_query_customers)

    # Read the refreshed dimension tables back to resolve surrogate keys.
    df_mysql_products = pd.read_sql("select * from products1", con=engine)
    df_mysql_customers = pd.read_sql("select * from customers1", con=engine)

    open_end_date = date(9999, 12, 31)
    current_products = df_mysql_products.loc[
        df_mysql_products['end_date'] == open_end_date, ['product_id', 'product_sk']]
    # NOTE(review): customers1 is filtered on end_date although none of the
    # statements above set that column; this relies on the table supplying it
    # (e.g. through a column default) - confirm against the table definition.
    current_customers = df_mysql_customers.loc[
        df_mysql_customers['end_date'] == open_end_date, ['customer_id', 'customer_sk']]

    # Inner joins: order rows without a matching current dimension row are dropped.
    df_orders_products = pd.merge(left=df_orders, right=current_products, how='inner', on='product_id')
    df_orders_products_customers = pd.merge(
        left=df_orders_products, right=current_customers, how='inner', on='customer_id')

    columns = ['order_id', 'order_date', 'total_amount', 'customer_sk', 'product_sk', 'quantity']
    df_orders_final = df_orders_products_customers[columns]
    df_orders_final.to_sql(name="orders1", con=engine, index=False, if_exists="append")

def main():
    """Run the daily ETL: process every order file for today's date found in S3.

    Builds the MySQL engine and the S3 client, lists the objects under
    ``etl_pipeline/<YYYYMMDD>/`` (today's local date) in the bucket, and for
    each key ending in ``.json`` runs ``extract`` -> ``transform`` -> ``load``.
    """
    # Connection to the MySQL database holding the dimension and fact tables.
    # NOTE(review): connection details are hard-coded; prefer loading them
    # from the environment or a secrets store.
    engine = create_engine(URL.create(
        "mysql+pymysql",
        username="root",
        password="#######",
        host="#######",
        port=3306,
        database="python",
    ))

    # Connection to Amazon S3, where the daily order files are stored.
    # NOTE(review): verify=False disables TLS certificate verification;
    # pass a CA bundle path instead once one is available.
    s3 = boto3.client(
        "s3",
        aws_access_key_id="#########",
        aws_secret_access_key="#########",
        verify=False  # <- USE SECURE CA BUNDLE
    )

    formatted = date.today().strftime("%Y%m%d")
    bucket_name = "namaste-python-ram"
    prefix = f"etl_pipeline/{formatted}/"

    # Let S3 filter by prefix and paginate the listing: a single
    # list_objects_v2 call returns only one page of keys, and splitting keys
    # by '/' fails on objects stored at the bucket root.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith(".json"):
                continue
            data = extract(s3, key, bucket_name)                      # parsed JSON content of the file
            df_orders, df_products, df_customers = transform(data)    # flatten into fact and dimension frames
            load(df_orders, df_products, df_customers, engine)        # write to the warehouse tables in MySQL

# Run the pipeline only when executed as a script or notebook cell, not on import.
if __name__ == "__main__":
    main()
    

