In [1]:
# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

# Python standard library imports
import random
import string
import hashlib
import os

from pyspark.sql.window import Window

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
# Calculate memory and core allocation
total_memory = 16 * 1024  # Convert GB to MB
executor_memory = total_memory // 2  # Allocate half for executor
driver_memory = total_memory // 4  # Allocate a quarter for driver
num_cores = 16  # Logical processors available

# Configure Spark Session
spark = SparkSession.builder \
    .appName("ParquetFileProcessing") \
    .master("local[16]") \
    .config("spark.executor.memory", f"{executor_memory}m") \
    .config("spark.driver.memory", f"{driver_memory}m") \
    .config("spark.sql.shuffle.partitions", num_cores * 2) \
    .config("spark.default.parallelism", num_cores * 2) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Print Spark session details
print(spark.sparkContext.getConf().getAll())


[('spark.driver.memory', '4096m'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark.driver.port', '59207'), ('spark.executor.memory', '8192m'), ('s

### Reading Parquet Files


In [3]:
sales = spark.read.parquet("sales.parquet")
products = spark.read.parquet("products.parquet")
sellers = spark.read.parquet("sellers.parquet")

### Columns in the Tables


In [4]:
sales.columns

['order_id',
 'product_id',
 'seller_id',
 'date',
 'num_pieces_sold',
 'bill_raw_text']

In [5]:
products.columns

['product_id', 'product_name', 'price']

In [6]:
products.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
|         5|   product_5|  128|
|         6|   product_6|   66|
|         7|   product_7|  145|
|         8|   product_8|   51|
|         9|   product_9|   44|
|        10|  product_10|   53|
|        11|  product_11|   13|
|        12|  product_12|  104|
|        13|  product_13|  102|
|        14|  product_14|   24|
|        15|  product_15|   14|
|        16|  product_16|   38|
|        17|  product_17|   72|
|        18|  product_18|   16|
|        19|  product_19|   46|
+----------+------------+-----+
only showing top 20 rows



In [7]:
sellers.columns

['seller_id', 'seller_name', 'daily_target']

In [8]:
sellers.show()

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|     1375559|
|        2|   seller_2|      205349|
|        3|   seller_3|       71546|
|        4|   seller_4|     1315668|
|        5|   seller_5|      627802|
|        6|   seller_6|     1997104|
|        7|   seller_7|      593329|
|        8|   seller_8|       24388|
|        9|   seller_9|      348255|
+---------+-----------+------------+



In [9]:
sales.columns

['order_id',
 'product_id',
 'seller_id',
 'date',
 'num_pieces_sold',
 'bill_raw_text']

In [10]:
sales.show()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-03|             98|frlnwjcoaxsaubnat...|
|       2|         0|        0|2020-07-07|             23|zsnrbwrlflvqqmbcz...|
|       3|         0|        0|2020-07-02|             79|gmxnirkafafnohboh...|
|       4|         0|        0|2020-07-07|              5|xrgknaskXkfcxcnzj...|
|       5|         0|        0|2020-07-10|             79|tzkqoynsqnfomkpbt...|
|       6|         0|        0|2020-07-05|             87|qoluiczrckaygkzbi...|
|       7|         0|        0|2020-07-08|             14|ivwpwrpuhrjgjdauj...|
|       8|         0|        0|2020-07-02|             64|hoalxshwHpqgyvqtm...|
|       9|         0|        0|2020-07-02|             45|vysrvsdfvekabcmwo...|
|      10|         0|        0|2020-07-0

### ERD

![ERD]('./Pyspark_exercise.png')


### Solving Questions


In [11]:
# 1. Find out how many orders, how many products and how many sellers are in the data.
num_orders = sales.select("order_id").distinct().count()
num_products = products.select("product_id").distinct().count()
num_sellers = sellers.select("seller_id").distinct().count()

print(f"Number of orders: {num_orders}")
print(f"Number of products: {num_products}")
print(f"Number of sellers: {num_sellers}")

Number of orders: 20000040
Number of products: 75000000
Number of sellers: 10


In [12]:
# 2. How many products have been sold at least once? Which product is included in the most orders?

# Checking Products that are sold at least once
products_sold = sales.select("product_id").distinct().count()
print(f"Number of products sold at least once: {products_sold}")

# Product included in the most orders
most_ordered_product = sales.groupBy("product_id").count().orderBy(col("count").desc()).first()
# most_ordered_product.show()
print(f"Most ordered product ID: {most_ordered_product['product_id']} with {most_ordered_product['count']} orders")

Number of products sold at least once: 993299
Most ordered product ID: 0 with 19000000 orders


In [13]:
# 3. How many distinct products have been sold on each day?
products_per_day = sales.groupBy("date").agg(countDistinct("product_id").alias("distinct_products_sold"))
products_per_day.show()

+----------+----------------------+
|      date|distinct_products_sold|
+----------+----------------------+
|2020-07-06|                 99869|
|2020-07-07|                 99453|
|2020-07-03|                100224|
|2020-07-04|                100294|
|2020-07-08|                100048|
|2020-07-01|                 99755|
|2020-07-05|                 99991|
|2020-07-02|                 99768|
|2020-07-10|                100218|
|2020-07-09|                 99801|
+----------+----------------------+



In [14]:
# 4. What is the average revenue of the orders?
sales_with_prices = sales.join(products, "product_id")
average_revenue = sales_with_prices.withColumn("revenue", col("num_pieces_sold") * col("price")).select(avg("revenue").alias("average_revenue")).first()

print(f"Average revenue of orders: {average_revenue['average_revenue']}")


Average revenue of orders: 1245.9236386027228


In [15]:
# 5. For each seller, what is the average % contribution of an order to the seller's daily quota?
seller_contribution = sales.join(sellers, "seller_id") \
    .withColumn("contribution", (col("num_pieces_sold") / col("daily_target")) * 100) \
    .groupBy("seller_id").agg(avg("contribution").alias("average_contribution"))

seller_contribution.show()

+---------+--------------------+
|seller_id|average_contribution|
+---------+--------------------+
|        9|0.014492762751895831|
|        3| 0.07060842894390354|
|        6|0.002534518215186...|
|        7|0.008510553537463742|
|        8| 0.20716465462088773|
|        5|0.008038980497175765|
|        4|0.003845384604576...|
|        2|0.024567219459515087|
|        0|0.002019736225265152|
|        1|0.003670188787904571|
+---------+--------------------+



In [16]:
# checking distinct products names
products.select("product_name").distinct().show()

+------------+
|product_name|
+------------+
|  product_43|
|  product_83|
|  product_97|
| product_179|
| product_224|
| product_240|
| product_263|
| product_287|
| product_327|
| product_329|
| product_348|
| product_368|
| product_378|
| product_382|
| product_396|
| product_446|
| product_450|
| product_519|
| product_526|
| product_533|
+------------+
only showing top 20 rows



In [17]:
# 6. Who are the second most selling and least selling sellers for each product?
seller_sales = (
    sales.groupBy("product_id", "seller_id")
    .count()
    .withColumnRenamed("count", "total_orders")
)

# importing window to use rank function
from pyspark.sql.window import Window

window_spec = Window.partitionBy("product_id").orderBy(desc("total_orders"))

ranked_sellers = seller_sales.withColumn("rank", rank().over(window_spec))

# Getting second most selling and least selling sellers for each product
second_most_selling = ranked_sellers.filter(col("rank") == 2)
least_selling = ranked_sellers.groupBy("product_id").agg(min("total_orders").alias("least_orders"))

second_most_selling.show()
least_selling.show()

+----------+---------+------------+----+
|product_id|seller_id|total_orders|rank|
+----------+---------+------------+----+
|  50367901|        6|           1|   2|
|  29394753|        7|           1|   2|
|  50426396|        4|           1|   2|
|  64857790|        9|           1|   2|
+----------+---------+------------+----+

+----------+------------+
|product_id|least_orders|
+----------+------------+
|  24534260|           1|
|  10595877|           1|
|  33224799|           1|
|  26201136|           1|
|   6299736|           1|
|  12413954|           1|
|  25976104|           1|
|  47687973|           1|
|  39897173|           1|
|  21897068|           1|
|   9207121|           1|
|  28969115|           1|
|  58662235|           1|
|  34307460|           1|
|  46418551|           1|
|  17786306|           1|
|  67627268|           1|
|  74189134|           1|
|  65369717|           1|
|  45715030|           1|
+----------+------------+
only showing top 20 rows



In [18]:
sales.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- seller_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: long (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [19]:
from pyspark.sql.functions import regexp_replace

sales_df = sales.withColumn("A_count", 
    length(col("bill_raw_text")) - length(regexp_replace(col("bill_raw_text"), "A", ""))
)

In [20]:
sales_df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- seller_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: long (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- A_count: integer (nullable = true)



In [21]:
from pyspark.sql.functions import col, when, udf, length
from pyspark.sql.types import StringType
import hashlib

spark.conf.set("spark.sql.shuffle.partitions", 4)  # Set shuffle partitions

# UDFs
def iterative_md5(text, count):
    for _ in range(count):
        text = hashlib.md5(text.encode()).hexdigest()
    return text

def sha256_hash(text):
    return hashlib.sha256(text.encode()).hexdigest()

md5_udf = udf(lambda text, count: iterative_md5(text, count), StringType())
sha256_udf = udf(sha256_hash, StringType())


sales_df = sales_df.withColumn(
    "hashed_bill",
    when(
        col("order_id") % 2 == 0,
        md5_udf(col("bill_raw_text"), col("A_count"))
    ).otherwise(
        sha256_udf(col("bill_raw_text"))
    )
)

# Repartition and cache
sales_df = sales_df.repartition(4, col("order_id")).cache()

# Trigger computation (action to ensure caching happens)
# sales_df.count()

# Show result
sales_df.select("order_id", "bill_raw_text", "A_count", "hashed_bill").show(truncate=False)

+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
# 8. Finally, check if there are any duplicate on the new column.
duplicates = (
    sales_df.groupBy("hashed_bill").count().filter(col("count") > 1)
)

if duplicates.count() > 0:
    print("There are duplicates in the hashed_bill column.")
    duplicates.show()
else:
    print("No duplicates found in the hashed_bill column.")


No duplicates found in the hashed_bill column.
