#PySpark Coding Assessment

##Basic PySpark SetUp and Loading Data

Basic PySpark Set-up

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("CodingAssessment").getOrCreate()

Loading the CSV file

In [6]:
orders_df = spark.read.option("header", True).csv("orders.csv")
orders_df.show()
orders_df.printSchema()
orders_df.createOrReplaceTempView("orders")

customers_df = spark.read.option("header", True).csv("customers.csv")
customers_df.show()
customers_df.printSchema()

+-------+----------+----------+----------+-----------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|
+-------+----------+----------+----------+-----------+
|      1|      john|       doe|         5|     active|
|      2|      jane|     smith|         8|     active|
|      3|   micheal|   jhonson|         3|   inactive|
|      4|      abhi|   wiliams|         1|     active|
|      5|       ram|     brown|         4|   inactive|
|      6|     emily|  anderson|         2|     active|
|      7|   william|     jones|        10|     active|
|      8|     susan|     davis|         7|   inactive|
|      9|     david|    miller|         9|     active|
|     10|      sara|     moore|         2|   inactive|
|     11|     james|    tailor|         5|   inactive|
|     12|    olivia|    wilson|         3|   inactive|
|     13|    robert|     evans|        11|     active|
|     14|      emma|    thomas|        29|     active|
|     15|    mathew|     haris|         5|   inactive|
|     16| 

##Transformations

Filter

In [7]:
# Filter customers with status 'active'
orders_df.filter(col("cust_status") == "active").show()


+-------+----------+----------+----------+-----------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|
+-------+----------+----------+----------+-----------+
|      1|      john|       doe|         5|     active|
|      2|      jane|     smith|         8|     active|
|      4|      abhi|   wiliams|         1|     active|
|      6|     emily|  anderson|         2|     active|
|      7|   william|     jones|        10|     active|
|      9|     david|    miller|         9|     active|
|     13|    robert|     evans|        11|     active|
|     14|      emma|    thomas|        29|     active|
|     18|     grace|       lee|         5|     active|
|     20|       ava|    joesph|         3|     active|
+-------+----------+----------+----------+-----------+



GroupBy and Aggregation

In [13]:
#1.Count
orders_df.groupBy("cust_status").agg(count("*").alias("order_count")).show()

#2.avg
orders_df.groupBy("cust_id").agg(avg("cust_order").alias("avg_order_value")).show()

#3.sum
orders_df.groupBy("cust_id").agg(sum("cust_order").alias("total_order_value")).show()


+-----------+-----------+
|cust_status|order_count|
+-----------+-----------+
|     active|         10|
|   inactive|         10|
+-----------+-----------+

+-------+---------------+
|cust_id|avg_order_value|
+-------+---------------+
|      7|           10.0|
|     15|            5.0|
|     11|            5.0|
|      3|            3.0|
|      8|            7.0|
|     16|            6.0|
|      5|            4.0|
|     18|            5.0|
|     17|            4.0|
|      6|            2.0|
|     19|            8.0|
|      9|            9.0|
|      1|            5.0|
|     20|            3.0|
|     10|            2.0|
|      4|            1.0|
|     12|            3.0|
|     13|           11.0|
|     14|           29.0|
|      2|            8.0|
+-------+---------------+

+-------+-----------------+
|cust_id|total_order_value|
+-------+-----------------+
|      7|             10.0|
|     15|              5.0|
|     11|              5.0|
|      3|              3.0|
|      8|             

Window Functions

In [9]:
# Row number per customer based on order (example assuming 'cust_order' is numeric)
window_spec = Window.partitionBy("cust_id").orderBy(col("cust_order").desc())
orders_df.withColumn("row_num", row_number().over(window_spec)).show()


+-------+----------+----------+----------+-----------+-------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|row_num|
+-------+----------+----------+----------+-----------+-------+
|      1|      john|       doe|         5|     active|      1|
|     10|      sara|     moore|         2|   inactive|      1|
|     11|     james|    tailor|         5|   inactive|      1|
|     12|    olivia|    wilson|         3|   inactive|      1|
|     13|    robert|     evans|        11|     active|      1|
|     14|      emma|    thomas|        29|     active|      1|
|     15|    mathew|     haris|         5|   inactive|      1|
|     16|  isabella|     white|         6|   inactive|      1|
|     17|    joseph|    martin|         4|   inactive|      1|
|     18|     grace|       lee|         5|     active|      1|
|     19|chrisopher|      basa|         8|   inactive|      1|
|      2|      jane|     smith|         8|     active|      1|
|     20|       ava|    joesph|         3|     active| 

Joins

In [11]:
orders_df.join(customers_df, "cust_id", "inner").show()

+-------+----------+----------+----------+-----------+-------------------+----------+---------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|         cust_email|cust_phone|cust_city|
+-------+----------+----------+----------+-----------+-------------------+----------+---------+
|      1|      john|       doe|         5|     active|  alice@example.com|1234567890|  Chennai|
|      2|      jane|     smith|         8|     active|    bob@example.com|2345678901|Bangalore|
|      3|   micheal|   jhonson|         3|   inactive|charlie@example.com|3456789012|Hyderabad|
|      4|      abhi|   wiliams|         1|     active|   dave@example.com|4567890123|   Mumbai|
+-------+----------+----------+----------+-----------+-------------------+----------+---------+



WithColumn

In [None]:
orders_df.withColumn("cust_order_plus1", col("cust_order") + 1).show()

Select

In [None]:
orders_df.select("cust_id", "cust_fname", "cust_order").show()

Distinct

In [None]:
orders_df.distinct().show()

OrderBy

In [None]:
orders_df.orderBy(col("cust_order").desc()).show()

Limit

In [None]:
orders_df.limit(3).show()

Union

In [None]:
extra_data = [(106, "New", "User", 4, "ACTIVE")]
extra_df = spark.createDataFrame(extra_data, schema=orders_df.schema)

union_df = orders_df.union(extra_df)
union_df.show()

Drop

In [None]:
orders_df.drop("cust_status").show()

Fillna & Dropna

In [None]:
# Fill null values in cust_order with 0
orders_df.fillna({"cust_order": 0}).show()

# Drop rows where any value is null
orders_df.dropna().show()

##Actions

First

In [None]:
print(orders_df.first())

Head(n)

In [None]:
print(orders_df.head(3))

Take(n)

In [None]:
print(orders_df.take(2))

Write

In [None]:
orders_df.write.csv("orders_output", header=True)