# DataGrokr - Spark Assignment
### Intern: Sumit Dhar 
### Date: 27th October, 2025

In [None]:
# Download all the datasets in zip format from:
# https://datagrokranalytics-my.sharepoint.com/:u:/r/personal/naveen_gainedi_datagrokr_co/Documents/Public/Training/Courses/Spark/DatasetToCompleteTheSixSparkExercises.zip?csf=1&web=1&e=oln35S

In [2]:
# Make PySpark's workers and driver use the interpreter that runs this kernel.
# The two variables are printed before and after the change so the cell output shows the effect.
import os
import sys

print(
    os.environ.get("PYSPARK_PYTHON"),
    os.environ.get("PYSPARK_DRIVER_PYTHON"),
    sep="\n",
)

os.environ.update(
    {
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

print(
    os.environ.get("PYSPARK_PYTHON"),
    os.environ.get("PYSPARK_DRIVER_PYTHON"),
    sep="\n",
)

None
None
c:\Users\Admin\anaconda3\envs\testing_DG_pyspark\python.exe
c:\Users\Admin\anaconda3\envs\testing_DG_pyspark\python.exe


In [3]:
# Importing all the necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from hashlib import sha256, md5

In [4]:
# Create (or reuse) the SparkSession for this assignment, with driver memory set to 4g
spark = (
    SparkSession.builder
    .appName("Spark_Assignment")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Relative paths of the three Parquet datasets
products_file_path = "products_parquet"
sales_file_path = "sales_parquet"
sellers_file_path = "sellers_parquet"

# Load each Parquet dataset into its own DataFrame
products = spark.read.parquet(products_file_path)
orders = spark.read.parquet(sales_file_path)
sellers = spark.read.parquet(sellers_file_path)

In [5]:
# Schema of Orders exactly as read from Parquet
print("Orders Schema:")
orders.printSchema()

# Cast the id and quantity columns to int and parse `date` with the yyyy-MM-dd pattern;
# bill_raw_text is left untouched.
orders = (
    orders
    .withColumn("order_id", col("order_id").cast("int"))
    .withColumn("product_id", col("product_id").cast("int"))
    .withColumn("seller_id", col("seller_id").cast("int"))
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("num_pieces_sold", col("num_pieces_sold").cast("int"))
)

print("After casting Orders Schema:")
orders.printSchema()

Orders Schema:
root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)

After casting Orders Schema:
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [26]:
# Preview the first 10 rows of the casted Orders DataFrame
orders.show(n=10)

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|       9|         0|        0|2020-07-10|             25|jnykelwjjebgkwgmu...|
|      10|         0|        0|2020-07-0

In [5]:
# Printing the schema for Products
print("Products Schema:")
products.printSchema()

# Casting the datatypes for Products.
# The chain is wrapped in parentheses so it does not depend on a trailing backslash
# (a backslash after the last call silently joins whatever line follows it).
products = (
    products
    .withColumn("product_id", col("product_id").cast("int"))
    .withColumn("price", col("price").cast("float"))
)

print("After casting Products Schema:")
products.printSchema()

Products Schema:
root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

After casting Products Schema:
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: float (nullable = true)



In [27]:
# Preview the first 10 rows of the casted Products DataFrame
products.show(n=10)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0| 22.0|
|         1|   product_1| 30.0|
|         2|   product_2| 91.0|
|         3|   product_3| 37.0|
|         4|   product_4|145.0|
|         5|   product_5|128.0|
|         6|   product_6| 66.0|
|         7|   product_7|145.0|
|         8|   product_8| 51.0|
|         9|   product_9| 44.0|
+----------+------------+-----+
only showing top 10 rows


In [6]:
# Printing the schema for Sellers
print("Sellers Schema:")
sellers.printSchema()

# Casting the datatypes for Sellers.
# The chain is wrapped in parentheses so it does not depend on a trailing backslash
# (a backslash after the last call silently joins whatever line follows it).
sellers = (
    sellers
    .withColumn("seller_id", col("seller_id").cast("int"))
    .withColumn("daily_target", col("daily_target").cast("integer"))
)

print("After casting Sellers Schema:")
sellers.printSchema()

Sellers Schema:
root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)

After casting Sellers Schema:
root
 |-- seller_id: integer (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: integer (nullable = true)



In [28]:
# Preview the first 10 rows of the casted Sellers DataFrame
sellers.show(n=10)

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      257237|
|        2|   seller_2|      754188|
|        3|   seller_3|      310462|
|        4|   seller_4|     1532808|
|        5|   seller_5|     1199693|
|        6|   seller_6|     1055915|
|        7|   seller_7|     1946998|
|        8|   seller_8|      547320|
|        9|   seller_9|     1318051|
+---------+-----------+------------+



### Find out how many orders, how many products and how many sellers are in the data.

In [7]:
# Row count of each of the three DataFrames
orders_count = orders.count()
products_count = products.count()
sellers_count = sellers.count()

# One print call, one line per DataFrame
print(
    f"Total no. of orders: {orders_count}",
    f"Total no. of products: {products_count}",
    f"Total no. of sellers: {sellers_count}",
    sep="\n",
)

Total no. of orders: 20000040
Total no. of products: 75000000
Total no. of sellers: 10


### How many products have been sold at least once? 

In [8]:
# Keep only orders in which at least one piece was sold
valid_orders = orders.where(col('num_pieces_sold') > 0)

# Number of distinct product ids that appear in those orders
products_sold = (
    valid_orders
    .select('product_id')
    .distinct()
    .count()
)

print(f"Number of products sold at least once: {products_sold}")

Number of products sold at least once: 993429


### Which product is contained in the most orders?

In [9]:
# Count valid orders per product, sort by that count descending and keep the top row
most_ordered_product = (
    valid_orders
    .groupBy('product_id')
    .count()
    .orderBy(col('count').desc())
    .limit(1)
)

print("The most ordered product is: ")
most_ordered_product.show()

The most ordered product is: 
+----------+--------+
|product_id|   count|
+----------+--------+
|         0|19000000|
+----------+--------+



### What is the average revenue of the orders?

In [10]:
# Attach product_name and price to every valid order (inner join on product_id)
orders_with_price = valid_orders.join(
    products,
    on=['product_id'],
    how='inner',
)

# Schema of the joined DataFrame
orders_with_price.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: float (nullable = true)



In [11]:
# revenue = num_pieces_sold * price, computed per order
orders_with_revenue = orders_with_price.withColumn(
    'revenue',
    col('num_pieces_sold') * col('price'),
)

# Schema including the new revenue column
orders_with_revenue.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: float (nullable = true)
 |-- revenue: double (nullable = true)



In [12]:
# Mean of the per-order revenue over the whole DataFrame (aggregation without grouping keys)
average_revenue = (
    orders_with_revenue
    .groupBy()
    .agg(avg('revenue').alias('average_revenue'))
)

In [13]:
# Display the single-row result holding the average revenue per order
average_revenue.show()

+------------------+
|   average_revenue|
+------------------+
|1246.1338560822878|
+------------------+



### For each seller, what is the average % contribution of an order to the seller's daily quota?

In [14]:
# Attach seller_name and daily_target to every order that already carries a revenue
orders_with_contribution = orders_with_revenue.join(
    sellers,
    on=['seller_id'],
    how='inner',
)

# Schema of the joined DataFrame
orders_with_contribution.printSchema()

root
 |-- seller_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: float (nullable = true)
 |-- revenue: double (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: integer (nullable = true)



In [15]:
# percent_contribution = revenue / daily_target * 100, computed per order
orders_with_percent_contribution = orders_with_contribution.withColumn(
    'percent_contribution', col('revenue') / col('daily_target') * 100
)

# Schema including the new percent_contribution column
orders_with_percent_contribution.printSchema()

root
 |-- seller_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: float (nullable = true)
 |-- revenue: double (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: integer (nullable = true)
 |-- percent_contribution: double (nullable = true)



In [16]:
# Per seller: mean of the per-order percentage contribution to the daily target
avg_percent_contribution = (
    orders_with_percent_contribution
    .groupBy('seller_id', 'seller_name')
    .agg(avg('percent_contribution').alias('avg_percent_contribution'))
)

In [17]:
# Display the average % contribution per seller
avg_percent_contribution.show()

+---------+-----------+------------------------+
|seller_id|seller_name|avg_percent_contribution|
+---------+-----------+------------------------+
|        7|   seller_7|     0.19601246306317585|
|        5|   seller_5|      0.3170589299015156|
|        2|   seller_2|      0.5064829818617169|
|        1|   seller_1|      1.4844178645806252|
|        6|   seller_6|     0.36093852517481856|
|        3|   seller_3|        1.23186781930548|
|        8|   seller_8|      0.6946060998563962|
|        4|   seller_4|     0.24841173704802327|
|        9|   seller_9|     0.29052998156736454|
|        0|   seller_0|     0.04443748977654746|
+---------+-----------+------------------------+



### Who are the second most selling and the least selling persons (sellers) for each product?

In [18]:
# Counting total orders per product per seller
# NOTE(review): sellers are ranked by number of orders. If "most selling" is meant as
# units sold, aggregate sum('num_pieces_sold') instead — confirm the intended metric.
sellers_sales = orders.groupBy('product_id', 'seller_id')\
                .agg(count('*').alias('total_orders'))

# One window per product, ordered by total_orders descending / ascending
windowSpec_desc = Window.partitionBy('product_id').orderBy(desc('total_orders'))
windowSpec_asc = Window.partitionBy('product_id').orderBy(asc('total_orders'))

# Ranking sellers per product.
# dense_rank() is used instead of rank(): rank() leaves gaps after ties (1, 1, 3, ...),
# so a product whose top sellers tie would have no row with rank == 2 and would silently
# drop out of the "second most selling" result. dense_rank() yields 1, 1, 2, ...
rank_sales_desc = sellers_sales.withColumn('rank', dense_rank().over(windowSpec_desc))
rank_sales_asc = sellers_sales.withColumn('rank', dense_rank().over(windowSpec_asc))

In [19]:
# Selecting the second most selling persons (rank 2 in the descending ranking)
second_most_selling = rank_sales_desc.filter(col('rank') == 2)
# Label corrected: this cell shows the second most selling sellers, not the top ones
print('Second most selling persons:')
second_most_selling.show(10)

Most selling persons:
+----------+---------+------------+----+
|product_id|seller_id|total_orders|rank|
+----------+---------+------------+----+
|  40579633|        9|           1|   2|
|   2316238|        5|           1|   2|
|   8916663|        5|           1|   2|
|  61540351|        3|           1|   2|
|  73385513|        2|           1|   2|
|  26915351|        6|           1|   2|
|  40193396|        9|           1|   2|
|   2839667|        9|           1|   2|
|  19978383|        8|           1|   2|
|  28183035|        8|           1|   2|
+----------+---------+------------+----+



In [20]:
# Rank 1 in the ascending ranking = seller(s) with the fewest orders for each product
least_selling = rank_sales_asc.where(col('rank') == 1)

print("Least selling persons:")
least_selling.show(n=10)

Least selling persons:
+----------+---------+------------+----+
|product_id|seller_id|total_orders|rank|
+----------+---------+------------+----+
|      1650|        6|           1|   1|
|      1869|        9|           1|   1|
|      2335|        2|           1|   1|
|      2601|        4|           1|   1|
|      2656|        8|           1|   1|
|      2907|        5|           1|   1|
|      2999|        8|           1|   1|
|      2999|        7|           1|   1|
|      3481|        3|           1|   1|
|      4199|        3|           1|   1|
+----------+---------+------------+----+
only showing top 10 rows


### Who are the second most selling and the least selling sellers for the product with `product_id = 0`?

In [21]:
# Distinct seller ids found on orders of product_id 0
sellers_with_p0 = (
    orders
    .where(col('product_id') == 0)
    .select('seller_id')
    .distinct()
)

In [57]:
# Show every seller that has at least one order for product_id 0
print("Sellers who sold product with product_id: 0")
sellers_with_p0.show()

Sellers who sold product with product_id: 0
+---------+
|seller_id|
+---------+
|        0|
+---------+



In [None]:
# Cross-check: total number of orders placed for product_id 0
orders.where(col('product_id') == 0).count()

19000000

In [59]:
# Cross-check: orders for product_id 0 that were also made by seller_id 0
(
    orders
    .where(col('product_id') == 0)
    .where(col('seller_id') == 0)
    .count()
)

19000000

### Create a new column called "hashed_bill" defined as follows:

- if the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, once for each 'A' (capital 'A') present in the text. E.g. if the bill text is 'nbAAnllA', you would apply hashing three times iteratively (only if the order number is even)

- if the order_id is odd: apply SHA256 hashing to the bill text

Finally, check whether there are any duplicates in the new column.

In [6]:
# Creating a user defined function
def hash_bill(order_id, bill_raw_text):
    """Hash a bill text according to the parity of its order id.

    Args:
        order_id: Integer order id; may be None for null rows.
        bill_raw_text: Bill text to hash; may be None for null rows.

    Returns:
        None when either input is None. For an odd ``order_id``, the SHA-256
        hex digest of the text. For an even ``order_id``, the result of applying
        MD5 iteratively once per capital 'A' in the text (each round hashes the
        previous round's hex digest); a text without any 'A' is therefore
        returned unhashed.
    """
    # A null in either column cannot be hashed (and None % 2 would raise)
    if order_id is None or bill_raw_text is None:
        return None

    if order_id % 2 != 0:
        return sha256(bill_raw_text.encode()).hexdigest()

    # Exactly one MD5 round per 'A': zero occurrences means zero rounds
    hashed = bill_raw_text
    for _ in range(bill_raw_text.count('A')):
        hashed = md5(hashed.encode()).hexdigest()
    return hashed

# Wrap hash_bill as a Spark UDF that returns a string column
hash_bill_udf = udf(f=hash_bill, returnType=StringType())

In [7]:
# Add hashed_bill by applying the UDF to each row's order_id and bill_raw_text
orders_with_hash_bill = orders.withColumn('hashed_bill', hash_bill_udf(col('order_id'), col('bill_raw_text')))

In [9]:
# Preview: restrict to 10 rows first, then display them
print("After applying hashing on even/odd order_id:")
(
    orders_with_hash_bill
    .limit(10)
    .show()
)

After applying hashing on even/odd order_id:
+--------+----------+---------+----------+---------------+--------------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|         hashed_bill|
+--------+----------+---------+----------+---------------+--------------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|f6fa2a8be04a4ead6...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|13af9a32eb8a31513...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|416376a64cd652e7b...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|7a73a2250f416bb81...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|787d361b162a6aa1a...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|4d0e2c457714c5d08...|
|       7|         0|        0|2020-07-04|    

In [10]:
# Duplicate check restricted to 10 rows: hashed_bill values that occur more than once
duplicate_hash_bill = (
    orders_with_hash_bill
    .limit(10)
    .groupBy('hashed_bill')
    .count()
    .where(col('count') > 1)
)


In [12]:
# Number of rows in duplicate_hash_bill, i.e. hashed_bill values seen more than once
duplicate_hash_bill.count()

0

In [11]:
# Report whether any hashed_bill value is duplicated, listing up to 10 of them if so
duplicate_hash_bill_count = duplicate_hash_bill.count()

if duplicate_hash_bill_count == 0:
    print("No duplicates found in hashed_bill.")
else:
    print(f"There are {duplicate_hash_bill_count} duplicate hashed_bill entries.")
    duplicate_hash_bill.show(n=10)

No duplicates found in hashed_bill.


In [12]:
# Duplicate check over all rows.
# Note: on the local machine the Python workers crashed under the data volume and the cell
# failed with 'TimeoutError: timed out'. It is expected to complete on Google Colab or a
# more capable machine.

duplicate_hash_bill = (
    orders_with_hash_bill
    .groupBy('hashed_bill')
    .count()
    .where(col('count') > 1)
)

duplicate_hash_bill_count = duplicate_hash_bill.count()

if duplicate_hash_bill_count == 0:
    print("No duplicates found in hashed_bill.")
else:
    print(f"There are {duplicate_hash_bill_count} duplicate hashed_bill entries.")
    duplicate_hash_bill.show(n=10)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "c:\Users\Admin\anaconda3\envs\testing_DG_pyspark\lib\socket.py", line 720, in readinto
    raise
TimeoutError: timed out


In [None]:
# Stop the SparkSession created at the top of the notebook; run this cell last,
# since later cells that use `spark` or its DataFrames need a live session.
spark.stop()