In [11]:
#Importing all required modules
import json
import os

import boto3
import pandas as pd
from sqlalchemy import create_engine, text

In [2]:
#Open and load the local JSON file, returning its contents as a list of dictionaries

# def extract():
#     f = open("orders_ETL_incremental.json","r")
#     data = json.load(f)
#     return data

In [16]:
#Download the orders JSON file from S3 and return its contents as a list of dictionaries
# Access keys are deliberately not passed here. boto3 resolves credentials
# through its default provider chain (environment variables, the shared AWS
# config/credentials files, or an attached IAM role), which keeps secrets out
# of the notebook and out of version control.
s3 = boto3.client("s3", region_name="us-east-1")
def extract(bucket_name="namaste-kart",
            file_key="orders/20251120/orders_ETL.json",
            client=None):
    """Download a JSON file from S3 and return its parsed content.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket to read from.
    file_key : str
        Key (path inside the bucket) of the JSON object. The default points at
        one specific dated file, so pass a different key for other loads.
    client : optional
        Any object exposing ``get_object(Bucket=..., Key=...)`` whose result
        has a readable ``"Body"``. When omitted, the module-level ``s3``
        client is used.

    Returns
    -------
    The decoded JSON document. For the orders files this is a list of order
    dictionaries.
    """
    if client is None:
        # Fall back to the notebook-wide client created alongside this function.
        client = s3

    response = client.get_object(Bucket=bucket_name, Key=file_key)

    # The object body is a byte stream: read it fully, decode as UTF-8, parse.
    content = response["Body"].read().decode("utf-8")
    return json.loads(content)

In [17]:
# Fetch the raw orders payload (extract() performs the S3 download and JSON parsing).
data = extract()


In [18]:
# Preview only the first few orders: the full payload is long and contains
# customer names, emails and addresses that should not be dumped into output.
data[:3]

[{'order_id': 1,
  'order_date': '2024-01-10',
  'total_amount': 200.5,
  'customer': {'customer_id': 101,
   'name': 'John Doe',
   'email': 'johndoe@example.com',
   'address': '123 Main St, Springfield'},
  'products': [{'product_id': 'P01',
    'name': 'Wireless Mouse',
    'category': 'Electronics',
    'price': 25.0,
    'quantity': 2},
   {'product_id': 'P02',
    'name': 'Bluetooth Keyboard',
    'category': 'Electronics',
    'price': 45.0,
    'quantity': 1}]},
 {'order_id': 2,
  'order_date': '2024-01-12',
  'total_amount': 150.0,
  'customer': {'customer_id': 102,
   'name': 'Jane Smith',
   'email': 'janesmith@example.com',
   'address': '456 Oak St, Springfield'},
  'products': [{'product_id': 'P03',
    'name': 'Laptop Stand',
    'category': 'Electronics',
    'price': 75.0,
    'quantity': 1}]},
 {'order_id': 3,
  'order_date': '2024-01-12',
  'total_amount': 120.0,
  'customer': {'customer_id': 103,
   'name': 'Alice Johnson',
   'email': 'alicejohnson@example.com',
 

In [4]:
# Print a single order to check the record layout; the one-element slice
# makes the loop body run at most once (and not at all for an empty list).
for order in data[:1]:
    print(order)

{'order_id': 13, 'order_date': '2024-01-10', 'total_amount': 200.5, 'customer': {'customer_id': 101, 'name': 'John Doe Update', 'email': 'johndoe@example.com', 'address': '123 Main St, Springfield'}, 'products': [{'product_id': 'P01', 'name': 'Wireless Mouse Update', 'category': 'Electronics', 'price': 25.0, 'quantity': 2}, {'product_id': 'P06', 'name': 'Bluetooth Keyboard New', 'category': 'Electronics', 'price': 50.0, 'quantity': 1}]}


In [5]:
#Next step: transform the raw nested orders into a structured, tabular format

#transform() collects its rows in three lists: orders, products and customers
def transform(data):
    """Flatten nested order records into one fact frame and two dimension frames.

    Parameters
    ----------
    data : iterable of dict
        Order records. Each record is read for the keys ``order_id``,
        ``order_date``, ``total_amount``, ``customer`` (with ``customer_id``,
        ``name``, ``email``, ``address``) and ``products`` (a list of dicts
        with ``product_id``, ``name``, ``category``, ``price``, ``quantity``).

    Returns
    -------
    tuple of (df_orders, df_products_dim, df_customers_dim)
        * ``df_orders``: one row per (order, product) pair with columns
          order_id, order_date, total_amount, customer_id, product_id, quantity.
        * ``df_products_dim``: one row per product_id with columns
          product_id, name, category, price.
        * ``df_customers_dim``: one row per customer_id with columns
          customer_id, name, email, address.

        When a product or customer appears in several orders, the attributes
        from the row with the highest ``order_id`` are kept.

    Raises
    ------
    KeyError
        If a record is missing one of the keys listed above.
    """
    # Column lists are declared up front so that empty input still yields
    # frames with the expected columns instead of failing in sort_values.
    order_columns = ["order_id", "order_date", "total_amount",
                     "customer_id", "product_id", "quantity"]
    product_columns = ["order_id", "product_id", "name", "category", "price"]
    customer_columns = ["order_id", "customer_id", "name", "email", "address"]

    orders_data = []
    products_data = []
    customer_data = []

    for order in data:
        customer = order["customer"]

        # Customer attributes -> dimension rows. order_id is carried along
        # only to decide which version of a customer is the most recent.
        customer_data.append({
            "order_id": order["order_id"],
            "customer_id": customer["customer_id"],
            "name": customer["name"],
            "email": customer["email"],
            "address": customer["address"],
        })

        for product in order["products"]:
            # Product attributes -> dimension rows (order_id kept for the
            # same "most recent version" reason as above).
            products_data.append({
                "order_id": order["order_id"],
                "product_id": product["product_id"],
                "name": product["name"],
                "category": product["category"],
                "price": product["price"],
            })

            # One fact row per product line of the order.
            orders_data.append({
                "order_id": order["order_id"],
                "order_date": order["order_date"],
                "total_amount": order["total_amount"],
                "customer_id": customer["customer_id"],
                "product_id": product["product_id"],
                "quantity": product["quantity"],
            })

    df_orders = pd.DataFrame(orders_data, columns=order_columns)

    # Sort so the highest order_id for each key comes last, then keep that row.
    df_products = pd.DataFrame(products_data, columns=product_columns)
    df_products_dim = (
        df_products
        .sort_values(["product_id", "order_id"])
        .drop_duplicates("product_id", keep="last")
        .drop(columns="order_id")
    )

    df_customers = pd.DataFrame(customer_data, columns=customer_columns)
    df_customers_dim = (
        df_customers
        .sort_values(["customer_id", "order_id"])
        .drop_duplicates("customer_id", keep="last")
    )[["customer_id", "name", "email", "address"]]

    return df_orders, df_products_dim, df_customers_dim
                
            
    


In [6]:
# transform() returns (orders, products dimension, customers dimension), so:
# df1 = order rows, df2 = deduplicated products, df3 = deduplicated customers.
df1,df2,df3 = transform(data)

In [7]:
# Order rows (first element returned by transform); show a sample, not the full frame.
df1.head()

Unnamed: 0,order_id,order_date,total_amount,customer_id,product_id,quantity
0,13,2024-01-10,200.5,101,P01,2
1,13,2024-01-10,200.5,101,P06,1


In [8]:
# Product dimension (second element returned by transform); show a sample, not the full frame.
df2.head()

Unnamed: 0,product_id,name,category,price
0,P01,Wireless Mouse Update,Electronics,25.0
1,P06,Bluetooth Keyboard New,Electronics,50.0


In [9]:
# Customer dimension (third element returned by transform); show a sample, not the full frame.
df3.head()

Unnamed: 0,customer_id,name,email,address
0,101,John Doe Update,johndoe@example.com,"123 Main St, Springfield"


# LOADING DATA

In [10]:
#This function loads the transformed data into the MySQL database

def load(df_orders,df_products_dim,df_customers_dim, engine):
    """Load the transformed frames into the database.

    Steps, in order:

    1. Replace the staging tables ``customers_stg`` and ``products_stg`` with
       the two dimension frames.
    2. Apply an SCD1 overwrite to ``products`` and ``customers``: rows whose
       business key already exists are updated from staging, rows whose key
       is not present yet are inserted. The four statements run in a single
       transaction.
    3. Read the surrogate keys (``product_sk``, ``customer_sk``) back from the
       dimension tables, attach them to the order rows, and append the result
       to ``order_details``.

    Parameters
    ----------
    df_orders : pandas.DataFrame
        Order rows with at least order_id, order_date, total_amount,
        customer_id, product_id and quantity.
    df_products_dim : pandas.DataFrame
        Product rows with product_id, name, category and price.
    df_customers_dim : pandas.DataFrame
        Customer rows with customer_id, name, email and address.
    engine : sqlalchemy Engine
        Engine for the target database. The update statements use the
        multi-table ``update ... inner join ... set`` form.

    Notes
    -----
    * The fact load uses ``if_exists="append"``: calling this function twice
      with the same orders appends the rows twice.
    * The merges are inner joins, so an order row whose product_id or
      customer_id is missing from the dimension tables is dropped.
    """
    # Staging tables are rebuilt from scratch on every call.
    df_customers_dim.to_sql(name = "customers_stg", con =engine, index = False, if_exists = "replace")
    df_products_dim.to_sql(name = "products_stg", con=engine, index = False, if_exists = "replace")

    # Updating existing products (SCD1 overwrite)
    update_query_products = text('''update products
    inner join products_stg on products.product_id = products_stg.product_id
    set products.price = products_stg.price, products.name = products_stg.name,
    products.category = products_stg.category''')

    # Inserting products that are not in the dimension yet. Columns are named
    # explicitly so the statement does not depend on staging column order.
    insert_query_products = text('''insert into products (product_id, name, category, price)
    select product_id, name, category, price from products_stg
    where product_id not in (select product_id from products)''')

    # Updating existing customers (SCD1 overwrite)
    update_query_customers = text('''update customers
    inner join customers_stg on customers_stg.customer_id = customers.customer_id
    set customers.email = customers_stg.email, customers.name = customers_stg.name,
    customers.address = customers_stg.address''')

    # Inserting customers that are not in the dimension yet.
    insert_query_customers = text('''insert into customers (customer_id,name,email, address)
    select customer_id, name, email, address from customers_stg
    where customer_id not in (select customer_id from customers)''')

    # engine.begin() commits when the block succeeds and rolls back if any
    # statement raises, so the four dimension statements succeed or fail together.
    with engine.begin() as conn:
        for statement in (update_query_products, insert_query_products,
                          update_query_customers, insert_query_customers):
            conn.execute(statement)

    # Read the surrogate keys back on a connection that is closed afterwards
    # (previously two connections were opened here and never released).
    with engine.connect() as conn:
        df_mysql_products = pd.read_sql_query(
            sql = text("select product_id, product_sk from products"), con = conn)
        df_mysql_customers = pd.read_sql_query(
            sql = text("select customer_id, customer_sk from customers"), con = conn)

    # Swap business keys for surrogate keys on the fact rows.
    df_orders_products = pd.merge(left = df_orders, right = df_mysql_products,
                                  on = "product_id", how = "inner")
    df_orders_products_customers = pd.merge(left = df_orders_products, right = df_mysql_customers,
                                            on = "customer_id", how = "inner")
    columns = ['order_id','order_date','total_amount','customer_sk','product_sk', 'quantity']
    df_order_final = df_orders_products_customers[columns]

    # Fact upload
    df_order_final.to_sql(name = "order_details",con = engine, index = False, if_exists = "append")
    
    
def main(db_url=None):
    """Run the full pipeline: extract from S3, transform, load into the database.

    Parameters
    ----------
    db_url : str, optional
        SQLAlchemy connection URL, e.g.
        ``mysql+pymysql://user:password@127.0.0.1:3306/my_cart`` (special
        characters in the password must be URL-encoded). When omitted, the
        URL is read from the ``MY_CART_DB_URL`` environment variable so that
        the password never has to be written into the notebook.

    Raises
    ------
    RuntimeError
        If no URL is passed and ``MY_CART_DB_URL`` is unset or empty.
    """
    if db_url is None:
        db_url = os.environ.get("MY_CART_DB_URL")
    if not db_url:
        raise RuntimeError(
            "Database URL is not configured: pass db_url or set the "
            "MY_CART_DB_URL environment variable."
        )

    engine = create_engine(db_url)
    try:
        data = extract()
        df_orders, df_products_dim, df_customers_dim = transform(data)
        load(df_orders, df_products_dim, df_customers_dim, engine)
    finally:
        # Release pooled connections even when a pipeline step fails.
        engine.dispose()


# Run the whole pipeline end to end (S3 download, transform, database load).
main()
    