In [1]:
import pyspark
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName('Apple DataSet')
    .getOrCreate()
)
# Use the legacy time parser for the to_date() calls made further down.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Products 

In [2]:
# Explicit schema for products.csv: all columns nullable, Launch_Date read as
# text and converted to a date right after loading.
products_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('Product_ID', StringType()),
        ('Product_Name', StringType()),
        ('Category_ID', StringType()),
        ('Launch_Date', StringType()),
        ('Price', IntegerType()),
    )
])
products = (
    spark.read
    .csv('datasets/products.csv', schema=products_schema, header=True)
    # Parse the day-first date text into a proper date column.
    .withColumn("launch_date", to_date("launch_date", "dd-MM-yyyy"))
)

products.show()

+----------+--------------------+-----------+-----------+-----+
|Product_ID|        Product_Name|Category_ID|launch_date|Price|
+----------+--------------------+-----------+-----------+-----+
|       P-1|             MacBook|      CAT-1| 2022-07-25|  717|
|       P-2|    MacBook Air (M1)|      CAT-1| 2022-02-24|  618|
|       P-3|    MacBook Air (M2)|      CAT-1| 2022-04-06|  644|
|       P-4| MacBook Pro 13-inch|      CAT-1| 2025-08-01| 1444|
|       P-5| MacBook Pro 14-inch|      CAT-1| 2023-05-07|  906|
|       P-6|   AirPods (2nd Gen)|      CAT-2| 2024-09-09| 1501|
|       P-7|         AirPods Pro|      CAT-2| 2020-06-08| 1705|
|       P-8|         AirPods Max|      CAT-2| 2023-09-14| 1229|
|       P-9|             iPad 10|      CAT-3| 2024-11-18|  205|
|      P-10|            iPad Air|      CAT-3| 2024-05-17| 1583|
|      P-11|            iPad Pro|      CAT-3| 2021-09-12|  991|
|      P-12|           iPhone 14|      CAT-4| 2020-04-30| 1275|
|      P-13|           iPhone 13|      C

In [3]:
# Missing-value check: one output column per input column, holding the number
# of NULLs found in it (isNull -> 0/1, then summed).
products.select(*[
    sum(col(column_name).isNull().cast('int')).alias(column_name)
    for column_name in products.columns
]).show()

+----------+------------+-----------+-----------+-----+
|Product_ID|Product_Name|Category_ID|launch_date|Price|
+----------+------------+-----------+-----------+-----+
|         0|           0|          0|          0|    0|
+----------+------------+-----------+-----------+-----+



In [4]:
# Duplicate check: list every (Product_ID, Product_Name, Category_ID)
# combination that occurs more than once.
(
    products
    .groupBy('Product_ID', 'Product_Name', 'Category_ID')
    .agg(count('Product_ID').alias('count_of_Product_ID'))
    .filter('count_of_Product_ID>1')
    .show()
)

+----------+------------+-----------+-------------------+
|Product_ID|Product_Name|Category_ID|count_of_Product_ID|
+----------+------------+-----------+-------------------+
+----------+------------+-----------+-------------------+



# Category

In [5]:
# Both category columns are nullable strings.
category_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('category_id', 'category_name')
])
category = spark.read.csv(
    'datasets/category.csv',
    header=True,
    schema=category_schema,
)
category.show()

+-----------+--------------------+
|category_id|       category_name|
+-----------+--------------------+
|      CAT-1|              Laptop|
|      CAT-2|               Audio|
|      CAT-3|              Tablet|
|      CAT-4|          Smartphone|
|      CAT-5|            Wearable|
|      CAT-6|    Streaming Device|
|      CAT-7|             Desktop|
|      CAT-8|Subscription Service|
|      CAT-9|       Smart Speaker|
|     CAT-10|         Accessories|
+-----------+--------------------+



# Sales

In [6]:
# First look at sales.csv: let Spark infer the column types and print them.
sales = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('datasets/sales.csv')
)
sales.printSchema()

root
 |-- Sales_Id: string (nullable = true)
 |-- Sale_Date: string (nullable = true)
 |-- Store_Id: string (nullable = true)
 |-- Product_Id: string (nullable = true)
 |-- Quantity: integer (nullable = true)



In [7]:
# Explicit schema for sales.csv; sale_date is read as text and parsed below.
sales_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('sale_id', StringType()),
        ('sale_date', StringType()),
        ('store_id', StringType()),
        ('product_id', StringType()),
        ('quantity', IntegerType()),
    )
])
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Candidate date layouts, tried in this order; coalesce keeps the first one
# that yields a non-null date, so the order matters for ambiguous strings.
sale_date_formats = ("dd-MM-yyyy", "yyyy-MM-dd", "MM-dd-yyyy", "dd/MM/yyyy")

sales = (
    spark.read
    .csv('datasets/sales.csv', schema=sales_schema, header=True)
    .withColumn(
        "sale_date",
        coalesce(*[to_date("sale_date", fmt) for fmt in sale_date_formats]),
    )
)
sales.show()

+---------+----------+--------+----------+--------+
|  sale_id| sale_date|store_id|product_id|quantity|
+---------+----------+--------+----------+--------+
| EY-76191|2022-11-18|   ST-32|      P-20|       1|
| DB-53244|2023-01-26|   ST-16|       P-6|       6|
|BA-396172|2024-12-31|   ST-54|      P-27|       7|
|  WD-2661|2023-05-13|   ST-45|       P-6|       2|
|  CO-8657|2022-12-14|    ST-3|      P-27|       5|
|RG-694960|2023-07-15|   ST-15|       P-8|       3|
|  OS-5971|2024-12-07|   ST-70|      P-24|       3|
|GI-100174|2023-08-24|   ST-57|      P-13|       2|
|  ER-3186|2022-12-29|   ST-70|       P-5|       1|
|RX-243117|2024-10-23|   ST-19|       P-7|       4|
| IJ-94363|2024-04-06|   ST-29|      P-16|       8|
| VV-78964|2025-01-26|   ST-25|      P-28|       1|
|  FA-1333|2024-11-14|   ST-11|       P-6|       3|
| IT-98790|2023-07-23|   ST-20|      P-10|       5|
| YX-05485|2023-05-26|   ST-27|      P-17|       5|
| JH-40146|2024-04-03|   ST-22|       P-6|       8|
|  ES-6915|2

In [8]:
# Missing-value check for sales: number of NULLs in each column.
# (sale_date is included, so dates that no format could parse show up here.)
sales.select(*[
    sum(col(column_name).isNull().cast('int')).alias(column_name)
    for column_name in sales.columns
]).show()

+-------+---------+--------+----------+--------+
|sale_id|sale_date|store_id|product_id|quantity|
+-------+---------+--------+----------+--------+
|      0|        0|       0|         0|       0|
+-------+---------+--------+----------+--------+



In [9]:
# Duplicate check: list every sale_id that appears more than once.
(
    sales
    .groupBy('sale_id')
    .agg(count('sale_id').alias('count_of_salesid'))
    .filter('count_of_salesid>1')
    .show()
)

+-------+----------------+
|sale_id|count_of_salesid|
+-------+----------------+
+-------+----------------+



# Stores

In [10]:
# All four store columns are nullable strings.
store_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('Store_ID', 'Store_Name', 'City', 'Country')
])
stores = spark.read.csv(
    'datasets/stores.csv',
    header=True,
    schema=store_schema,
)
stores.show()

+--------+--------------------+-------------+-------------+
|Store_ID|          Store_Name|         City|      Country|
+--------+--------------------+-------------+-------------+
|    ST-1|  Apple Fifth Avenue|     New York|United States|
|    ST-2|  Apple Union Square|San Francisco|United States|
|    ST-3|Apple Michigan Av...|      Chicago|United States|
|    ST-4|     Apple The Grove|  Los Angeles|United States|
|    ST-5|          Apple SoHo|     New York|United States|
|    ST-6|Apple Michigan Av...|      Chicago|United States|
|    ST-7|     Apple The Grove|  Los Angeles|United States|
|    ST-8|     Apple The Grove|  Los Angeles|United States|
|    ST-9|  Apple Union Square|San Francisco|United States|
|   ST-10|     Apple The Grove|  Los Angeles|United States|
|   ST-11|     Apple The Grove|  Los Angeles|United States|
|   ST-12|          Apple SoHo|     New York|United States|
|   ST-13|     Apple The Grove|  Los Angeles|United States|
|   ST-14|     Apple The Grove|  Los Ang

In [11]:
# Missing-value check for stores: number of NULLs in each column.
stores.select(*[
    sum(col(column_name).isNull().cast('int')).alias(column_name)
    for column_name in stores.columns
]).show()

+--------+----------+----+-------+
|Store_ID|Store_Name|City|Country|
+--------+----------+----+-------+
|       0|         0|   0|      0|
+--------+----------+----+-------+



In [12]:
# Duplicate check: list every fully identical store row that occurs more than once.
(
    stores
    .groupBy('Store_ID', 'Store_Name', 'City', 'Country')
    .agg(count('Store_id').alias('total_countof_storeid'))
    .filter('total_countof_storeid>1')
    .show()
)

+--------+----------+----+-------+---------------------+
|Store_ID|Store_Name|City|Country|total_countof_storeid|
+--------+----------+----+-------+---------------------+
+--------+----------+----+-------+---------------------+



# Warranty

In [13]:
# Schema for warranty.csv. It gets its own name so that it does not overwrite
# `store_schema`, which describes stores.csv and would otherwise silently
# change meaning after this cell runs.
warranty_schema = StructType([
    StructField("claim_id", StringType(), True),
    StructField("claim_date", StringType(), True),   # read as text, parsed below
    StructField("sale_id", StringType(), True),
    StructField("repair_status", StringType(), True)
])
warranty = spark.read.csv('datasets/warranty.csv', schema=warranty_schema, header=True)
# Convert the day-first date text into a proper date column.
warranty = warranty.withColumn("claim_date", to_date("claim_date", "dd-MM-yyyy"))
warranty.show()


+--------+----------+---------+-------------+
|claim_id|claim_date|  sale_id|repair_status|
+--------+----------+---------+-------------+
| CL_0001|2025-04-17| DR-49575|     Rejected|
| CL_0002|2025-01-15|FC-315363|      Pending|
| CL_0003|2025-04-03|NG-493213|    Completed|
| CL_0004|2025-02-05|  UL-5951|     Rejected|
| CL_0005|2024-10-02| DT-18684|  In Progress|
| CL_0006|2024-10-10| GE-35978|      Pending|
| CL_0007|2025-06-15|TT-433171|      Pending|
| CL_0008|2025-04-06| SE-11826|     Rejected|
| CL_0009|2024-11-10|JJ-057520|  In Progress|
| CL_0010|2025-06-05|SL-810669|     Rejected|
| CL_0011|2024-10-11| YO-23292|    Completed|
| CL_0012|2025-03-19| TG-72168|  In Progress|
| CL_0013|2025-07-14|  LF-0856|     Rejected|
| CL_0014|2025-03-17| MV-74903|     Rejected|
| CL_0015|2025-01-11| CF-66125|     Rejected|
| CL_0016|2024-10-18| MU-96897|      Pending|
| CL_0017|2025-01-02|  LY-3410|  In Progress|
| CL_0018|2025-07-23|  US-0045|      Pending|
| CL_0019|2024-10-19|  MJ-3685|  I

In [14]:
# Missing-value check for warranty: number of NULLs in each column.
warranty.select(*[
    sum(col(column_name).isNull().cast('int')).alias(column_name)
    for column_name in warranty.columns
]).show()

+--------+----------+-------+-------------+
|claim_id|claim_date|sale_id|repair_status|
+--------+----------+-------+-------------+
|       0|         0|      0|            0|
+--------+----------+-------+-------------+



In [15]:
# Duplicate check: list every fully identical warranty row that occurs more than once.
(
    warranty
    .groupBy('claim_id', 'claim_date', 'sale_id', 'repair_status')
    .agg(count('claim_date').alias('no_of_claimid'))
    .filter('no_of_claimid>1')
    .show()
)

+--------+----------+-------+-------------+-------------+
|claim_id|claim_date|sale_id|repair_status|no_of_claimid|
+--------+----------+-------+-------------+-------------+
+--------+----------+-------+-------------+-------------+



In [16]:
# Total number of stores (row count of the `stores` DataFrame).
stores.count()

75

In [17]:
# Total number of products (row count of the `products` DataFrame).
products.count()

29

In [18]:
# Total number of product categories (row count of the `category` DataFrame).
category.count()

10

In [19]:
from pyspark.sql import SparkSession

# Ask for a session that has the PostgreSQL JDBC driver jar on its classpath.
# NOTE(review): a session was already created in the first cell, so
# getOrCreate() presumably hands back that same session -- confirm that the
# app name and spark.jars set here actually take effect.
jdbc_session_builder = SparkSession.builder.appName("AppleStoreETL")
jdbc_session_builder = jdbc_session_builder.config(
    "spark.jars",
    r"C:\spark\spark-3.3.2-bin-hadoop3\jars\postgresql-42.7.7.jar",  # raw string keeps the backslashes literal
)
spark = jdbc_session_builder.getOrCreate()



In [20]:
def save_to_postgres(df, table_name, mode="append"):
    """Write a DataFrame to a PostgreSQL table over JDBC.

    Connection settings can be supplied through environment variables so that
    credentials do not have to be edited into the notebook:

    * ``APPLE_PG_URL``      -- JDBC URL  (fallback ``jdbc:postgresql://localhost:5432/apple``)
    * ``APPLE_PG_USER``     -- user name (fallback ``postgres``)
    * ``APPLE_PG_PASSWORD`` -- password  (fallback: the notebook's original value)

    The fallbacks keep the notebook running exactly as before when no
    variables are set.
    NOTE(review): the password fallback is still a hard-coded credential;
    remove it once the environment variable is set wherever this runs.

    Parameters
    ----------
    df : object exposing ``.write`` (a Spark DataFrame in this notebook)
        Data to persist.
    table_name : str
        Target table, passed as the JDBC ``dbtable`` option.
    mode : str, default "append"
        Save mode handed to the writer, e.g. "append" or "overwrite".

    Returns
    -------
    None
        Prints a confirmation line after the write call returns.
    """
    import os  # local import keeps this cell self-contained

    jdbc_url = os.environ.get("APPLE_PG_URL", "jdbc:postgresql://localhost:5432/apple")
    jdbc_user = os.environ.get("APPLE_PG_USER", "postgres")
    jdbc_password = os.environ.get("APPLE_PG_PASSWORD", "Admin")

    (
        df.write
          .format("jdbc")
          .option("url", jdbc_url)                    # target database
          .option("dbtable", table_name)              # target table
          .option("user", jdbc_user)
          .option("password", jdbc_password)
          .option("driver", "org.postgresql.Driver")
          .option("batchsize", 5000)                  # rows per JDBC batch insert
          .mode(mode)                                 # append or overwrite
          .save()
    )
    print(f"✅ Loaded DataFrame into PostgreSQL table: {table_name}")

# Map each target table name to the DataFrame that feeds it.
tables = dict(
    products=products,
    warranty=warranty,
    sales=sales,
    stores=stores,
    category=category,
)

# Write every DataFrame to its table in one pass. The mode is "append", so
# re-running this cell appends the rows again; switch to "overwrite" when the
# tables should be replaced instead.
for name, df in tables.items():
    save_to_postgres(df, name, "append")


✅ Loaded DataFrame into PostgreSQL table: products
✅ Loaded DataFrame into PostgreSQL table: warranty
✅ Loaded DataFrame into PostgreSQL table: sales
✅ Loaded DataFrame into PostgreSQL table: stores
✅ Loaded DataFrame into PostgreSQL table: category
