<a href="https://colab.research.google.com/github/ducline/edit-data_processing/blob/main/spark_streaming/challenges/final_challenges.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [None]:
%pip install pyspark



# Context
Message events arrive from the platform's message broker (Kafka, Pub/Sub, Kinesis, ...).
You need to process the data according to the requirements below.

Message schema:
- timestamp
- value
- event_type
- message_id
- channel
- country_id
- user_id



# Challenge 1

Step 1
- Change the existing producer
	- Change the parquet location to "/content/lake/bronze/messages/data"
	- Add a checkpoint (/content/lake/bronze/messages/checkpoint)
	- Delete /content/lake/bronze/messages and reprocess the data
	- For reprocessing, run the stream for at least 1 minute, then stop it

Step 2
- Implement a new streaming job that reads the messages from the bronze layer and splits the result into two locations
	- "messages_corrupted"
		- logic: event_type is null, empty or equal to "NONE"
		- extra logic: add the country name by joining the message with the countries dataset
		- partition by "date" (extracted from timestamp)
		- location: /content/lake/silver/messages_corrupted/data

	- "messages"
		- logic: not corrupted data
		- extra logic: add the country name by joining the message with the countries dataset
		- partition by "date" (extracted from timestamp)
		- location: /content/lake/silver/messages/data

	- technical requirements
		- add a checkpoint (choose the location)
		- use an explicit StructType schema
		- set the trigger interval to 5 seconds
		- run the stream for at least 20 seconds, then stop it

	- alternatives
		- implement a single streaming job with foreach/foreachBatch logic that writes into both locations
		- implement two streaming jobs, one for messages and another for messages_corrupted
		- (paying attention to the paths and checkpoints)


- Check results:
	- the number of messages in the bronze layer should equal the sum of messages + messages_corrupted in the silver layer

In [2]:
%pip install faker

Collecting faker
  Downloading Faker-33.3.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-33.3.0-py3-none-any.whl (1.9 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.9/1.9 MB 52.2 MB/s eta 0:00:00
Installing collected packages: faker
Successfully installed faker-33.3.0


# Producer

In [6]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

def enrich_data(df, messages=messages):
  fake = Faker()
  new_columns = {
      'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
      'message_id': F.lit(fake.random_element(elements=messages)),
      'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
      'country_id': F.lit(fake.random_int(min=2000, max=2015)),
      'user_id': F.lit(fake.random_int(min=1000, max=1050)),
  }
  df = df.withColumns(new_columns)
  return df

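def insert_messages(df: DataFrame, batch_id):
  enrich = enrich_data(df)
  enrich.write.mode("append").format("parquet").save("/content/lake/bronze/messages/data")

# delete previous output (data and checkpoint) so the data is reprocessed from scratch
import shutil
shutil.rmtree("/content/lake/bronze/messages", ignore_errors=True)

# read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# write stream (the checkpoint records progress, so a restart resumes instead of starting over)
query = (df_stream.writeStream
.outputMode('append')
.trigger(processingTime='1 seconds')
.option("checkpointLocation", "/content/lake/bronze/messages/checkpoint")
.foreachBatch(insert_messages)
.start()
)

# run for at least 1 minute
query.awaitTermination(60)


False

In [7]:
query.stop()

In [8]:
df = spark.read.format("parquet").load("/content/lake/bronze/messages/data")
df.show()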

+--------------------+-----+----------+--------------------+-------+----------+-------+
|           timestamp|value|event_type|          message_id|channel|country_id|user_id|
+--------------------+-----+----------+--------------------+-------+----------+-------+
|2025-01-05 15:08:...|    0|   CLICKED|7fbb46e7-c645-40c...|  OTHER|      2005|   1043|
|2025-01-05 15:08:...|    2|   CLICKED|7fbb46e7-c645-40c...|  OTHER|      2005|   1043|
|2025-01-05 15:08:...|    1|   CLICKED|7fbb46e7-c645-40c...|  OTHER|      2005|   1043|
|2025-01-05 15:08:...|    3|   CLICKED|7fbb46e7-c645-40c...|  OTHER|      2005|   1043|
|2025-01-05 15:09:...|   74|  RECEIVED|d6e6164e-347c-429...|  EMAIL|      2007|   1025|
|2025-01-05 15:08:...|   32|  RECEIVED|afc92553-0692-47b...|  EMAIL|      2007|   1035|
|2025-01-05 15:08:...|    9|  RECEIVED|9a0b99b0-a894-44f...|  EMAIL|      2012|   1000|
|2025-01-05 15:08:...|   26|  RECEIVED|86f17417-9c67-40c...|   CHAT|      2001|   1040|
|2025-01-05 15:09:...|   72|  RE

# Additional datasets

In [13]:
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries_df = spark.createDataFrame(countries)

# Streaming Messages x Messages Corrupted

In [11]:
%mkdir -p /content/lake/bronze/messages/data

In [14]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType

# Define schema (every column written by the producer, including channel, which Challenge 2 needs)
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("value", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True)
])

# Read bronze messages
bronze_messages = spark.readStream.schema(schema).parquet("/content/lake/bronze/messages/data")


# Add country name
messages_with_country = bronze_messages.join(countries_df, "country_id", "left")

# Filter corrupted messages
messages_corrupted = messages_with_country.filter(
    (F.col("event_type").isNull()) |
    (F.col("event_type") == "") |
    (F.col("event_type") == "NONE")
)

# Filter valid messages
messages = messages_with_country.filter(
    ~((F.col("event_type").isNull()) |
      (F.col("event_type") == "") |
      (F.col("event_type") == "NONE"))
)

# Write messages_corrupted
query_corrupted = (messages_corrupted
    .withColumn("date", F.to_date("timestamp"))
    .writeStream
    .outputMode("append")
    .partitionBy("date")
    .format("parquet")
    .option("path", "/content/lake/silver/messages_corrupted/data")
    .option("checkpointLocation", "/content/lake/silver/messages_corrupted/checkpoint")
    .trigger(processingTime="5 seconds")
    .start()
)

# Write valid messages
query_valid = (messages
    .withColumn("date", F.to_date("timestamp"))
    .writeStream
    .outputMode("append")
    .partitionBy("date")
    .format("parquet")
    .option("path", "/content/lake/silver/messages/data")
    .option("checkpointLocation", "/content/lake/silver/messages/checkpoint")
    .trigger(processingTime="5 seconds")
    .start()
)

# Let both queries run for at least 20 seconds, then stop them
query_valid.awaitTermination(20)
query_corrupted.stop()
query_valid.stop()


False

## Checking data

In [23]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType

schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("date", DateType(), True)
])


# Read bronze data as a batch DataFrame
bronze_messages_batch = spark.read.schema(schema).parquet("/content/lake/bronze/messages/data")

# Read silver data as batch DataFrames
silver_messages = spark.read.schema(schema).parquet("/content/lake/silver/messages/data")
silver_messages_corrupted = spark.read.schema(schema).parquet("/content/lake/silver/messages_corrupted/data")

# Count records
bronze_count = bronze_messages_batch.count()
silver_count = silver_messages.count() + silver_messages_corrupted.count()

# Validation
assert bronze_count == silver_count, "Mismatch between bronze and silver layer counts!"

# Challenge 2

- Run the business report
- But first: a bug in the system is causing some duplicated messages, and we need to exclude these rows from the report

- Deduplication logic:
  - Identify possible duplicates on message_id, event_type and channel
  - In case of duplicates, keep only the first message (earliest timestamp)
  - Example:
    In the table below, the correct message to keep is the second row

```
    message_id | channel | event_type | timestamp
    123        | CHAT    | CREATED    | 10:10:01
    123        | CHAT    | CREATED    | 07:56:45 (first occurrence)
    123        | CHAT    | CREATED    | 08:13:33
```

- After cleaning the data, we can create the business report

In [28]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DateType

# Define the schema of the silver data
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("value", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("date", DateType(), True)  # partition column created with to_date()
])

# Read from the silver layer with the specified schema
df = spark.read.schema(schema).parquet("/content/lake/silver/messages/data")

# Deduplication: keep the earliest message per (message_id, event_type, channel)
window_spec = Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp")
dedup = (df.withColumn("row_number", F.row_number().over(window_spec))
           .filter(F.col("row_number") == 1)
           .drop("row_number"))

# Write the deduplicated data (overwrite so the cell can be re-run)
dedup.write.mode("overwrite").parquet("/content/lake/silver/messages/deduplicated")


## Report 1
  - Aggregate data by date, event_type and channel
  - Count the number of messages
  - Pivot event_type from rows into columns
  - Expected schema:
  
```
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-03|    SMS|      4|      4|   1|       1|   5|
|2024-12-03|   CHAT|      3|      7|   5|       8|   4|
|2024-12-03|   PUSH|   NULL|      3|   4|       3|   4|
```

In [30]:
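# Event types in the column order of the expected schema
event_types = ["CLICKED", "CREATED", "OPEN", "RECEIVED", "SENT"]

report1 = (dedup.groupBy("date", "channel")
           .pivot("event_type", event_types)
           .count()
           .orderBy("date", "channel"))
report1.show()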



## Report 2

- Identify the most active users by channel, sorted by number of iterations (the total number of messages per user across all channels)
- Expected schema:

```
+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|   1022|         5|   2|    0|    1|   0|  2|
|   1004|         4|   1|    1|    1|   1|  0|
|   1013|         4|   0|    0|    2|   1|  1|
|   1020|         4|   2|    0|    1|   1|  0|
```


In [33]:
# Channels in the column order of the expected schema
channels = ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"]

report2 = (dedup.groupBy("user_id")
           .pivot("channel", channels)
           .count()
           .fillna(0, subset=channels)                              # no messages on a channel -> 0 instead of NULL
           .withColumn("iterations", F.expr(" + ".join(channels)))  # total messages per user
           .select("user_id", "iterations", *channels)
           .orderBy(F.desc("iterations"), "user_id"))
report2.show()


# Challenge 3

In [None]:
# Theoretical question:

# A new use case requires the message data to be aggregated in near real time.
# They want to build a dashboard, embedded in the platform website, to analyze message data with low latency (a few minutes).
# This application will directly access the data aggregated by the streaming process.

# Q1:
# - What would be your suggestion to achieve that using Spark Structured Streaming?
#   Or would you choose a different data processing tool?
# For near-real-time processing, I would use Spark Structured Streaming to process the data and update the aggregates every few minutes.

# - Which storage would you use and why? (database?, data lake?, kafka?)
# For storage, I would use Kafka, which is well suited to delivering continuously updated data with low latency.



