In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pathlib import Path

spark = SparkSession.builder.appName("ExampleApp").master("local[*]").getOrCreate()

user_df = spark.read.csv("users.csv", header=True, inferSchema=True)
transaction_df = spark.read.csv("transactions.csv", header=True, inferSchema=True)  

transaction_df = transaction_df.withColumn("timestamp", F.to_date("timestamp", "yyyy-MM-dd"))

joined_df = user_df.join(transaction_df, on = "user_id", how = "inner")

windowDriver = Window.partitionBy("user_id").orderBy(F.asc("timestamp"))

joined_df = joined_df.withColumn("prev_date", F.lag("timestamp", 1).over(windowDriver))
joined_df = joined_df.withColumn("date_diff", F.dateDiff(F.col("timestamp"), F.col("prev_date")))

result_df = joined_df.filter("date_diff > 30")

result_df.coalesce(1).write.parquet("output/date_diff.parquet", mode="overwrite")
result_df.show(5, truncate=False)

In [3]:
#Ticket 1: The "Flattening" Pipeline (Ingestion)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os
from pathlib import Path

current_dir = Path(os.getcwd())
PROJECT_ROOT = current_dir.parent
DATA_DIR = PROJECT_ROOT / "notebook" / "sample_1.json"
# print(f"Project root directory: {DATA_DIR}")


spark = SparkSession.builder.appName("FlatteningPipeline").master("local[*]").getOrCreate()
df = spark.read.json(str(DATA_DIR))

df_explode = df.withColumn("history", F.explode_outer("watch_history"))

final_df = df_explode.select(
    "event_id",
    "user_id",
    F.col("timestamp"),
    F.when(F.col("device.type").isNull(), "Unknown").otherwise(F.col("device.type")).alias("device_type"),
    F.when((F.col("device.os").isNull())| (F.col("device.os") == ""), "Unknown").otherwise(F.col("device.os")).alias("device_os"),
    F.when(F.col("device.ip").isNull(), "Unknown").otherwise(F.col("device.ip")).alias("device_ip"),
    F.when(F.col("history.content_id").isNull(), "Unknown").otherwise(F.col("history.content_id")).alias("content_id"),
    F.when(F.col("history.genre").isNull(), "Unknown").otherwise(F.col("history.genre")).alias("genre"),
    F.col("history.minutes_watched").alias("minutes_watched")
)

# ticket 2: 
clean_df = final_df.filter((F.col("timestamp") != "0") & (F.col("minutes_watched") != "0"))
clean_df = clean_df.withColumn("is_high_value", F.when(F.col("minutes_watched") > 40, True).otherwise(False))

#ticket 3:
session_df = clean_df.withColumn("timestamp", F.regexp_replace("timestamp", "T", " "))
session_df = session_df.withColumn("ts_seconds", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

windowSpec = Window.partitionBy("user_id").orderBy(F.asc("timestamp"))
session_df = session_df.withColumn("prev_ts", F.lag("ts_seconds").over(windowSpec))
session_df = session_df.withColumn("diffseconds", F.col("ts_seconds") - F.col("prev_ts"))

session_result_df = session_df.withColumn("is_session_start", F.when((F.col("diffseconds") > 1800), True).otherwise(False))

# ticket 4:
total_minutes_df = clean_df.groupBy("device_type", "genre").agg(F.sum("minutes_watched").alias("total_minutes_watched"))

device_type_window = Window.partitionBy("device_type").orderBy(F.desc("total_minutes_watched"))
genre_result_df = total_minutes_df.withColumn("top_genre", F.rank().over(device_type_window))



final_df.show(10, truncate=False)
clean_df.show(10, truncate=False)
session_result_df.show(10, truncate=False)
genre_result_df.show(10, truncate=False)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Invalid Spark URL: spark://HeartbeatReceiver@AVI_LEGION.mshome.net:63528
	at org.apache.spark.rpc.RpcEndpointAddress$.apply(RpcEndpointAddress.scala:66)
	at org.apache.spark.rpc.netty.NettyRpcEnv.asyncSetupEndpointRefByURI(NettyRpcEnv.scala:143)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:362)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:66)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:136)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:229)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:629)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os
from pathlib import Path

DIR = Path(os.getcwd())
DATA_DIR = DIR / "sample_2.json"

spark = SparkSession.builder.appName("SpeedyKart").master("local[*]").getOrCreate()
df = spark.read.json(str(DATA_DIR))

exploded_df = df.withColumn("items", F.explode_outer(F.col("items")))

exploded_df = exploded_df.select(
    "order_id",
    "user_id",
    F.to_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss").alias("timestamp"),
    "status",
    "delivery_address",
    F.col("items.product_id").alias("product_id"),
    F.col("items.price").alias("price"),
    F.when(F.col("items.qty").isNull(),1).otherwise(F.col("items.qty")).alias("quantity")
)

# Ticket 1: Calculate the total revenue for each order, excluding cancelled orders.
Order_Revenue_df = exploded_df.filter(F.col("status") != "CANCELLED") \
    .withColumn("line_total", F.col("price") * F.col("quantity")) \
    .groupBy("order_id", "user_id", "timestamp").agg(F.coalesce(F.sum("line_total"), F.lit(0.0)).alias("order_revenue"))

# Ticket 2: Address Parsing (The String Challenge)0
address_df = exploded_df.withColumn("address_parts", F.split(F.col("delivery_address"), ","))
address_df = address_df.withColumn("last_part", F.trim(F.element_at(F.col("address_parts"), -1)))
address_df = address_df.withColumn("second_last_part", F.trim(F.element_at(F.col("address_parts"), -2)))

parsed_df = address_df.withColumn("state", F.when(F.length(F.col("last_part")) > 2, F.col("second_last_part")).otherwise(F.col("last_part")))
parsed_df = parsed_df.select("order_id", "user_id", "timestamp", "status", "delivery_address", "state")

# ticket 3: 
runningTotalWindow = Window.partitionBy("user_id").orderBy(F.asc("timestamp")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
remainingTotalWindow = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.currentRow, Window.unboundedFollowing)
vip_df = Order_Revenue_df.withColumn("running_total", F.sum("order_revenue").over(runningTotalWindow)).withColumn("remaining_total", F.sum("Order_Revenue").over(remainingTotalWindow))
vip_df = vip_df.withColumn("is_vip", F.when(F.col("running_total") >= 100, True).otherwise(False))

exploded_df.show(10, truncate=False)
Order_Revenue_df.show(10, truncate=False)
parsed_df.show(10, truncate=False)
vip_df.show(10, truncate=False)

+--------+-------+-------------------+---------+---------------------------------+----------+-----+--------+
|order_id|user_id|timestamp          |status   |delivery_address                 |product_id|price|quantity|
+--------+-------+-------------------+---------+---------------------------------+----------+-----+--------+
|NULL    |NULL   |NULL               |NULL     |NULL                             |NULL      |NULL |1       |
|o101    |501    |2023-06-01 10:00:00|DELIVERED|123 Apple St, New York, NY, 10001|p01       |10.0 |2       |
|o101    |501    |2023-06-01 10:00:00|DELIVERED|123 Apple St, New York, NY, 10001|p02       |5.0  |1       |
|o102    |502    |2023-06-01 11:30:00|PENDING  |456 Orange Ave, Los Angeles, CA  |p03       |20.0 |1       |
|o103    |501    |2023-06-02 09:15:00|CANCELLED|123 Apple St, New York, NY, 10001|p01       |10.0 |10      |
|o104    |501    |2023-06-05 14:00:00|DELIVERED|123 Apple St, NYC, NY, 10001     |p04       |50.0 |1       |
|o104    |501    |2