In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

In [2]:
# Create the Spark session for this notebook, or reuse one that is already
# active under the same builder configuration.
spark = (
    SparkSession.builder
    .appName("CustomerOrdersJobSQL")
    .getOrCreate()
)

In [3]:
# Input locations: customer master data (CSV) and raw orders (JSON).
customers_path, orders_path = (
    "/opt/spark-apps/input/customers.csv",
    "/opt/spark-apps/input/orders.json",
)

# Output locations for the enriched orders.
# NOTE(review): output_path_parquet is not referenced by any cell visible in
# this part of the notebook -- confirm it is still needed.
output_path_csv, output_path_parquet = (
    "/tmp/orders_enriched_csv",
    "/tmp/orders_enriched_parquet",
)

In [4]:
# Load the raw inputs.
# The CSV reader takes its column names from the first row (header=True).
# NOTE(review): no schema inference is requested, so every customer column
# (including customer_id and age) is loaded as a string -- confirm this is
# intended before doing numeric work on those columns.
customers_reader = spark.read.option("header", True)
df_customers = customers_reader.csv(customers_path)

# Orders come from a JSON file; the schema is whatever the JSON reader derives.
df_orders = spark.read.json(orders_path)

In [5]:
# Sanity check: print the loaded customer rows as a text table.
df_customers.show()

+-----------+----------+----------------+---+-------+
|customer_id|      name|           email|age|country|
+-----------+----------+----------------+---+-------+
|        101|  John Doe|john@example.com| 28|     US|
|        102|Jane Smith|jane@example.com| 34|     UK|
|        103|  Ali Khan| ali@example.com| 40|    UAE|
|        104|Rita Mehra|rita@example.com| 25|  India|
|        105| Wei Zhang| wei@example.com| 31|  China|
+-----------+----------+----------------+---+-------+



In [6]:
# Expose both DataFrames to Spark SQL under the view names used by the
# queries below. createOrReplaceTempView makes re-running this cell safe.
for view_name, frame in (("customers", df_customers), ("orders", df_orders)):
    frame.createOrReplaceTempView(view_name)

In [7]:
# Query the `customers` temp view. Leaving the DataFrame as the cell's last
# expression displays its repr (column names and types), not its rows.
all_customers_sql = """
SELECT * FROM customers;
        """
spark.sql(all_customers_sql)

DataFrame[customer_id: string, name: string, email: string, age: string, country: string]

In [8]:
# Same query against the `customers` temp view, this time printing the rows.
all_customers_sql = """
SELECT * FROM customers;
        """
all_customers_df = spark.sql(all_customers_sql)
all_customers_df.show()

+-----------+----------+----------------+---+-------+
|customer_id|      name|           email|age|country|
+-----------+----------+----------------+---+-------+
|        101|  John Doe|john@example.com| 28|     US|
|        102|Jane Smith|jane@example.com| 34|     UK|
|        103|  Ali Khan| ali@example.com| 40|    UAE|
|        104|Rita Mehra|rita@example.com| 25|  India|
|        105| Wei Zhang| wei@example.com| 31|  China|
+-----------+----------+----------------+---+-------+



In [9]:
# Preview the enriched orders: each order joined to its customer's name and
# tagged with a value tier derived from the order amount:
#   amount >= 200 -> 'High Value'
#   amount >= 100 -> 'Medium Value'
#   otherwise     -> 'Low Value'
#
# String literals use single quotes (standard SQL). Double-quoted text is read
# as an identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled,
# which would make the query fail on column resolution.
# Select columns are qualified with their table alias so the query stays
# unambiguous if either input later gains a same-named column.
# The INNER JOIN drops orders with no matching customer and customers with no
# orders.
spark.sql("""
SELECT o.order_id,
       c.name,
       o.amount,
       CASE
           WHEN o.amount >= 200 THEN 'High Value'
           WHEN o.amount >= 100 THEN 'Medium Value'
           ELSE 'Low Value'
       END AS order_type
FROM customers c
INNER JOIN orders o
    ON o.customer_id = c.customer_id
""").show()

+--------+----------+------+------------+
|order_id|      name|amount|  order_type|
+--------+----------+------+------------+
|    5001|  John Doe| 250.5|  High Value|
|    5002|Jane Smith| 145.0|Medium Value|
|    5003|Rita Mehra|389.99|  High Value|
|    5004| Wei Zhang| 89.99|   Low Value|
+--------+----------+------+------------+



In [11]:
# Build the enriched orders DataFrame: each order joined to its customer's
# name and tagged with a value tier derived from the order amount:
#   amount >= 200 -> 'High Value'
#   amount >= 100 -> 'Medium Value'
#   otherwise     -> 'Low Value'
#
# String literals use single quotes (standard SQL). Double-quoted text is read
# as an identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled,
# which would make the query fail on column resolution.
# Select columns are qualified with their table alias so the query stays
# unambiguous if either input later gains a same-named column.
# The INNER JOIN drops orders with no matching customer and customers with no
# orders.
df_enriched_op_sql = spark.sql("""
SELECT o.order_id,
       c.name,
       o.amount,
       CASE
           WHEN o.amount >= 200 THEN 'High Value'
           WHEN o.amount >= 100 THEN 'Medium Value'
           ELSE 'Low Value'
       END AS order_type
FROM customers c
INNER JOIN orders o
    ON o.customer_id = c.customer_id
""")
df_enriched_op_sql.show()

+--------+----------+------+------------+
|order_id|      name|amount|  order_type|
+--------+----------+------+------------+
|    5001|  John Doe| 250.5|  High Value|
|    5002|Jane Smith| 145.0|Medium Value|
|    5003|Rita Mehra|389.99|  High Value|
|    5004| Wei Zhang| 89.99|   Low Value|
+--------+----------+------+------------+



In [14]:
# Register the enriched orders as a temp view so they can be filtered in SQL.
# The view deliberately keeps the same name as the Python variable.
df_enriched_op_sql.createOrReplaceTempView("df_enriched_op_sql")

# Preview only the High Value and Medium Value orders (Low Value is excluded).
# The tier names are single-quoted string literals: double-quoted text is read
# as an identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
spark.sql("""
SELECT *
FROM df_enriched_op_sql
WHERE order_type IN ('High Value', 'Medium Value')
""").show()

+--------+----------+------+------------+
|order_id|      name|amount|  order_type|
+--------+----------+------+------------+
|    5001|  John Doe| 250.5|  High Value|
|    5002|Jane Smith| 145.0|Medium Value|
|    5003|Rita Mehra|389.99|  High Value|
+--------+----------+------+------------+



In [17]:
# Final result set: enriched orders restricted to the High Value and Medium
# Value tiers. Reads the `df_enriched_op_sql` temp view registered earlier in
# the notebook, so that cell must have run first.
# The tier names are single-quoted string literals: double-quoted text is read
# as an identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
df_enriched_sql_output = spark.sql("""
SELECT *
FROM df_enriched_op_sql
WHERE order_type IN ('High Value', 'Medium Value')
""")

In [18]:
# Persist the filtered orders as CSV with a header row. mode("overwrite")
# replaces any existing output at the target path, so the cell can be re-run.
(
    df_enriched_sql_output.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path_csv)
)