In [1]:
# Print the installed PySpark version so the environment used for this run is
# recorded in the notebook output.
import pyspark
print(pyspark.__version__)


4.0.1


In [4]:
# robust_load_parquet.py  (paste & run)
from pathlib import Path
from pyspark.sql import SparkSession
import os, sys

# --- restart Spark cleanly if already running ---
# Best effort: on a fresh kernel `spark` does not exist yet (NameError), and a
# failure while stopping a stale session must not block creating the new one.
try:
    spark.stop()
except Exception:
    pass

# Local session using every available core, 4g of driver memory and
# 8 shuffle partitions.
spark = (
    SparkSession.builder
    .appName("RobustLoadParquet")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

# Root folder that contains the products/sales/sellers parquet folders.
# Set the PYSPARK_DATA_DIR environment variable to use another location without
# editing the notebook; when it is unset the original path is used.
base = Path(os.environ.get("PYSPARK_DATA_DIR", r"C:\Users\Public\DG ASSIGNMENTS\PYSPARK"))

def gather_parquet_files(folder: Path) -> list[str]:
    """Collect the parquet files found anywhere below ``folder``.

    Files whose name ends in ``.parquet`` (case-insensitive) are preferred. Only
    when no such file exists does the function fall back to content sniffing and
    return every file whose first four bytes are the ``PAR1`` magic marker.

    Args:
        folder: Directory that is searched recursively.

    Returns:
        The matching paths as strings, in sorted path order. The list is empty
        when nothing matches.
    """
    files = sorted(f for f in folder.glob("**/*") if f.is_file())
    # prefer explicit .parquet names
    explicit = [str(f) for f in files if f.name.lower().endswith(".parquet")]
    if explicit:
        return explicit
    # fallback: detect real parquet by magic bytes
    real = []
    for f in files:
        try:
            with open(f, "rb") as fh:
                if fh.read(4) == b"PAR1":
                    real.append(str(f))
        except OSError:
            # The file could not be opened or read (locked, permission denied,
            # removed meanwhile): skip it. Anything other than an I/O error is
            # a real bug and is allowed to surface.
            continue
    return real

def read_files_in_batches(spark, file_list, batch_size=40, num_partitions=8):
    """Read parquet files in fixed-size batches and union them into one DataFrame.

    Batches are combined with ``unionByName(..., allowMissingColumns=True)``, so
    columns are matched by name across batches.

    Args:
        spark: Session object; only ``spark.read.parquet(*paths)`` is called on it.
        file_list: Paths of the parquet files to load.
        batch_size: Maximum number of paths passed to a single ``parquet()`` call.
        num_partitions: Partition count applied once to the combined result.
            Pass ``None`` to leave the partitioning untouched.

    Returns:
        The combined DataFrame, or ``None`` when ``file_list`` is empty.

    Raises:
        ValueError: If ``file_list`` is non-empty and ``batch_size`` is below 1.
    """
    if not file_list:
        return None
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    batches = [file_list[i:i+batch_size] for i in range(0, len(file_list), batch_size)]
    df_main = None
    for i, batch in enumerate(batches, start=1):
        print(f"Reading batch {i}/{len(batches)} ({len(batch)} files)...")
        # read explicit paths for this batch
        df_batch = spark.read.parquet(*batch)
        if df_main is None:
            df_main = df_batch
        else:
            df_main = df_main.unionByName(df_batch, allowMissingColumns=True)
    # Repartition a single time after all unions. Doing it inside the loop
    # stacked one extra shuffle per batch on the plan without changing the
    # final partition count.
    if num_partitions is not None:
        df_main = df_main.repartition(num_partitions)
    return df_main

# Locate the parquet files of each dataset folder below `base`.
products_files, sales_files, sellers_files = (
    gather_parquet_files(base / dataset_folder)
    for dataset_folder in ("products_parquet", "sales_parquet", "sellers_parquet")
)

print(
    "Found files: products", len(products_files),
    "sales", len(sales_files),
    "sellers", len(sellers_files),
)

# Load every dataset from its explicit file paths, 40 files per read call.
products_df, sales_df, sellers_df = (
    read_files_in_batches(spark, dataset_paths, batch_size=40)
    for dataset_paths in (products_files, sales_files, sellers_files)
)

# Quick sanity check: print the schema of every dataset that was loaded
# (a dataset without files yields None and is skipped).
print("Schemas:")
for loaded_df in (products_df, sales_df, sellers_df):
    if loaded_df is not None:
        loaded_df.printSchema()

# print("Counts (this will compute):")
# print("products:", products_df.count() if products_df is not None else 0)
# print("sales:", sales_df.count() if sales_df is not None else 0)
# print("sellers:", sellers_df.count() if sellers_df is not None else 0)

# # Save cleaned parquet for future easy reads
# clean_products = base / "products_parquet_clean"
# clean_sales    = base / "sales_parquet_clean"
# clean_sellers  = base / "sellers_parquet_clean"

# if products_df is not None:
#     products_df.write.mode("overwrite").parquet(str(clean_products))
# if sales_df is not None:
#     sales_df.write.mode("overwrite").parquet(str(clean_sales))
# if sellers_df is not None:
#     sellers_df.write.mode("overwrite").parquet(str(clean_sellers))

# print("Wrote cleaned parquet folders:", clean_products, clean_sales, clean_sellers)
# print("Now you can do: spark.read.parquet('..._clean') to load normally next time.")


Found files: products 17 sales 200 sellers 1
Reading batch 1/1 (17 files)...
Reading batch 1/5 (40 files)...
Reading batch 2/5 (40 files)...
Reading batch 3/5 (40 files)...
Reading batch 4/5 (40 files)...
Reading batch 5/5 (40 files)...
Reading batch 1/1 (1 files)...
Schemas:
root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [3]:
from pyspark.sql import functions as F

# Schemas as loaded, before any type conversion.
print("Schemas (current):")
for untyped_df in (products_df, sales_df, sellers_df):
    untyped_df.printSchema()

# Convert the id, quantity and amount columns to numeric types and derive a
# proper date column (adapt the column names if needed).
products_df = (
    products_df
    .withColumn("product_id", F.col("product_id").cast("int"))
    .withColumn("price", F.col("price").cast("double"))
)

sales_df = (
    sales_df
    .withColumn("order_id", F.col("order_id").cast("long"))
    .withColumn("product_id", F.col("product_id").cast("int"))
    .withColumn("seller_id", F.col("seller_id").cast("int"))
    .withColumn("num_pieces_sold", F.col("num_pieces_sold").cast("int"))
    # The original `date` string is kept; change the format if needed.
    .withColumn("order_date", F.to_date(F.col("date"), "yyyy-MM-dd"))
)

sellers_df = (
    sellers_df
    .withColumn("seller_id", F.col("seller_id").cast("int"))
    .withColumn("daily_target", F.col("daily_target").cast("double"))
)

# Schemas again, to confirm the new column types.
print("Schemas (after casting):")
for typed_df in (products_df, sales_df, sellers_df):
    typed_df.printSchema()


Schemas (current):
root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)

Schemas (after casting):
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: double (nullable = true)

root
 |-- order_id: long (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- order_date: date (nullable = true)

root
 |-

## Question 1


In [5]:
# Row count of each dataset; every count() call runs a Spark job.
for count_label, counted_df in (
    ("total no. of products  =", products_df),
    ("total no. of sellers  =", sellers_df),
    ("total no. of sales  =", sales_df),
):
    print(count_label, counted_df.count())

total no. of products  = 75000000
total no. of sellers  = 10
total no. of sales  = 20000040


## Question 2

In [3]:
from pyspark.sql import functions as F

# products sold at least once
products_sold = sales_df.select("product_id").distinct().count()
print("Products sold at least once:", products_sold)

# Number of distinct orders each product appears in, busiest product first.
# countDistinct already de-duplicates the order ids inside every product group,
# so a separate distinct() pass over (product_id, order_id) pairs would only add
# another shuffle of the whole sales table without changing the result.
product_order_counts = (
    sales_df
    .groupBy("product_id")
    .agg(F.countDistinct("order_id").alias("num_orders"))
    .orderBy(F.desc("num_orders"))
)

product_order_counts.show(10, truncate=False)

# top product (collected as a one-element list of Row)
top_product = product_order_counts.limit(1).collect()
print("Top product (product_id, num_orders):", top_product)


Products sold at least once: 993429
+----------+----------+
|product_id|num_orders|
+----------+----------+
|0         |19000000  |
|56011040  |3         |
|72017876  |3         |
|17944574  |3         |
|35669461  |3         |
|31136332  |3         |
|19978383  |3         |
|8916663   |3         |
|3534470   |3         |
|34681047  |3         |
+----------+----------+
only showing top 10 rows
Top product (product_id, num_orders): [Row(product_id='0', num_orders=19000000)]


## Question 3

In [7]:
# Re-check the column types before joining the datasets.
for checked_df in (products_df, sales_df, sellers_df):
    checked_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: double (nullable = true)

root
 |-- order_id: long (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- order_date: date (nullable = true)

root
 |-- seller_id: integer (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: double (nullable = true)



In [10]:
# Attach the product price to every sales row and compute the revenue of that
# row. The left join keeps sales rows whose product has no match in products_df;
# their price, and therefore their line_revenue, is null.
sales_with_price = (
    sales_df
    .withColumn("num_pieces", F.col("num_pieces_sold").cast("long"))
    .join(products_df.select("product_id", "price"), on="product_id", how="left")
    .withColumn("line_revenue", F.col("price") * F.col("num_pieces"))
)

# Revenue of an order = sum of the revenue of its rows.
order_revenue = sales_with_price.groupBy("order_id").agg(
    F.sum("line_revenue").alias("order_revenue")
)

# Average revenue = mean of the per-order revenue, pulled back to the driver.
avg_revenue_row = order_revenue.agg(
    F.mean("order_revenue").alias("avg_order_revenue")
).collect()[0]
avg_revenue = avg_revenue_row["avg_order_revenue"]
print("Average order revenue:", avg_revenue)

Average order revenue: 1246.1338560822878


## Question 4


In [11]:
# Revenue earned by each seller on each order.
order_with_revenue = (
    sales_with_price
    .groupBy("order_id", "seller_id")
    .agg(F.sum("line_revenue").alias("order_seller_revenue"))
)

# Add the seller's daily target; the left join keeps rows whose seller has no
# entry in sellers_df (their daily_target is null).
seller_targets = sellers_df.select("seller_id", "daily_target")
order_with_target = order_with_revenue.join(seller_targets, on="seller_id", how="left")


In [None]:
# Share of the seller's daily target covered by one order, in percent.
# A missing or zero target gives a null percentage instead of a division error.
# NOTE(review): this divides revenue by daily_target, i.e. it presumes the
# target is expressed in revenue rather than in pieces sold - confirm the unit.
daily_target_col = F.col("daily_target")
target_unusable = daily_target_col.isNull() | (daily_target_col == 0)
pct_of_quota_expr = F.when(target_unusable, F.lit(None)).otherwise(
    (F.col("order_seller_revenue") / daily_target_col) * 100
)
order_seller_with_pct = order_with_target.withColumn("pct_of_daily_quota", pct_of_quota_expr)

# Per seller: mean percentage (null percentages are ignored by mean) and the
# number of rows with a non-null order_id, best average first.
seller_avg_pct = (
    order_seller_with_pct
    .groupBy("seller_id")
    .agg(
        F.mean("pct_of_daily_quota").alias("avg_pct_of_daily_quota"),
        F.count("order_id").alias("num_orders_considered"),
    )
    .orderBy(F.col("avg_pct_of_daily_quota").desc())
)

seller_avg_pct.show(200, truncate=False)

+---------+----------------------+---------------------+
|seller_id|avg_pct_of_daily_quota|num_orders_considered|
+---------+----------------------+---------------------+
|1        |1.4844178645806254    |110805               |
|3        |1.2318678193054775    |111328               |
|8        |0.6946060998563965    |110882               |
|2        |0.5064829818617164    |111233               |
|6        |0.36093852517481845   |111318               |
|5        |0.3170589299015159    |110874               |
|9        |0.2905299815673644    |111392               |
|4        |0.24841173704802363   |111168               |
|7        |0.19601246306317588   |111040               |
|0        |0.044437489776786195  |19000000             |
+---------+----------------------+---------------------+




## Question 5

In [5]:
# List the column names of each dataset; the sellers list is shown as the
# cell's own output value.
for listed_df in (products_df, sales_df):
    print(listed_df.columns, '\n')
sellers_df.columns

['product_id', 'product_name', 'price'] 

['order_id', 'product_id', 'seller_id', 'date', 'num_pieces_sold', 'bill_raw_text'] 



['seller_id', 'seller_name', 'daily_target']

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Total number of pieces each seller sold of each product.
prod_seller_agg = (
    sales_df
    .withColumn("num_pieces", F.col("num_pieces_sold").cast("long"))
    .groupBy("product_id", "seller_id")
    .agg(F.sum("num_pieces").alias("total_pieces_sold"))
)

# Number the sellers of a product from most to fewest pieces sold; seller_id
# breaks ties so that row_number() gives a stable result.
w_desc = Window.partitionBy("product_id").orderBy(
    F.col("total_pieces_sold").desc(), F.col("seller_id").asc()
)
prod_seller_ranked = prod_seller_agg.withColumn("rank_desc", F.row_number().over(w_desc))

# Seller in second place. A product sold by a single seller has no row here.
second_most_per_product = (
    prod_seller_ranked
    .where(F.col("rank_desc") == 2)
    .select("product_id", F.col("seller_id").alias("second_seller_id"), "total_pieces_sold")
)

# Same numbering from fewest to most pieces sold, to find the least-selling seller.
w_asc = Window.partitionBy("product_id").orderBy(
    F.col("total_pieces_sold").asc(), F.col("seller_id").asc()
)
prod_seller_ranked_asc = prod_seller_agg.withColumn("rank_asc", F.row_number().over(w_asc))

# Seller in last place; for a product with a single seller this is that seller.
least_seller_per_product = (
    prod_seller_ranked_asc
    .where(F.col("rank_asc") == 1)
    .select("product_id", F.col("seller_id").alias("least_seller_id"), "total_pieces_sold")
)

# Show a sample of both results.
print("Second-most selling sellers (sample):")
second_most_per_product.show(50, truncate=False)

print("Least-selling sellers (sample):")
least_seller_per_product.show(50, truncate=False)




Second-most selling sellers (sample):
+----------+----------------+-----------------+
|product_id|second_seller_id|total_pieces_sold|
+----------+----------------+-----------------+
|1015908   |8               |86               |
|10196252  |7               |41               |
|10208871  |7               |39               |
|10220712  |3               |30               |
|10291636  |3               |25               |
|10646888  |8               |38               |
|10836024  |7               |65               |
|10966459  |2               |9                |
|1101760   |7               |68               |
|1106584   |7               |66               |
|11246901  |6               |30               |
|1126740   |8               |10               |
|11364987  |6               |8                |
|11387857  |4               |15               |
|11390316  |2               |6                |
|11402318  |8               |45               |
|11414588  |4               |78               |
|1

## Question 6

In [9]:
# Narrow both rankings down to product 0.
is_product_zero = F.col("product_id") == 0

print("Second-most seller for product 0:")
second_most_per_product.where(is_product_zero).show(truncate=False)

print("Least-selling seller for product 0:")
least_seller_per_product.where(is_product_zero).show(truncate=False)

Second-most seller for product 0:
+----------+----------------+-----------------+
|product_id|second_seller_id|total_pieces_sold|
+----------+----------------+-----------------+
+----------+----------------+-----------------+

Least-selling seller for product 0:
+----------+---------------+-----------------+
|product_id|least_seller_id|total_pieces_sold|
+----------+---------------+-----------------+
|0         |0              |959445802        |
+----------+---------------+-----------------+



## Question 7

In [5]:
import hashlib
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col

def compute_hashed_bill(order_id_val, bill_text):
    """Hash a bill text with a scheme chosen by the parity of the order id.

    Even order id: MD5 is applied repeatedly, once per capital 'A' found in the
    original text, each round hashing the previous hex digest. A text without
    any 'A' is returned unchanged.
    Odd order id: a single SHA-256 hex digest of the text is returned.
    An order id that cannot be converted with int() (for example None) is
    handled like an odd one.

    Args:
        order_id_val: Order identifier; anything accepted by int().
        bill_text: Raw bill text, or None.

    Returns:
        The resulting string, or None when bill_text is None.
    """
    if bill_text is None:
        return None

    # Work out the parity; an unusable order id falls into the odd branch.
    try:
        treat_as_odd = int(order_id_val) % 2 != 0
    except Exception:
        treat_as_odd = True

    if treat_as_odd:
        return hashlib.sha256(bill_text.encode('utf-8')).hexdigest()

    # The number of rounds is fixed by the original text, not by the digests.
    rounds_left = bill_text.count('A')
    digest = bill_text
    while rounds_left > 0:
        digest = hashlib.md5(digest.encode('utf-8')).hexdigest()
        rounds_left -= 1
    return digest

# Wrap the Python function as a Spark UDF that returns a string.
compute_hashed_bill_udf = udf(compute_hashed_bill, returnType=StringType())

# New DataFrame: all sales columns plus the derived hashed_bill column.
hashed_bill_col = compute_hashed_bill_udf(col("order_id"), col("bill_raw_text"))
sales_hashed = sales_df.withColumn("hashed_bill", hashed_bill_col)

# show sample
# sales_hashed.select("order_id", "bill_raw_text", "hashed_bill").show(20, truncate=80)


In [None]:
# Preview five rows of the hashed result; cell text is cut at 80 characters.
hashed_preview = sales_hashed.select("order_id", "bill_raw_text", "hashed_bill")
hashed_preview.show(5, truncate=80)