In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, to_date, broadcast, when


In [2]:
# intialize spark session
spark = SparkSession.builder \
        .appName("DataProcessingPipeline1") \
        .config("spark.driver.port" , "4041") \
        .master("local") \
        .getOrCreate()


In [3]:
#   1.load data into pyspark dataframes 

In [4]:
# Read sales data (csv)
#  Use Raw Strings (r"string"): to avoid interuption of backslashes as escape characters
sales_df = spark.read.csv(r"C:\Users\arjun\Downloads\sales.csv" , header = True , inferSchema=True)


In [5]:
# read product data (json)
product_df = spark.read.json(r"C:\Users\arjun\Downloads\products.json")

In [6]:
# read region data(csv)
region_df = spark.read.csv(r"C:\Users\arjun\Downloads\regions.csv" , header = True , inferSchema = True)

In [7]:
sales_df.show()

+-------+----------+------+----------+---------+
|sale_id|product_id|amount|      date|region_id|
+-------+----------+------+----------+---------+
|      1|       101|   500|2025-02-28|        1|
|      2|       102|  1000|2025-02-28|        2|
|      3|       103|   150|2025-02-27|        1|
|      4|       104|   300|2025-02-26|        3|
+-------+----------+------+----------+---------+



In [8]:
product_df = spark.read.json(r"C:\Users\arjun\Downloads\products.json", multiLine=True)
product_df.show()


+-----------+----------+------------+
|   category|product_id|product_name|
+-----------+----------+------------+
|Electronics|       101|      Laptop|
|Electronics|       102|      Tablet|
| Stationery|       103|        Book|
|  Furniture|       104|       Chair|
+-----------+----------+------------+



In [9]:
region_df .show()

+---------+-----------+
|region_id|region_name|
+---------+-----------+
|        1|      North|
|        2|      South|
|        3|       East|
|        4|       West|
+---------+-----------+



In [None]:
# 2. Transformations

In [10]:
# Handling null values
sales_df = sales_df.fillna({'amount': 1}) # replace missing amount with 0
products_df = product_df.fillna({'category': 'Unknown'}) # Replace missing category with 'unknown'
region_df = region_df.dropna() # drop rows with missing region 

In [11]:
sales_df.show()
products_df.show()
region_df.show()

+-------+----------+------+----------+---------+
|sale_id|product_id|amount|      date|region_id|
+-------+----------+------+----------+---------+
|      1|       101|   500|2025-02-28|        1|
|      2|       102|  1000|2025-02-28|        2|
|      3|       103|   150|2025-02-27|        1|
|      4|       104|   300|2025-02-26|        3|
+-------+----------+------+----------+---------+

+-----------+----------+------------+
|   category|product_id|product_name|
+-----------+----------+------------+
|Electronics|       101|      Laptop|
|Electronics|       102|      Tablet|
| Stationery|       103|        Book|
|  Furniture|       104|       Chair|
+-----------+----------+------------+

+---------+-----------+
|region_id|region_name|
+---------+-----------+
|        1|      North|
|        2|      South|
|        3|       East|
|        4|       West|
+---------+-----------+



In [12]:
# standardizing dates
sales_df = sales_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))


In [13]:
sales_df.show()
sales_df.printSchema()


+-------+----------+------+----------+---------+
|sale_id|product_id|amount|      date|region_id|
+-------+----------+------+----------+---------+
|      1|       101|   500|2025-02-28|        1|
|      2|       102|  1000|2025-02-28|        2|
|      3|       103|   150|2025-02-27|        1|
|      4|       104|   300|2025-02-26|        3|
+-------+----------+------+----------+---------+

root
 |-- sale_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- amount: integer (nullable = false)
 |-- date: date (nullable = true)
 |-- region_id: integer (nullable = true)



In [14]:
# Adding derived column 
sales_df = sales_df.withColumn("transaction_category", when(sales_df["amount"]< 500, "low") .when (sales_df["amount"] >= 500, "High").otherwise("Unknown"))


In [15]:
sales_df.show()


+-------+----------+------+----------+---------+--------------------+
|sale_id|product_id|amount|      date|region_id|transaction_category|
+-------+----------+------+----------+---------+--------------------+
|      1|       101|   500|2025-02-28|        1|                High|
|      2|       102|  1000|2025-02-28|        2|                High|
|      3|       103|   150|2025-02-27|        1|                 low|
|      4|       104|   300|2025-02-26|        3|                 low|
+-------+----------+------+----------+---------+--------------------+



In [15]:
# 3.Join DataFrames

In [16]:
# join sales & product table
sales_products_df = sales_df.join(products_df, "product_id", "left")

In [17]:
sales_products_df.show()
sales_products_df.persist()

+----------+-------+------+----------+---------+--------------------+-----------+------------+
|product_id|sale_id|amount|      date|region_id|transaction_category|   category|product_name|
+----------+-------+------+----------+---------+--------------------+-----------+------------+
|       101|      1|   500|2025-02-28|        1|                High|Electronics|      Laptop|
|       102|      2|  1000|2025-02-28|        2|                High|Electronics|      Tablet|
|       103|      3|   150|2025-02-27|        1|                 low| Stationery|        Book|
|       104|      4|   300|2025-02-26|        3|                 low|  Furniture|       Chair|
+----------+-------+------+----------+---------+--------------------+-----------+------------+



DataFrame[product_id: int, sale_id: int, amount: int, date: date, region_id: int, transaction_category: string, category: string, product_name: string]

In [18]:
# Use broadcast join for smaller regions data (performance optimization)
final_df = sales_products_df.join(broadcast(region_df), "region_id", "left")

In [19]:
final_df.show()

+---------+----------+-------+------+----------+--------------------+-----------+------------+-----------+
|region_id|product_id|sale_id|amount|      date|transaction_category|   category|product_name|region_name|
+---------+----------+-------+------+----------+--------------------+-----------+------------+-----------+
|        1|       101|      1|   500|2025-02-28|                High|Electronics|      Laptop|      North|
|        2|       102|      2|  1000|2025-02-28|                High|Electronics|      Tablet|      South|
|        1|       103|      3|   150|2025-02-27|                 low| Stationery|        Book|      North|
|        3|       104|      4|   300|2025-02-26|                 low|  Furniture|       Chair|       East|
+---------+----------+-------+------+----------+--------------------+-----------+------------+-----------+



In [None]:
# 4. Compute business metrics 

In [20]:
# Total sales per category & region
Total_sales = final_df.groupBy("category", "region_name").agg(sum("amount").alias("total_sales"))
Total_sales.show(truncate = False)
    

+-----------+-----------+-----------+
|category   |region_name|total_sales|
+-----------+-----------+-----------+
|Furniture  |East       |300        |
|Electronics|North      |500        |
|Stationery |North      |150        |
|Electronics|South      |1000       |
+-----------+-----------+-----------+



In [21]:
# avg transactions per product category
avg_transaction_per_category = final_df.groupBy("category").agg(avg("amount").alias("average_transaction_amount"))
avg_transaction_per_category.show()                                                               

+-----------+--------------------------+
|   category|average_transaction_amount|
+-----------+--------------------------+
| Stationery|                     150.0|
|Electronics|                     750.0|
|  Furniture|                     300.0|
+-----------+--------------------------+



In [22]:
final_df.persist()

DataFrame[region_id: int, product_id: int, sale_id: int, amount: int, date: date, transaction_category: string, category: string, product_name: string, region_name: string]

In [33]:
final_df.write.partitionBy("region_name").mode("overwrite").parquet("final_df.parquet")


Py4JJavaError: An error occurred while calling o165.parquet.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
