<a href="https://colab.research.google.com/github/PedroTechy/DataProcessingEdit/blob/main/spark_streaming/final_challenges.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [85]:
#!rm -rf /content/content
!rm -rf /content/lake/silver

In [22]:
%pip install pyspark



# Context
Message events come from the platform's message broker (Kafka, Pub/Sub, Kinesis, ...).
You need to process the data according to the requirements.

Message schema:
- timestamp
- value
- event_type
- message_id
- channel
- country_id
- user_id



# Challenge 1

Step 1
- Change the existing producer
	- Change parquet location to "/content/lake/bronze/messages/data"
	- Add checkpoint (/content/lake/bronze/messages/checkpoint)
	- Delete /content/lake/bronze/messages and reprocess data
	- For reprocessing, run the streaming for at least 1 minute, then stop it

Step 2
- Implement a new streaming job that reads messages from the bronze layer and splits the result into two locations
	- "messages_corrupted"
		- logic: event_type is null, empty or equal to "NONE"
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" (extract it from timestamp)
		- location: /content/lake/silver/messages_corrupted/data

	- "messages"
		- logic: not corrupted data
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" (extract it from timestamp)
		- location: /content/lake/silver/messages/data

	- technical requirements
		- add checkpoint (choose location)
		- use a StructType schema
		- Set trigger interval to 5 seconds
		- run streaming for at least 20 seconds, then stop it

	- alternatives
		- implement a single streaming job with foreach/foreachBatch logic to write into two locations
		- implement two streaming jobs, one for messages and another for messages_corrupted
		- (paying attention to the paths and checkpoints)


  - Check results:
    - the message count in the bronze layer should match the sum of messages + messages_corrupted in the silver layer
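The routing rule can be sketched in plain Python (no Spark needed) to make the split and the final check concrete. This is a minimal sketch, assuming only the `event_type` values the producer emits; `is_corrupted` and `split_messages` are hypothetical helper names, not part of the notebook. Because every event lands in exactly one bucket, the bronze count must equal the sum of the two silver counts.

```python
from typing import Iterable, List, Optional, Tuple

# Values the requirements treat as corrupted (besides null)
CORRUPTED_VALUES = {"", "NONE"}


def is_corrupted(event_type: Optional[str]) -> bool:
    """True when event_type is null, empty or equal to "NONE"."""
    return event_type is None or event_type in CORRUPTED_VALUES


def split_messages(
    event_types: Iterable[Optional[str]],
) -> Tuple[List[Optional[str]], List[Optional[str]]]:
    """Route each event into exactly one bucket: valid or corrupted."""
    valid, corrupted = [], []
    for event_type in event_types:
        (corrupted if is_corrupted(event_type) else valid).append(event_type)
    return valid, corrupted


bronze = ["OPEN", "", "SENT", None, "NONE", "CLICKED"]
valid, corrupted = split_messages(bronze)
# The check from the requirements: bronze == messages + messages_corrupted
assert len(bronze) == len(valid) + len(corrupted)
print(len(valid), len(corrupted))  # 3 3
```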

In [24]:
%pip install faker



# Producer

In [70]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

def enrich_data(df, messages=messages):
  fake = Faker()
  new_columns = {
      'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
      'message_id': F.lit(fake.random_element(elements=messages)),
      'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
      'country_id': F.lit(fake.random_int(min=2000, max=2015)),
      'user_id': F.lit(fake.random_int(min=1000, max=1050)),
  }
  df = df.withColumns(new_columns)
  return df

def insert_messages(df: DataFrame, batch_id):
  enrich = enrich_data(df)
  enrich.write.mode("append").format("parquet").save("/content/lake/bronze/messages/data")

# read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# write stream
query = (df_stream.writeStream
.outputMode('append')
.trigger(processingTime='1 seconds')
.foreachBatch(insert_messages)
.option('checkpointLocation', '/content/lake/bronze/messages/checkpoint')
.start()
)

query.awaitTermination(60)


False

In [None]:
query.stop()

In [84]:
# Batch-read the bronze data: useful to visualize it and understand the schema before Step 2
df = spark.read.format("parquet").load("/content/lake/bronze/messages/data")
df.show()


+--------------------+-----+----------+--------------------+-------+----------+-------+
|           timestamp|value|event_type|          message_id|channel|country_id|user_id|
+--------------------+-----+----------+--------------------+-------+----------+-------+
|2024-12-10 23:08:...|   97|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|   99|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  101|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  103|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  105|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  107|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  109|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  111|      SENT|73c8ac47-1c78-441...|  EMAIL|      2006|   1018|
|2024-12-10 23:08:...|  113|    

# Additional datasets

In [28]:
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries)

# Streaming Messages x Messages Corrupted

In [86]:
#Step 2

from pyspark.sql.functions import col, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, LongType


# A file-based readStream needs an explicit schema, so define it first
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("value", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True)
])

# Getting the schema types right took some trial and error with the errors raised when writing to silver

bronze_df = spark.readStream.format("parquet").schema(schema).load("/content/lake/bronze/messages/data")

# Join with countries dataset
joined_df = bronze_df.join(countries, on="country_id", how="left")

# Split into messages_corrupted and messages. From the exploration above, corrupted values are "" and "NONE"; nulls are also
# treated as corrupted. Nulls need an explicit isNull() check: in SQL, `x IN (NULL, ...)` is never TRUE, so isin([None, ...]) misses them.
is_corrupted = col("event_type").isNull() | col("event_type").isin("", "NONE")
messages_corrupted_df = joined_df.filter(is_corrupted)
messages_df = joined_df.filter(~is_corrupted)

# NOTE: I first tried to build messages_df with subtract (joined_df.subtract(messages_corrupted_df)) or a left_anti join
# (joined_df.join(messages_corrupted_df, on=["message_id", "event_type"], how="left_anti")), but neither worked on streaming DataFrames.
# Negating the original filter, joined_df.filter(~col("event_type").isin([None, "", "NONE"])), returned 0 rows because `x NOT IN (NULL, ...)`
# is never TRUE either. With the explicit null check above, the negation works and every row lands in exactly one location.

"""print(messages_corrupted_df.count())
print(messages_df.count())
print(joined_df.count())
print(messages_df.show())
print(messages_corrupted_df.show())"""

# Add date column using the timestamp already provided
messages_corrupted_df = messages_corrupted_df.withColumn("date", to_date(col("timestamp")))
messages_df = messages_df.withColumn("date", to_date(col("timestamp")))

# Both sinks use the same settings and differ only in their paths, so a helper function avoids duplicating the writeStream code
def write_to_silver(df, data_path, checkpoint_location):
    query = (df.writeStream
             .format("parquet")
             .outputMode("append")
             .partitionBy("date")
             .option("checkpointLocation", checkpoint_location)
             .trigger(processingTime="5 seconds")
             .start(data_path))
    return query

query_corrupted = write_to_silver(messages_corrupted_df, "/content/lake/silver/messages_corrupted/data", "/content/lake/silver/messages_corrupted/checkpoint")
query_messages = write_to_silver(messages_df, "/content/lake/silver/messages/data", "/content/lake/silver/messages/checkpoint")

# Run streaming for at least 20 seconds
query_corrupted.awaitTermination(20)
query_messages.awaitTermination(20)

# Stop streaming queries
query_corrupted.stop()
query_messages.stop()
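The 0-row result of negating `col("event_type").isin([None, "", "NONE"])` comes from SQL three-valued logic. The sketch below models it in plain Python, with `True`/`False`/`None` standing for TRUE/FALSE/UNKNOWN; `sql_eq`, `sql_in` and `sql_not` are hypothetical helpers for illustration, not Spark APIs. A filter keeps a row only when its predicate is TRUE, so `NOT IN` over a list containing NULL never keeps anything, and `IN (NULL, ...)` never matches an actual null either.

```python
from typing import Any, Iterable, Optional


def sql_eq(a: Any, b: Any) -> Optional[bool]:
    """SQL equality: any comparison involving NULL is UNKNOWN (None)."""
    if a is None or b is None:
        return None
    return a == b


def sql_in(value: Any, candidates: Iterable[Any]) -> Optional[bool]:
    """`value IN (c1, c2, ...)` is `value = c1 OR value = c2 OR ...`."""
    result: Optional[bool] = False
    for candidate in candidates:
        eq = sql_eq(value, candidate)
        if eq is True:
            return True
        if eq is None:
            result = None  # an UNKNOWN term keeps the OR from being FALSE
    return result


def sql_not(x: Optional[bool]) -> Optional[bool]:
    """NOT UNKNOWN is still UNKNOWN."""
    return None if x is None else not x


candidates = [None, "", "NONE"]
for event_type in ["SENT", "NONE", None]:
    print(event_type, sql_in(event_type, candidates), sql_not(sql_in(event_type, candidates)))
# SENT None None
# NONE True False
# None None None
```

Since `NOT IN` is never TRUE for any row here, the filter drops everything; removing `None` from the list and testing nulls separately avoids the UNKNOWN results.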

## Checking data

In [87]:
# To check the results, I simply read them as a batch and count. I could have read them as a stream and "frozen" it, but this is simpler.
# Count bronze records
bronze_count = spark.read.format("parquet").load("/content/lake/bronze/messages/data").count()

# Count silver records
silver_messages_count = spark.read.schema(schema).format("parquet").load("/content/lake/silver/messages/data").count()
silver_corrupted_count = spark.read.schema(schema).format("parquet").load("/content/lake/silver/messages_corrupted/data").count()

# Compare counts
if bronze_count == silver_messages_count + silver_corrupted_count:
    print("Data validation successful: Bronze count matches Silver count")
else:
    print("Data validation failed: Bronze count does not match Silver count")
    print(f"Bronze count: {bronze_count}")
    print(f"Silver messages count: {silver_messages_count}")
    print(f"Silver corrupted count: {silver_corrupted_count}")

Data validation successful: Bronze count matches Silver count


# Challenge 2

- Run business report
- But first, there is a bug in the system which is causing some duplicated messages, we need to exclude these lines from the report

- removing duplicates logic:
  - Identify possible duplicates on message_id, event_type and channel
  - in case of duplicates, consider only the first message (occurrence by timestamp)
  - Ex:
    In the table below, the correct message to consider is the second line

```
    message_id | channel | event_type | timestamp
    123        | CHAT    | CREATED    | 10:10:01
    123        | CHAT    | CREATED    | 07:56:45 (first occurrence)
    123        | CHAT    | CREATED    | 08:13:33
```

- After cleaning the data, we're able to create the business report
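The deduplication rule amounts to keeping the row with the minimum timestamp per (message_id, channel, event_type) key, which is what `row_number()` over a window partitioned by that key and ordered by timestamp does in Spark. A plain-Python sketch on the example rows above; `dedup_first` is a hypothetical name used only for illustration.

```python
from datetime import time

# Rows from the example table: (message_id, channel, event_type, timestamp)
rows = [
    ("123", "CHAT", "CREATED", time(10, 10, 1)),
    ("123", "CHAT", "CREATED", time(7, 56, 45)),
    ("123", "CHAT", "CREATED", time(8, 13, 33)),
]


def dedup_first(rows):
    """Keep only the earliest row per (message_id, channel, event_type)."""
    first = {}
    for row in rows:
        key = row[:3]
        if key not in first or row[3] < first[key][3]:
            first[key] = row
    return list(first.values())


print(dedup_first(rows))
# [('123', 'CHAT', 'CREATED', datetime.time(7, 56, 45))]
```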

In [88]:
# Dedup: keep only the first occurrence (by timestamp) of each (message_id, event_type, channel)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.format("parquet").load("/content/lake/silver/messages/data")
first_occurrence = Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp")
dedup = (df.withColumn("row_number", F.row_number().over(first_occurrence))
           .filter("row_number = 1")
           .drop("row_number"))

### Report 1
  - Aggregate data by date, event_type and channel
  - Count number of messages
  - pivot event_type from rows into columns
  - schema expected:
  
```
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-03|    SMS|      4|      4|   1|       1|   5|
|2024-12-03|   CHAT|      3|      7|   5|       8|   4|
|2024-12-03|   PUSH|   NULL|      3|   4|       3|   4|
```
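The aggregate-then-pivot step can be sketched in plain Python to show where the `NULL` cells in the expected schema come from: a (date, channel) pair with no events of some type simply has no count for that column. The input rows below are made up for illustration, and `pivot_counts` is a hypothetical helper.

```python
from collections import Counter, defaultdict

EVENT_TYPES = ["CLICKED", "CREATED", "OPEN", "RECEIVED", "SENT"]

# Illustrative deduplicated rows: (date, channel, event_type)
rows = [
    ("2024-12-03", "SMS", "SENT"),
    ("2024-12-03", "SMS", "SENT"),
    ("2024-12-03", "SMS", "OPEN"),
    ("2024-12-03", "PUSH", "CREATED"),
]


def pivot_counts(rows):
    """Count per (date, channel) and spread event_type into columns.

    Missing combinations become None, mirroring the NULLs that a
    groupBy().pivot().agg(count("*")) produces in Spark.
    """
    counts = defaultdict(Counter)
    for date, channel, event_type in rows:
        counts[(date, channel)][event_type] += 1
    report = []
    for date, channel in sorted(counts):
        c = counts[(date, channel)]
        report.append((date, channel, *[c[e] if e in c else None for e in EVENT_TYPES]))
    return report


for line in pivot_counts(rows):
    print(line)
# ('2024-12-03', 'PUSH', None, 1, None, None, None)
# ('2024-12-03', 'SMS', None, None, 1, None, 2)
```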

In [89]:
# report 1
from pyspark.sql.functions import col, count, to_date, date_format
from pyspark.sql import functions as F

# Assuming 'dedup' is the DataFrame after deduplication
report1 = dedup \
    .groupBy(date_format(col("timestamp"), "yyyy-MM-dd").alias("date"), "channel") \
    .pivot("event_type", ['CLICKED', 'CREATED', 'OPEN', 'RECEIVED', 'SENT']) \
    .agg(count("*")) \
    .orderBy("date", "channel")

report1.show()

"""
groupBy: We group the data by date and channel. The date_format function is used to extract the date from the timestamp column.
pivot: We use the pivot function to transform the event_type column into separate columns for each event type, with the values representing the count of messages for that event type.
agg(count("*")): We aggregate the data by counting the number of messages for each combination of date, channel, and event_type.
orderBy: We sort the results by date and channel for better readability.
"""

+----------+-------+-------+-------+----+--------+----+
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-10|   CHAT|      2|      1|   1|       3|   6|
|2024-12-10|  EMAIL|      3|      4|   2|       7|   3|
|2024-12-10|  OTHER|      3|      3|   6|       3|   3|
|2024-12-10|   PUSH|      5|      5|   8|       1|   2|
|2024-12-10|    SMS|      4|      8|   5|       5|   3|
+----------+-------+-------+-------+----+--------+----+



### Report 2

- Identify the most active users by channel (sorted by number of iterations)
- schema expected:

```
+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|   1022|         5|   2|    0|    1|   0|  2|
|   1004|         4|   1|    1|    1|   1|  0|
|   1013|         4|   0|    0|    2|   1|  1|
|   1020|         4|   2|    0|    1|   1|  0|
```
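The "most active users" logic can be sketched in plain Python: count every interaction per user, add one column per channel, and sort by total. Unlike Report 1, absent channels show `0` rather than `NULL`, because each channel column is a conditional sum. The rows are illustrative only, `most_active_users` is a hypothetical helper, and breaking ties by `user_id` is an assumption based on the ordering in the expected output.

```python
from collections import Counter, defaultdict

CHANNELS = ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"]

# Illustrative deduplicated rows: (user_id, channel)
rows = [(1022, "CHAT"), (1022, "SMS"), (1004, "EMAIL"), (1022, "CHAT"), (1004, "PUSH")]


def most_active_users(rows):
    """Total iterations per user plus one count column per channel (0 when absent)."""
    per_user = defaultdict(Counter)
    for user_id, channel in rows:
        per_user[user_id][channel] += 1
    report = [
        (user_id, sum(c.values()), *[c[ch] for ch in CHANNELS])
        for user_id, c in per_user.items()
    ]
    # Most active first; ties broken by user_id (assumption)
    return sorted(report, key=lambda r: (-r[1], r[0]))


for line in most_active_users(rows):
    print(line)
# (1022, 3, 2, 0, 0, 0, 1)
# (1004, 2, 0, 1, 0, 1, 0)
```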


In [None]:
# report 2

from pyspark.sql import functions as F
from pyspark.sql.functions import col, count

# Assuming 'dedup' is the DataFrame after deduplication
# F.when() takes only (condition, value); the default value goes in .otherwise(0)
report2 = dedup \
    .groupBy("user_id") \
    .agg(count("*").alias("iterations"),
         F.sum(F.when(col("channel") == "CHAT", 1).otherwise(0)).alias("CHAT"),
         F.sum(F.when(col("channel") == "EMAIL", 1).otherwise(0)).alias("EMAIL"),
         F.sum(F.when(col("channel") == "OTHER", 1).otherwise(0)).alias("OTHER"),
         F.sum(F.when(col("channel") == "PUSH", 1).otherwise(0)).alias("PUSH"),
         F.sum(F.when(col("channel") == "SMS", 1).otherwise(0)).alias("SMS")) \
    .orderBy(col("iterations").desc(), "user_id")

report2.show()

"""
groupBy: We group the data by user_id to analyze user activity.
agg: We use aggregate functions to calculate the following:
count("*").alias("iterations"): Counts the total number of interactions for each user, aliased as "iterations".
F.sum(F.when(col("channel") == "CHAT", 1).otherwise(0)).alias("CHAT"): Counts the number of interactions for each user in the "CHAT" channel, aliased as "CHAT". The same logic is applied to the other channels (EMAIL, OTHER, PUSH, SMS).
orderBy: We sort the results by "iterations" in descending order to identify the most active users, breaking ties by user_id.
"""

# Challenge 3

In [None]:
# Theoretical question:

# A new use case requires the message data to be aggregated in near real time.
# They want to build a dashboard embedded in the platform website to analyze message data with low latency (a few minutes).
# This application will directly access the data aggregated by the streaming process.

# Q1:
# - What would be your suggestion to achieve that using Spark Structured Streaming?
#   Or would you choose a different data processing tool?
#
# - Which storage would you use and why? (database?, data lake?, kafka?)

"""
R: For this use case, Spark Structured Streaming is a strong candidate, since its micro-batch triggers can run every few seconds, well within the few-minutes latency requirement.
However, Spark's strength lies in processing, not in acting as the ingestion channel or the serving layer.
Therefore, a more robust solution would pair Spark with a message broker such as Kafka, Pub/Sub or Kinesis for efficient, low-latency ingestion.

Once Spark processes the data, the aggregated results need to be stored in a system optimized for frequent,
low-latency queries from the dashboard. Good options include databases such as Cassandra or MongoDB, or a table format built for fast reads on a data lake, such as Delta Lake.

"""

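For a dashboard, a near-real-time aggregation would typically group events into fixed time windows; in Structured Streaming this is what `F.window` does, usually combined with `DataFrame.withWatermark` so late data is bounded. The plain-Python sketch below only illustrates the tumbling-window bucketing idea; the one-minute window size and the `(timestamp, event_type)` input shape are assumptions, and `window_start`/`aggregate` are hypothetical helpers.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)  # assumed window size


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    return ts - ((ts - epoch) % size)


def aggregate(events):
    """Count events per (window_start, event_type)."""
    return Counter((window_start(ts), event_type) for ts, event_type in events)


events = [
    (datetime(2024, 12, 10, 23, 8, 5), "SENT"),
    (datetime(2024, 12, 10, 23, 8, 59), "SENT"),
    (datetime(2024, 12, 10, 23, 9, 1), "OPEN"),
]
for (start, event_type), n in sorted(aggregate(events).items()):
    print(start.isoformat(), event_type, n)
# 2024-12-10T23:08:00 SENT 2
# 2024-12-10T23:09:00 OPEN 1
```

Each window's counts would be upserted into the serving store as it updates, so the dashboard reads small pre-aggregated rows instead of raw events.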
