In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

# Obtain the Spark session for this notebook; getOrCreate() reuses an
# already-active session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("CustomerOrdersJob")
    .getOrCreate()
)

In [2]:
# Input files read by the load cell: customers as CSV, orders as JSON.
customers_path = "/opt/spark-apps/input/customers.csv"
orders_path = "/opt/spark-apps/input/orders.json"
# Output directories written by the final two cells (both are overwritten
# on every run).
output_path_csv = "/tmp/orders_enriched_csv"
output_path_parquet = "/tmp/orders_enriched_parquet"

In [3]:
# Load the customers CSV. The first line is a header row. Without
# inferSchema every column (including customer_id and age) is read as a
# string; enabling it costs one extra pass over the file but gives numeric
# columns numeric types.
df_customers = spark.read.csv(customers_path, header=True, inferSchema=True)

# Load the orders JSON; column types are inferred from the JSON values by
# the reader.
df_orders = spark.read.json(orders_path)

In [4]:
# Preview the customers frame (up to 20 rows, long values truncated).
df_customers.show(n=20, truncate=True)


+-----------+----------+----------------+---+-------+
|customer_id|      name|           email|age|country|
+-----------+----------+----------------+---+-------+
|        101|  John Doe|john@example.com| 28|     US|
|        102|Jane Smith|jane@example.com| 34|     UK|
|        103|  Ali Khan| ali@example.com| 40|    UAE|
|        104|Rita Mehra|rita@example.com| 25|  India|
|        105| Wei Zhang| wei@example.com| 31|  China|
+-----------+----------+----------------+---+-------+



In [5]:
# Preview the first four customers without truncating cell values.
df_customers.show(n=4, truncate=False)

+-----------+----------+----------------+---+-------+
|customer_id|name      |email           |age|country|
+-----------+----------+----------------+---+-------+
|101        |John Doe  |john@example.com|28 |US     |
|102        |Jane Smith|jane@example.com|34 |UK     |
|103        |Ali Khan  |ali@example.com |40 |UAE    |
|104        |Rita Mehra|rita@example.com|25 |India  |
+-----------+----------+----------------+---+-------+
only showing top 4 rows



In [6]:
# Preview the first four orders.
df_orders.show(n=4)

+------+-----------+--------+----------+
|amount|customer_id|order_id|    status|
+------+-----------+--------+----------+
| 250.5|        101|    5001|   shipped|
| 145.0|        102|    5002| cancelled|
|389.99|        104|    5003|processing|
| 89.99|        105|    5004|   shipped|
+------+-----------+--------+----------+
only showing top 4 rows



In [7]:

# Attach customer attributes to each order. This is an inner join, so an
# order whose customer_id has no match in df_customers is dropped.
df_joined = df_orders.join(
    df_customers,
    on="customer_id",
    how="inner",
)

In [8]:
# Preview the first four joined rows.
df_joined.show(n=4)


+-----------+------+--------+----------+----------+----------------+---+-------+
|customer_id|amount|order_id|    status|      name|           email|age|country|
+-----------+------+--------+----------+----------+----------------+---+-------+
|        101| 250.5|    5001|   shipped|  John Doe|john@example.com| 28|     US|
|        102| 145.0|    5002| cancelled|Jane Smith|jane@example.com| 34|     UK|
|        104|389.99|    5003|processing|Rita Mehra|rita@example.com| 25|  India|
|        105| 89.99|    5004|   shipped| Wei Zhang| wei@example.com| 31|  China|
+-----------+------+--------+----------+----------+----------------+---+-------+



In [9]:
# Label each order by amount: >= 200 is "High Value", >= 100 is
# "Medium Value", anything else is "Low Value".
# NOTE: a null amount satisfies neither comparison, so it is also labelled
# "Low Value".
order_amount = df_joined["amount"]
order_type_expr = (
    when(order_amount >= 200, "High Value")
    .when(order_amount >= 100, "Medium Value")
    .otherwise("Low Value")
)
df_enriched = df_joined.withColumn("order_type", order_type_expr)

In [10]:

# Preview only the columns relevant to the order-type classification.
(
    df_enriched
    .select("order_id", "name", "amount", "order_type")
    .show(n=20, truncate=True)
)

+--------+----------+------+------------+
|order_id|      name|amount|  order_type|
+--------+----------+------+------------+
|    5001|  John Doe| 250.5|  High Value|
|    5002|Jane Smith| 145.0|Medium Value|
|    5003|Rita Mehra|389.99|  High Value|
|    5004| Wei Zhang| 89.99|   Low Value|
+--------+----------+------+------------+



In [11]:
# Projection that is persisted by the write cells below.
output_columns = ["order_id", "name", "amount", "order_type"]
df_enriched_op = df_enriched.select(*output_columns)

In [12]:
# Write the result as CSV with a header row, replacing any previous output
# at output_path_csv.
df_enriched_op.write.csv(output_path_csv, mode="overwrite", header=True)

In [13]:
# Write the same result as Parquet, replacing any previous output at
# output_path_parquet.
df_enriched_op.write.parquet(output_path_parquet, mode="overwrite")