In [0]:
# Expose every table in the bronze schema as a global DataFrame named `<table>_df`
# (e.g. msklenq.bronze.customers -> customers_df). Later cells refer to these names directly.
BRONZE_SCHEMA = "msklenq.bronze"

tables_df = spark.sql(f"SHOW TABLES IN {BRONZE_SCHEMA}")
display(tables_df)

for row in tables_df.collect():
    # SHOW TABLES can also list session-scoped temporary views; those are not addressable
    # as `<schema>.<name>`, so only persistent tables are loaded.
    if row['isTemporary']:
        continue
    table_name = row['tableName']
    df_name = f"{table_name}_df"
    globals()[df_name] = spark.table(f"{BRONZE_SCHEMA}.{table_name}")

In [0]:
from pyspark.sql.functions import col, sum

# Data-quality profile of customers_df.
# NOTE: this import shadows Python's builtin `sum` with the Spark aggregate for the rest of
# the notebook; the orders profiling cell relies on it.

# Missing values: cast each column's isNull() flag to 0/1 and add it up per column.
null_flag_sums = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in customers_df.columns
]
missing_values = customers_df.select(null_flag_sums)
display(missing_values)

# Check for Duplicates: group on every column; each full-row value seen more than once
# is counted once.
all_columns = customers_df.columns
duplicate_count = (
    customers_df.groupBy(all_columns)
    .count()
    .filter("count > 1")
    .count()
)
print(f"Number of duplicate rows: {duplicate_count}")

# Summary Statistics
summary_stats = customers_df.describe()
display(summary_stats)

# CustomerID values that occur more than once, shown with their occurrence counts.
duplicates = (
    customers_df.groupBy("CustomerID")
    .count()
    .filter("count > 1")
)
display(duplicates)

# Number of distinct CustomerID values that are repeated.
duplicate_count = duplicates.count()
print(f"Number of duplicate customer_ids: {duplicate_count}")


In [0]:
%sql
-- Schema (in the current catalog) that holds the cleaned silver tables.
CREATE SCHEMA silver

In [0]:
%sql
-- Silver customers table: CustomerID is declared as the primary key and Email must be non-null.
create table silver.customers (
    CustomerID               varchar(50) primary key,
    FirstName                varchar(100),
    LastName                 varchar(100),
    Email                    varchar(255) not null,
    PhoneNumber              bigint,
    DateOfBirth              date,
    RegistrationDate         date,
    PreferredPaymentMethodID varchar(50)
)
using delta;

In [0]:
# Load the bronze customers rows into silver.customers.
# NOTE(review): append mode adds the rows again on every re-run, and duplicate CustomerIDs
# found by the profiling cell are not removed here; the PRIMARY KEY is informational only.
customers_writer = customers_df.write.mode("append")
customers_writer.saveAsTable("silver.customers")

In [0]:
%sql
-- Turn on the Delta change data feed for silver.customers, then preview the table.
-- A Databricks SQL cell renders only the last statement's result, so the SELECT goes last.
-- NOTE: the change feed is not retroactive; rows appended before this property is set
-- (the initial load) are not recorded in it.
ALTER TABLE silver.customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
select * from silver.customers;

In [0]:

# Data-quality profile of orders_df. `sum` and `col` here are the Spark functions imported
# in the customers profiling cell.

# Missing values per column: null flags cast to 0/1, then summed.
missing_values = orders_df.select(
    [sum(col(name).isNull().cast("int")).alias(name) for name in orders_df.columns]
)
display(missing_values)

# Check for Duplicates: number of distinct full-row values that occur more than once.
full_row_groups = orders_df.groupBy(orders_df.columns).count()
duplicate_count = full_row_groups.filter("count > 1").count()
print(f"Number of duplicate rows: {duplicate_count}")

# Summary Statistics
summary_stats = orders_df.describe()
display(summary_stats)

# OrderID values that appear more than once, and how many such IDs there are.
duplicates = orders_df.groupBy("OrderID").count().filter("count > 1")
display(duplicates)
duplicate_count = duplicates.count()
print(f"Number of duplicate OrderID: {duplicate_count}")

In [0]:
# Referential-integrity check: find orders whose key has no match in the corresponding
# bronze table. silver.orders declares FOREIGN KEYs on CustomerID, VendorID and
# ShippingMethodID, so all three are checked.
# A left_anti join keeps only left-side rows without a match. NULL keys never compare equal,
# so orders with a NULL key are reported as well.

# Find CustomerID in orders_df that are not in customers_df using left anti join
non_matching_customer_ids = orders_df.join(customers_df, orders_df.CustomerID == customers_df.CustomerID, "left_anti")
display(non_matching_customer_ids)

# Find VendorID in orders_df that are not in vendors_df
non_matching_vendor_ids = orders_df.join(vendors_df, orders_df.VendorID == vendors_df.VendorID, "left_anti")
display(non_matching_vendor_ids)

# Find ShippingMethodID in orders_df that are not in shipping_methods_df
non_matching_shipping_method_ids = orders_df.join(
    shipping_methods_df,
    orders_df.ShippingMethodID == shipping_methods_df.ShippingMethodID,
    "left_anti",
)
display(non_matching_shipping_method_ids)

In [0]:
%sql
-- Parent (lookup) tables for silver.orders. A FOREIGN KEY can only reference the primary key
-- of a table that already exists, so these tables must be created before silver.orders,
-- which declares FOREIGN KEYs to both of them.
CREATE TABLE silver.vendors (
    VendorID VARCHAR(50) PRIMARY KEY,
    VendorName VARCHAR(255)
)
USING DELTA;

CREATE TABLE silver.shipping_methods (
    ShippingMethodID VARCHAR(50) PRIMARY KEY,
    MethodName VARCHAR(255),
    Cost_Rs int
)
USING DELTA;

In [0]:
%sql
-- Silver orders table. Every referenced parent table (customers, vendors, shipping_methods)
-- exists by the time this cell runs.
CREATE TABLE silver.orders (
    OrderID VARCHAR(50) PRIMARY KEY,
    CustomerID VARCHAR(50),
    OrderDate TIMESTAMP NOT NULL,
    ShippingDate TIMESTAMP,
    ExpectedDeliveryDate TIMESTAMP,
    ActualDeliveryDate TIMESTAMP,
    ShippingMethodID VARCHAR(50),
    VendorID VARCHAR(50),
    FOREIGN KEY (CustomerID) REFERENCES silver.customers(CustomerID),
    FOREIGN KEY (VendorID) REFERENCES silver.vendors(VendorID),
    FOREIGN KEY (ShippingMethodID) REFERENCES silver.shipping_methods(ShippingMethodID)
)
USING DELTA;


In [0]:
# Append data from orders_df to the silver.orders table
orders_df.write.mode("append").saveAsTable("silver.orders")

In [0]:
%sql

select *  from silver.orders

In [0]:
shipping_methods_df.display()

In [0]:

# Load the bronze vendor and shipping-method rows into their silver tables, in that order.
for source_df, target_table in (
    (vendors_df, "silver.vendors"),
    (shipping_methods_df, "silver.shipping_methods"),
):
    source_df.write.mode("append").saveAsTable(target_table)

In [0]:
# Preview the raw bronze products and payment-method tables.
display(products_df)
display(payment_methods_df)

In [0]:
from pyspark.sql.functions import col, sum

# Data-quality profile of products_df.
product_columns = products_df.columns

# Per-column null counts (null flags cast to 0/1 and summed with Spark's `sum`).
missing_values = products_df.select(
    *[sum(col(name).isNull().cast("int")).alias(name) for name in product_columns]
)
display(missing_values)

# Check for Duplicates: distinct full-row values that occur more than once.
duplicate_groups = products_df.groupBy(product_columns).count().filter("count > 1")
duplicate_count = duplicate_groups.count()
print(f"Number of duplicate rows: {duplicate_count}")

# Summary Statistics
summary_stats = products_df.describe()
display(summary_stats)


In [0]:
# Inspect products_df's schema as (column_name, spark_type_string) pairs.
data_types = [(name, dtype) for name, dtype in products_df.dtypes]
display(data_types)

In [0]:
%sql
-- Payment-method lookup table, keyed by PaymentMethodID.
create table silver.payment_methods (
    PaymentMethodID varchar(50) primary key,
    MethodName      varchar(255)
)
using delta;



In [0]:
%sql
-- Recreate silver.products from scratch. IF EXISTS keeps the DROP from failing on a fresh
-- workspace where the table has never been created.
-- NOTE(review): silver.order_items (created in a later cell) declares a FOREIGN KEY that
-- references this table; verify how a re-run's DROP interacts with that constraint.
drop table if exists silver.products;
CREATE TABLE silver.products (
    Product_ID VARCHAR(50) PRIMARY KEY,
    Product_Name VARCHAR(255) NOT NULL,
    Product_Category VARCHAR(100),
    Product_Sub_Category VARCHAR(100),
    Product_Rating DOUBLE,
    Number_of_product_ratings DOUBLE,
    Discounted_Price int,
    Actual_Price int
)
USING DELTA;

In [0]:
# Preview products_df before the price-cleanup experiments below.
display(products_df)

In [0]:
# Imported here because this cell runs before the cast cell that first imports
# regexp_replace; without it a fresh Run All raises NameError.
from pyspark.sql.functions import col, regexp_replace

# Preview the Actual_Price cleanup without modifying products_df: strip the rupee sign and
# commas into a separate Actual_Price_Cleaned column for inspection.
df_cleaned = (
    products_df
    .withColumn("Actual_Price_Cleaned", regexp_replace(col("Actual_Price"), "[₹]", ""))
    .withColumn("Actual_Price_Cleaned", regexp_replace(col("Actual_Price_Cleaned"), ",", ""))
)
df_cleaned.display()


In [0]:
from pyspark.sql.functions import regexp_replace, col

# Cast columns to the types declared on silver.products.
# Rating columns become doubles.
for rating_col in ("Product_Rating", "Number_of_product_ratings"):
    products_df = products_df.withColumn(rating_col, products_df[rating_col].cast("double"))

# Price columns: drop the rupee sign and commas in one regex pass, then cast to int.
# (Presumably the raw values look like "₹1,299" — confirm against the bronze data.)
for price_col in ("Actual_Price", "Discounted_Price"):
    products_df = products_df.withColumn(
        price_col, regexp_replace(col(price_col), "[₹,]", "").cast("int")
    )

products_df.display()


In [0]:
# Append products_df to silver.products. mergeSchema lets the write evolve the table schema
# if products_df carries columns the table does not declare.
(
    products_df.write
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("silver.products")
)

In [0]:
%sql
-- Preview the rows just loaded into silver.products.
SELECT * FROM silver.products

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Remove the rupee sign (₹) and the comma thousands separators from Actual_Price, then cast
# it to int. Stripping only "₹" would leave strings like "1,299", which do not parse as int
# (NULL, or an error under ANSI mode). If Actual_Price is already an int, this is a no-op.
products_df = products_df.withColumn(
    "Actual_Price", regexp_replace(col("Actual_Price"), "[₹,]", "").cast("int")
)

display(products_df)

In [0]:

# Load the bronze payment-method rows into silver.payment_methods.
payment_methods_writer = payment_methods_df.write.mode("append")
payment_methods_writer.saveAsTable("silver.payment_methods")

In [0]:
%sql
-- NOTE(review): this repeats the earlier silver.products preview, while the preceding cell
-- loads silver.payment_methods; confirm which table was meant to be checked here.
SELECT * FROM silver.products

In [0]:
%sql
-- Detail tables that reference silver.orders (and addresses, which references silver.customers).
-- Every parent table referenced below is created in an earlier cell.
CREATE TABLE silver.order_items (
    OrderItemID VARCHAR(50) PRIMARY KEY,
    OrderID VARCHAR(50),
    ProductID VARCHAR(50),
    Quantity INT,
    FOREIGN KEY (OrderID) REFERENCES silver.orders(OrderID),
    FOREIGN KEY (ProductID) REFERENCES silver.products(Product_ID)
)
USING DELTA;

-- PaymentMethodID references silver.payment_methods, in the same way that the vendor and
-- shipping-method IDs are declared as foreign keys on silver.orders.
CREATE TABLE silver.payments (
    PaymentID VARCHAR(50) PRIMARY KEY,
    OrderID VARCHAR(50),
    PaymentDate TIMESTAMP,
    GiftCardUsage VARCHAR(30),
    GiftCardAmount DOUBLE,
    CouponUsage VARCHAR(30),
    CouponAmount DOUBLE,
    PaymentMethodID VARCHAR(50),
    FOREIGN KEY (OrderID) REFERENCES silver.orders(OrderID),
    FOREIGN KEY (PaymentMethodID) REFERENCES silver.payment_methods(PaymentMethodID)
)
USING DELTA;

-- No primary key: the table only links a return reason to an order.
CREATE TABLE silver.returns (
    OrderID VARCHAR(50),
    Return_reason VARCHAR(500),
    FOREIGN KEY (OrderID) REFERENCES silver.orders(OrderID)
)
USING DELTA;

CREATE TABLE silver.addresses (
    AddressID VARCHAR(255) PRIMARY KEY,
    CustomerID VARCHAR(255),
    AddressLine1 VARCHAR(500),
    City VARCHAR(255),
    State VARCHAR(255),
    PinCode FLOAT,
    AddressType VARCHAR(255),
    FOREIGN KEY (CustomerID) REFERENCES silver.customers(CustomerID)
)
USING DELTA;

In [0]:
# Load payments, returns and order items (in that order) into their silver tables.
for detail_df, detail_table in (
    (payments_df, "silver.payments"),
    (returns_df, "silver.returns"),
    (order_items_df, "silver.order_items"),
):
    detail_df.write.mode("append").saveAsTable(detail_table)

In [0]:
# Cast PinCode to float to match the FLOAT column declared on silver.addresses.
pin_code_as_float = col("PinCode").cast("float")
addresses_df = addresses_df.withColumn("PinCode", pin_code_as_float)

# Append to silver.addresses, allowing schema evolution via mergeSchema.
(
    addresses_df.write
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("silver.addresses")
)

In [0]:
%sql
-- Preview the rows loaded into silver.addresses.
SELECT * FROM silver.addresses