In [89]:
import os

from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F

# Start (or reuse) the local Spark session for this job; driver memory is
# fixed at session creation, so it is configured on the builder.
spark = (
    SparkSession.builder
    .appName("spark_orders_job")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

print("Done!", spark.sparkContext.uiWebUrl)

Done! http://DESKTOP-BHGT2IG:4040


In [90]:
# Directory holding the input CSVs. Set the ORDERS_DATA_DIR environment
# variable to run elsewhere instead of editing this cell; the default is the
# original local directory.
DATA_DIR = os.environ.get("ORDERS_DATA_DIR", r"C:\projects\etl\pyspark_airflow\data")

# Kept as plain strings (as before) because they are passed straight to
# spark.read.csv.
PATH_ORD = os.path.join(DATA_DIR, "orders.csv")
PATH_PDT = os.path.join(DATA_DIR, "products.csv")

In [91]:
# Read both inputs with column names from the header line. No schema is
# supplied, so every column arrives as a string and is typed in later cells.
orders, products = (
    spark.read.csv(path=csv_path, header=True) for csv_path in (PATH_ORD, PATH_PDT)
)

In [92]:
# The CSV was read without a schema, so give the price and the join key
# integer types.
# NOTE(review): cast("int") does not preserve fractional prices — confirm
# prices in products.csv are whole numbers.
products = (
    products
    .withColumn("price", F.col("price").cast("int"))
    .withColumn("product_id", F.col("product_id").cast("int"))
)

In [93]:
# product_ids holds a JSON array of quoted ids, e.g. '["234", "23"]'.
# schema_of_json infers ARRAY<STRING> from this sample, so the parsed ids are
# strings. They are cast to ARRAY<INT> below so that the exploded product_id
# has the same type as products.product_id (cast to int when products is
# typed). Without the cast the join compares string to int, and the joined
# product_id stays a string, so sorting by it is lexicographic ("100" < "20").
json_sample = '["234", "23"]'
json_schema = F.schema_of_json(json_sample)

# NOTE(review): this cell rebuilds `orders` from itself, so it is not
# idempotent — re-run the CSV read cell before re-running this one.
orders = (
    orders
    .withColumn("order_id", F.col("order_id").cast("int"))
    .withColumn("creation_time", F.to_timestamp("creation_time", "dd/MM/yy HH:mm"))
    .withColumn(
        "product_ids",
        F.from_json("product_ids", json_schema).cast("array<int>"),
    )
)

In [94]:
# One row per (order, product): explode emits each element of product_ids as
# its own row. Orders whose product_ids is null or empty (e.g. JSON that
# from_json could not parse) yield no rows; F.explode_outer would keep them.
orders_exploded = orders.select(
    F.col("order_id"),
    F.col("creation_time"),
    F.explode(F.col("product_ids")).alias("product_id"),
)

In [95]:
orders_exploded.show(n=20, truncate=True)

+--------+-------------------+----------+
|order_id|      creation_time|product_id|
+--------+-------------------+----------+
|       1|2022-08-24 01:52:00|        65|
|       1|2022-08-24 01:52:00|        28|
|       2|2022-08-24 06:37:00|        35|
|       2|2022-08-24 06:37:00|        30|
|       2|2022-08-24 06:37:00|        42|
|       2|2022-08-24 06:37:00|        34|
|       3|2022-08-24 07:35:00|        24|
|       3|2022-08-24 07:35:00|        70|
|       3|2022-08-24 07:35:00|        77|
|       4|2022-08-24 10:39:00|        72|
|       4|2022-08-24 10:39:00|        40|
|       4|2022-08-24 10:39:00|        20|
|       5|2022-08-24 12:34:00|        48|
|       6|2022-08-24 13:46:00|        76|
|       6|2022-08-24 13:46:00|        52|
|       6|2022-08-24 13:46:00|        65|
|       6|2022-08-24 13:46:00|        75|
|       6|2022-08-24 13:46:00|        77|
|       7|2022-08-24 13:59:00|        35|
|       7|2022-08-24 13:59:00|        74|
+--------+-------------------+----------+
only showing top 20 rows

In [96]:
# Attach product attributes to every order line. The left join keeps order
# lines with no matching product (their product columns, e.g. price, are null).
# NOTE(review): duplicate product_id rows in products would duplicate lines.
res = orders_exploded.join(
    products,
    on=["product_id"],
    how="left",
)


In [97]:
# Keep only the columns that make up the final result.
res = res.select("order_id", "product_id", "price")

In [98]:
# Maximum number of rows kept in the final result.
RESULT_LIMIT = 1000

# Sort by order, then by product within each order, and keep the first
# RESULT_LIMIT rows of that ordering.
res = (
    res
    .orderBy("order_id", "product_id")
    .limit(RESULT_LIMIT)
)

In [99]:
res.show(n=20, truncate=True)

+--------+----------+-----+
|order_id|product_id|price|
+--------+----------+-----+
|       1|        28|   75|
|       1|        65|  100|
|       2|        30|  140|
|       2|        34|   98|
|       2|        35|   90|
|       2|        42|   66|
|       3|        24|  120|
|       3|        70|   75|
|       3|        77|  298|
|       4|        20|   50|
|       4|        40|   40|
|       4|        72|   66|
|       5|        48|   51|
|       6|        52|   76|
|       6|        65|  100|
|       6|        75|  120|
|       6|        76|   56|
|       6|        77|  298|
|       7|        15|  450|
|       7|        34|   98|
+--------+----------+-----+
only showing top 20 rows
