In [18]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session whose default filesystem is the HDFS namenode
spark = (
    SparkSession.builder
    .appName("EcommerceProductAnalysis")
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode-master:8020")
    .getOrCreate()
)

# Read the three product CSV files; the first row is a header and column types are inferred
csv_read_kwargs = {"header": True, "inferSchema": True}
df_shoes = spark.read.csv("hdfs://namenode-master:8020/shoes1.csv", **csv_read_kwargs)
df_jewelry = spark.read.csv("hdfs://namenode-master:8020/jewelry1.csv", **csv_read_kwargs)
df_accessories = spark.read.csv("hdfs://namenode-master:8020/accessories1.csv", **csv_read_kwargs)

# Use the jewelry column order as the reference layout for the other two tables,
# because unionAll pairs columns by position rather than by name
reference_columns = df_jewelry.columns
df_shoes = df_shoes.select(*reference_columns)
df_accessories = df_accessories.select(*reference_columns)

# Stack the tables in the order shoes -> jewelry -> accessories
shoes_and_jewelry = df_shoes.unionAll(df_jewelry)
df_combined = shoes_and_jewelry.unionAll(df_accessories)

# Preview the first 10 untruncated rows and the resulting schema
df_combined.show(10, truncate=False)
df_combined.printSchema()




                                                                                

+--------+------------------+---------------------------------------------------------------------------------------+-------------+---------+--------+--------+-----------+------+-------+----------------------------------------------+-----------------+-----------------+-----------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------+-------+---------+
|category|subcategory       |name                  

In [19]:
from pyspark.sql import functions as F

def avg_price_by_category(df):
    """Return the mean ``current_price`` for every ``category``.

    Args:
        df: DataFrame containing ``category`` and ``current_price`` columns.

    Returns:
        A DataFrame with one row per ``category`` and the mean stored in
        ``avg_price_category``.
    """
    mean_price = F.avg("current_price").alias("avg_price_category")
    per_category = df.groupBy("category")
    return per_category.agg(mean_price)

def avg_price_by_subcategory(df):
    """Return the mean ``current_price`` for every ``subcategory``.

    Args:
        df: DataFrame containing ``subcategory`` and ``current_price`` columns.

    Returns:
        A DataFrame with one row per ``subcategory`` and the mean stored in
        ``avg_price_subcategory``.
    """
    mean_price = F.avg("current_price").alias("avg_price_subcategory")
    per_subcategory = df.groupBy("subcategory")
    return per_subcategory.agg(mean_price)

def avg_discount_by_category(df):
    """Return the mean ``discount`` for every ``category``.

    Args:
        df: DataFrame containing ``category`` and ``discount`` columns.

    Returns:
        A DataFrame with one row per ``category`` and the mean stored in
        ``avg_discount_category``.
    """
    mean_discount = F.avg("discount").alias("avg_discount_category")
    per_category = df.groupBy("category")
    return per_category.agg(mean_discount)

def avg_discount_by_subcategory(df):
    """Return the mean ``discount`` for every ``subcategory``.

    Args:
        df: DataFrame containing ``subcategory`` and ``discount`` columns.

    Returns:
        A DataFrame with one row per ``subcategory`` and the mean stored in
        ``avg_discount_subcategory``.
    """
    mean_discount = F.avg("discount").alias("avg_discount_subcategory")
    per_subcategory = df.groupBy("subcategory")
    return per_subcategory.agg(mean_discount)

def avg_likes_by_category(df):
    """Return the mean ``likes_count`` for every ``category``.

    Args:
        df: DataFrame containing ``category`` and ``likes_count`` columns.

    Returns:
        A DataFrame with one row per ``category`` and the mean stored in
        ``avg_likes_category``.
    """
    mean_likes = F.avg("likes_count").alias("avg_likes_category")
    per_category = df.groupBy("category")
    return per_category.agg(mean_likes)

def avg_likes_by_subcategory(df):
    """Return the mean ``likes_count`` for every ``subcategory``.

    Args:
        df: DataFrame containing ``subcategory`` and ``likes_count`` columns.

    Returns:
        A DataFrame with one row per ``subcategory`` and the mean stored in
        ``avg_likes_subcategory``.
    """
    mean_likes = F.avg("likes_count").alias("avg_likes_subcategory")
    per_subcategory = df.groupBy("subcategory")
    return per_subcategory.agg(mean_likes)

def sum_unused_by_category(df):
    """Return the total of the ``is_new`` flag for every ``category``.

    Args:
        df: DataFrame containing ``category`` and ``is_new`` columns.

    Returns:
        A DataFrame with one row per ``category`` and the total stored in
        ``sum_unused_category``.
    """
    # Cast the flag to int first so that it can be summed
    is_new_as_int = F.col("is_new").cast("int")
    with_numeric_flag = df.withColumn("is_new_numeric", is_new_as_int)
    total_unused = F.sum("is_new_numeric").alias("sum_unused_category")
    return with_numeric_flag.groupBy("category").agg(total_unused)

def sum_unused_by_subcategory(df):
    """Return the total of the ``is_new`` flag for every ``subcategory``.

    Args:
        df: DataFrame containing ``subcategory`` and ``is_new`` columns.

    Returns:
        A DataFrame with one row per ``subcategory`` and the total stored in
        ``sum_unused_subcategory``.
    """
    # Cast the flag to int first so that it can be summed
    is_new_as_int = F.col("is_new").cast("int")
    with_numeric_flag = df.withColumn("is_new_numeric", is_new_as_int)
    total_unused = F.sum("is_new_numeric").alias("sum_unused_subcategory")
    return with_numeric_flag.groupBy("subcategory").agg(total_unused)

In [20]:
# Build the eight summary tables from df_combined (the union of shoes, jewelry
# and accessories), preview each one, then persist them to HDFS as CSV and Parquet.

output_path = "hdfs://namenode-master:8020/output/"
output_path1 = "hdfs://namenode-master:8020/output_parquet/"

# 1. Average price by category
df_avg_price_category = avg_price_by_category(df_combined)
# 2. Average price by subcategory
df_avg_price_subcategory = avg_price_by_subcategory(df_combined)
# 3. Average discount by category
df_avg_discount_category = avg_discount_by_category(df_combined)
# 4. Average discount by subcategory
df_avg_discount_subcategory = avg_discount_by_subcategory(df_combined)
# 5. Average likes by category
df_avg_likes_category = avg_likes_by_category(df_combined)
# 6. Average likes by subcategory
df_avg_likes_subcategory = avg_likes_by_subcategory(df_combined)
# 7. Sum of unused products by category
df_sum_unused_category = sum_unused_by_category(df_combined)
# 8. Sum of unused products by subcategory
df_sum_unused_subcategory = sum_unused_by_subcategory(df_combined)

# Output directory name -> summary table. The key is appended to both output
# roots, so every table is listed exactly once instead of once per statement.
summary_tables = {
    "avg_price_category": df_avg_price_category,
    "avg_price_subcategory": df_avg_price_subcategory,
    "avg_discount_category": df_avg_discount_category,
    "avg_discount_subcategory": df_avg_discount_subcategory,
    "avg_likes_category": df_avg_likes_category,
    "avg_likes_subcategory": df_avg_likes_subcategory,
    "sum_unused_category": df_sum_unused_category,
    "sum_unused_subcategory": df_sum_unused_subcategory,
}

# Preview up to 20 rows of each summary, in the order listed above
for summary_df in summary_tables.values():
    summary_df.show(20)

# Save each DataFrame to HDFS as CSV (with a header row), replacing earlier output
for summary_name, summary_df in summary_tables.items():
    summary_df.write.mode("overwrite").csv(output_path + summary_name, header=True)

# Save each DataFrame to HDFS as Parquet, replacing earlier output
for summary_name, summary_df in summary_tables.items():
    summary_df.write.mode("overwrite").parquet(output_path1 + summary_name)


                                                                                

+-----------+------------------+
|   category|avg_price_category|
+-----------+------------------+
|      shoes| 38.08437959908639|
|    jewelry|16.452732330517176|
|accessories| 12.15106637307336|
+-----------+------------------+



                                                                                

+--------------------+---------------------+
|         subcategory|avg_price_subcategory|
+--------------------+---------------------+
|  Claquettes & Tongs|   25.934100529100505|
|             Baskets|   38.950834597875605|
|ACCESSOIRES CHAUS...|    9.556521739130435|
|             Slipper|    16.41359281437126|
| Chaussures de ville|    43.64528340080969|
|              Bottes|               21.485|
| Derbies & Mocassins|   32.199906015037676|
|        Plate-formes|                15.87|
|               Pumps|   26.358205128205128|
|Bottes & Chaussur...|    46.25041958041958|
|  Sneakers & Baskets|    32.69600414078672|
|Chaussures de jeu...|                16.95|
|      Flat & Loafers|   24.061111111111106|
|           Escarpins|   46.614247311827846|
|            Sandales|    38.51979856115107|
|           Mocassins|    44.90436288901939|
|          Plateforme|    31.75794871794872|
|    Sandales & Mules|    34.92400000000005|
|           Chaussons|     24.5769512195122|
|   Bottes

                                                                                

+-----------+---------------------+
|   category|avg_discount_category|
+-----------+---------------------+
|      shoes|    53.04076799458682|
|accessories|    50.75762818496383|
|    jewelry|     53.1318771893674|
+-----------+---------------------+



                                                                                

+--------------------+------------------------+
|         subcategory|avg_discount_subcategory|
+--------------------+------------------------+
|  Claquettes & Tongs|       50.46560846560847|
|             Baskets|      55.091805766312596|
|ACCESSOIRES CHAUS...|       51.73913043478261|
|             Slipper|       52.34131736526946|
| Chaussures de ville|      55.388663967611336|
|              Bottes|                    55.5|
| Derbies & Mocassins|      52.845238095238095|
|        Plate-formes|      58.333333333333336|
|               Pumps|       53.56410256410256|
|Bottes & Chaussur...|      56.215384615384615|
|  Sneakers & Baskets|       51.79710144927536|
|      Flat & Loafers|      52.592592592592595|
|           Escarpins|       52.20967741935484|
|            Sandales|      56.257553956834535|
|           Mocassins|      49.517322372284205|
|          Plateforme|      52.243589743589745|
|    Sandales & Mules|      52.473134328358206|
|           Chaussons|       56.94308943

                                                                                

+-----------+------------------+
|   category|avg_likes_category|
+-----------+------------------+
|      shoes| 284.0918548591728|
|    jewelry| 166.1458891407377|
|accessories| 96.49606794589494|
+-----------+------------------+



                                                                                

+--------------------+---------------------+
|         subcategory|avg_likes_subcategory|
+--------------------+---------------------+
|  Claquettes & Tongs|   204.86772486772486|
|             Baskets|    89.13581183611532|
|ACCESSOIRES CHAUS...|    427.5217391304348|
|             Slipper|    30.45508982035928|
| Chaussures de ville|   162.88663967611336|
|              Bottes|                169.0|
| Derbies & Mocassins|    497.7136591478697|
|        Plate-formes|   33.666666666666664|
|               Pumps|     76.8974358974359|
|Bottes & Chaussur...|   175.77902097902097|
|  Sneakers & Baskets|   293.22256728778467|
|      Flat & Loafers|                142.0|
|           Escarpins|    519.8844086021505|
|            Sandales|   104.38561151079136|
|           Mocassins|    197.0093951849677|
|          Plateforme|    249.7948717948718|
|    Sandales & Mules|   381.47985074626865|
|           Chaussons|    103.5650406504065|
|   Bottes & Bottines|   452.96699669966995|
|    CHAUS

                                                                                

+-----------+-------------------+
|   category|sum_unused_category|
+-----------+-------------------+
|      shoes|                271|
|accessories|                107|
|    jewelry|                113|
+-----------+-------------------+



                                                                                

+--------------------+----------------------+
|         subcategory|sum_unused_subcategory|
+--------------------+----------------------+
|  Claquettes & Tongs|                    27|
|             Baskets|                    68|
|ACCESSOIRES CHAUS...|                     2|
|             Slipper|                     2|
| Chaussures de ville|                     5|
|              Bottes|                     0|
| Derbies & Mocassins|                    38|
|        Plate-formes|                     0|
|               Pumps|                     0|
|Bottes & Chaussur...|                    19|
|  Sneakers & Baskets|                    41|
|      Flat & Loafers|                     0|
|           Escarpins|                     2|
|            Sandales|                    10|
|           Mocassins|                    21|
|          Plateforme|                     2|
|    Sandales & Mules|                    15|
|           Chaussons|                     1|
|   Bottes & Bottines|            

                                                                                

In [21]:
# Display the column names of the combined DataFrame
df_combined.columns

['category',
 'subcategory',
 'name',
 'current_price',
 'raw_price',
 'currency',
 'discount',
 'likes_count',
 'is_new',
 'brand',
 'brand_url',
 'codCountry',
 'variation_0_color',
 'variation_1_color',
 'variation_0_thumbnail',
 'variation_0_image',
 'variation_1_thumbnail',
 'variation_1_image',
 'image_url',
 'url',
 'id',
 'model']

In [22]:
# Store ``is_new`` as an integer column (replacing the column under the same name)
is_new_as_integer = df_combined["is_new"].cast("integer")
df_combined = df_combined.withColumn("is_new", is_new_as_integer)

In [23]:
from pyspark.sql import functions as F


def _pick_extreme_group(df, group_col, agg_expr, order_col, ascending):
    """Return the single group of ``df`` that ranks first on an aggregate.

    Args:
        df: DataFrame to aggregate.
        group_col: Name of the column to group by.
        agg_expr: Aliased aggregate Column passed to ``agg``.
        order_col: Name of the aggregated column to sort on (the alias of
            ``agg_expr``).
        ascending: ``True`` to keep the smallest value, ``False`` the largest.

    Returns:
        A DataFrame limited to one row with columns ``group_col`` and
        ``order_col``.
    """
    return (
        df.groupBy(group_col)
        .agg(agg_expr)
        .orderBy(order_col, ascending=ascending)
        .limit(1)
    )


# 1. Least popular product type (lowest average likes)
least_popular_product = _pick_extreme_group(
    df_combined, "category", F.avg("likes_count").alias("avg_likes"), "avg_likes", True
)
least_popular_product.show()

# 2. Most popular product type (highest average likes)
most_popular_product = _pick_extreme_group(
    df_combined, "category", F.avg("likes_count").alias("avg_likes"), "avg_likes", False
)
most_popular_product.show()

# 3. Most popular subcategory (highest average likes)
most_popular_subcategory = _pick_extreme_group(
    df_combined, "subcategory", F.avg("likes_count").alias("avg_likes"), "avg_likes", False
)
most_popular_subcategory.show()

# 4. Least popular subcategory (lowest average likes)
least_popular_subcategory = _pick_extreme_group(
    df_combined, "subcategory", F.avg("likes_count").alias("avg_likes"), "avg_likes", True
)
least_popular_subcategory.show()

# 5. Most expensive accessory subcategory (highest average current price)
accessories_only = df_combined.filter(df_combined["category"] == "accessories")
most_expensive_accessory = _pick_extreme_group(
    accessories_only, "subcategory", F.avg("current_price").alias("avg_price"), "avg_price", False
)
most_expensive_accessory.show()

# 6. Product type with the highest average discount
highest_discount_category = _pick_extreme_group(
    df_combined, "category", F.avg("discount").alias("avg_discount"), "avg_discount", False
)
highest_discount_category.show()

# 7. Product type with the lowest average discount
lowest_discount_category = _pick_extreme_group(
    df_combined, "category", F.avg("discount").alias("avg_discount"), "avg_discount", True
)
lowest_discount_category.show()

# 8. Most common men's product subcategory
# NOTE(review): the filter only keeps rows whose category contains "shoes";
# nothing here restricts the rows to men's products. Confirm which column
# identifies men's items and tighten the filter if that is the intent.
common_mens_subcategory = df_combined.filter(df_combined["category"].contains("shoes")) \
    .groupBy("subcategory") \
    .count() \
    .orderBy("count", ascending=False) \
    .limit(1)
common_mens_subcategory.show()

# 9. Product type with the most unused products (largest sum of is_new)
most_unused_category = _pick_extreme_group(
    df_combined, "category", F.sum("is_new").alias("total_unused"), "total_unused", False
)
most_unused_category.show()

# 10. Subcategory with the most unused products (largest sum of is_new)
most_unused_subcategory = _pick_extreme_group(
    df_combined, "subcategory", F.sum("is_new").alias("total_unused"), "total_unused", False
)
most_unused_subcategory.show()

                                                                                

+-----------+-----------------+
|   category|        avg_likes|
+-----------+-----------------+
|accessories|96.49606794589494|
+-----------+-----------------+

+--------+-----------------+
|category|        avg_likes|
+--------+-----------------+
|   shoes|284.0918548591728|
+--------+-----------------+

+-----------+-----------------+
|subcategory|        avg_likes|
+-----------+-----------------+
|  Escarpins|519.8844086021505|
+-----------+-----------------+

+--------------------+-----------------+
|         subcategory|        avg_likes|
+--------------------+-----------------+
|Sun Protection Sl...|6.054054054054054|
+--------------------+-----------------+



                                                                                

+------------+-----------------+
| subcategory|        avg_price|
+------------+-----------------+
|Military Hat|18.22666666666667|
+------------+-----------------+

+--------+----------------+
|category|    avg_discount|
+--------+----------------+
| jewelry|53.1318771893674|
+--------+----------------+



                                                                                

+-----------+-----------------+
|   category|     avg_discount|
+-----------+-----------------+
|accessories|50.75762818496383|
+-----------+-----------------+



                                                                                

+-----------+-----+
|subcategory|count|
+-----------+-----+
|  Mocassins| 1703|
+-----------+-----+



                                                                                

+--------+------------+
|category|total_unused|
+--------+------------+
|   shoes|         271|
+--------+------------+





+-----------+------------+
|subcategory|total_unused|
+-----------+------------+
|    Baskets|          68|
+-----------+------------+



                                                                                

In [24]:
# Import required functions
from pyspark.sql.functions import col

# Shuffle into 16 partitions, then write Parquet to HDFS partitioned by
# ``category``, replacing any previous output at that path
combined_writer = (
    df_combined
    .repartition(16)
    .write
    .mode("overwrite")
    .partitionBy("category")
)
combined_writer.parquet("hdfs://namenode-master:8020/df_combined")

                                                                                

In [None]:
# Bonus: copy the "average discount by category" Parquet result from HDFS
# into a PostgreSQL table over JDBC.

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Initialize Spark session with the PostgreSQL JDBC driver on the classpath.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists, and jar/classpath settings presumably only apply when a session is
# first created. If the driver class is not found, restart the kernel before
# running this cell or add these two settings to the first builder - confirm.
spark = SparkSession.builder \
    .appName("HDFS to PostgreSQL") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.2.23.jar") \
    .config("spark.driver.extraClassPath", "/opt/spark/jars/postgresql-42.2.23.jar") \
    .getOrCreate()

# Expected layout of the Parquet dataset (modify to match the data being loaded)
schema = StructType([
    StructField("category", StringType(), True),
    StructField("avg_discount_category", DoubleType(), True)
])

# Load the whole Parquet dataset directory rather than a single part file:
# part-file names contain a generated id that changes every time the directory
# is rewritten with mode("overwrite"), and the dataset may span several parts.
df = spark.read.schema(schema).parquet("hdfs://namenode-master:8020/output_parquet/avg_discount_category")

# Print Schema and Preview Data
df.printSchema()
df.show(5)

# PostgreSQL connection properties. Credentials are taken from the environment
# (POSTGRES_USER / POSTGRES_PASSWORD) so they need not be stored in the
# notebook; the previous hard-coded values remain as defaults.
db_url = "jdbc:postgresql://postgres-db:5432/sparkdb"
db_properties = {
    "user": os.environ.get("POSTGRES_USER", "user"),
    "password": os.environ.get("POSTGRES_PASSWORD", "password"),
    "driver": "org.postgresql.Driver"
}

# Write the DataFrame to the target table, replacing it if it already exists
df.write.mode("overwrite").jdbc(db_url, "output_parquet_table", properties=db_properties)

print("✅ Data successfully written to PostgreSQL!")