<a href="https://colab.research.google.com/github/drmartins2/EDIT_DE/blob/main/5.%20Data%20Streaming/challenges/final_challenges.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [1]:
%pip install pyspark



# Context
Message events arrive from the platform's message broker (Kafka, Pub/Sub, Kinesis...).
You need to process the data according to the requirements.

Message schema:
- timestamp
- value
- event_type
- message_id
- channel
- country_id
- user_id



# Challenge 1

Step 1
- Change existing producer
	- Change parquet location to "/content/lake/bronze/messages/data"
	- Add checkpoint (/content/lake/bronze/messages/checkpoint)
	- Delete /content/lake/bronze/messages and reprocess data
	- For reprocessing, run the streaming for at least 1 minute, then stop it

Step 2
- Implement new stream job to read from messages in bronze layer and split result in two locations
	- "messages_corrupted"
		- logic: event_type is null, empty or equal to "NONE"
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" - extract it from timestamp
		- location: /content/lake/silver/messages_corrupted/data

	- "messages"
		- logic: not corrupted data
		- extra logic: add country name by joining message with countries dataset
		- partition by "date" -extract it from timestamp
		- location: /content/lake/silver/messages/data

	- technical requirements
		- add checkpoint (choose location)
		- use a StructType schema
		- Set trigger interval to 5 seconds
		- run streaming for at least 20 seconds, then stop it

	- alternatives
		- implement a single streaming job with foreach/foreachBatch logic that writes to both locations
		- implement two streaming jobs, one for messages and another for messages_corrupted
		- (pay attention to the paths and checkpoints)


	- check results
		- the message count in the bronze layer should equal the sum of messages + messages_corrupted in the silver layer
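
The split rule and the final count check can be illustrated without Spark. The block below is a minimal plain-Python sketch, not the streaming job itself; `is_corrupted` and `split_messages` are hypothetical helper names, and the sample events are made up for illustration.

```python
from typing import Optional


def is_corrupted(event_type: Optional[str]) -> bool:
    """Return True when event_type is null, empty or equal to "NONE"."""
    return event_type is None or event_type == "" or event_type == "NONE"


def split_messages(events):
    """Split events (dicts with an "event_type" key) into (valid, corrupted)."""
    valid = [e for e in events if not is_corrupted(e.get("event_type"))]
    corrupted = [e for e in events if is_corrupted(e.get("event_type"))]
    return valid, corrupted


# Made-up sample standing in for the bronze layer
sample_events = [
    {"message_id": "a", "event_type": "OPEN"},
    {"message_id": "b", "event_type": ""},
    {"message_id": "c", "event_type": "NONE"},
    {"message_id": "d", "event_type": None},
    {"message_id": "e", "event_type": "SENT"},
]

valid, corrupted = split_messages(sample_events)

# Every bronze event must land in exactly one of the two silver outputs
assert len(sample_events) == len(valid) + len(corrupted)
```

Because the valid filter is the exact negation of the corrupted filter, the two outputs always add up to the input, which is the property the "check results" step verifies.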

In [2]:
%pip install faker

Collecting faker
  Downloading Faker-33.1.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-33.1.0-py3-none-any.whl (1.9 MB)
Installing collected packages: faker
Successfully installed faker-33.1.0


# Producer [DEPRECATED]

In [3]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

def enrich_data(df, messages=messages):
  fake = Faker()
  new_columns = {
      'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
      'message_id': F.lit(fake.random_element(elements=messages)),
      'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
      'country_id': F.lit(fake.random_int(min=2000, max=2015)),
      'user_id': F.lit(fake.random_int(min=1000, max=1050)),
  }
  df = df.withColumns(new_columns)
  return df

def insert_messages(df: DataFrame, batch_id):
  enrich = enrich_data(df)
  enrich.write.mode("append").format("parquet").save("content/lake/bronze/messages")

# read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# write stream
query = (df_stream.writeStream
.outputMode('append')
.trigger(processingTime='1 seconds')
.foreachBatch(insert_messages)
.start()
)

query.awaitTermination(60)


False

In [4]:
query.stop()

In [5]:
df = spark.read.format("parquet").load("content/lake/bronze/messages/*")
df.show()

+--------------------+-----+----------+--------------------+-------+----------+-------+
|           timestamp|value|event_type|          message_id|channel|country_id|user_id|
+--------------------+-----+----------+--------------------+-------+----------+-------+
|2024-12-16 12:01:...|    0|   CLICKED|2d60aaa5-f9b9-45a...|   CHAT|      2004|   1040|
|2024-12-16 12:02:...|    2|   CLICKED|2d60aaa5-f9b9-45a...|   CHAT|      2004|   1040|
|2024-12-16 12:02:...|    4|   CLICKED|2d60aaa5-f9b9-45a...|   CHAT|      2004|   1040|
|2024-12-16 12:01:...|    1|   CLICKED|2d60aaa5-f9b9-45a...|   CHAT|      2004|   1040|
|2024-12-16 12:02:...|    3|   CLICKED|2d60aaa5-f9b9-45a...|   CHAT|      2004|   1040|
|2024-12-16 12:11:...|  542|  RECEIVED|f8b87f1e-e2ac-454...|  EMAIL|      2011|   1022|
|2024-12-16 12:04:...|  172|  RECEIVED|a8c4c974-3fa1-4e4...|  EMAIL|      2004|   1049|
|2024-12-16 12:12:...|  606|  RECEIVED|67633b06-5b9e-459...|  EMAIL|      2003|   1003|
|2024-12-16 12:03:...|   65|  RE

# Additional datasets [DEPRECATED]

In [6]:
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries)

# New Producer

In [7]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession
import shutil
import os

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

def enrich_data(df, messages=messages):
    fake = Faker()
    new_columns = {
        'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
        'message_id': F.lit(fake.random_element(elements=messages)),
        'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
        'country_id': F.lit(fake.random_int(min=2000, max=2015)),
        'user_id': F.lit(fake.random_int(min=1000, max=1050)),
    }
    df = df.withColumns(new_columns)
    return df

def insert_messages(df: DataFrame, batch_id):
    enrich = enrich_data(df)
    enrich = enrich.withColumn("value", F.col("value").cast("string"))
    enrich.write.mode("append").format("parquet").save("/content/lake/bronze/messages/data")

# Delete existing data - For first execution there's no data to delete (as directory is different from the other Producer)
bronze_path = "/content/lake/bronze/messages/data"
if os.path.exists(bronze_path):
    shutil.rmtree(bronze_path)

# Read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Write stream
query = (df_stream.writeStream
    .outputMode('append')
    .trigger(processingTime='1 seconds')
    .option("checkpointLocation", "/content/lake/bronze/messages/checkpoint")
    .foreachBatch(insert_messages)
    .start()
)

# Run for 1 minute
query.awaitTermination(60)


False

In [8]:
query.stop()

In [9]:
# Check New Producer results
df = spark.read.format("parquet").load("/content/lake/bronze/messages/data")
print("Total messages in bronze layer:", df.count())

# Check dataframe with a sample of results (with ~ [valid messages], without ~ [corrupted messages])
df.filter(~((F.col("event_type").isNull()) | (F.col("event_type") == "") | (F.col("event_type") == "NONE"))).show()

Total messages in bronze layer: 74
+--------------------+-----+----------+--------------------+-------+----------+-------+
|           timestamp|value|event_type|          message_id|channel|country_id|user_id|
+--------------------+-----+----------+--------------------+-------+----------+-------+
|2024-12-16 12:19:...|   71|  RECEIVED|26ea780c-b1e3-4fe...|  EMAIL|      2003|   1007|
|2024-12-16 12:19:...|   67|  RECEIVED|ab88e064-c736-42a...|  EMAIL|      2003|   1035|
|2024-12-16 12:19:...|   34|  RECEIVED|e1321dba-d2b5-429...|   PUSH|      2006|   1024|
|2024-12-16 12:19:...|   49|   CREATED|f1426824-a3e9-4f0...|  EMAIL|      2014|   1032|
|2024-12-16 12:19:...|   53|  RECEIVED|3eecdacf-c88e-48f...|   PUSH|      2011|   1037|
|2024-12-16 12:18:...|   17|   CREATED|3eecdacf-c88e-48f...|  EMAIL|      2003|   1012|
|2024-12-16 12:18:...|   24|  RECEIVED|b128f5ae-814f-48b...|   CHAT|      2001|   1035|
|2024-12-16 12:18:...|   20|  RECEIVED|d74e7b99-edb0-449...|   CHAT|      2012|   102

# Streaming Messages x Messages Corrupted

In [10]:
# Aux cell to delete silver folder for testing purpose
silver_path = "/content/lake/silver"
if os.path.exists(silver_path):
    shutil.rmtree(silver_path)

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, date_format, expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

spark = SparkSession.builder.appName("MessageProcessing").getOrCreate()

# Define messages schema
message_schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("value", StringType()),
    StructField("event_type", StringType()),
    StructField("message_id", StringType()),
    StructField("channel", StringType()),
    StructField("country_id", IntegerType()),
    StructField("user_id", IntegerType())
])

# Added countries data to this cell for join optimization by caching countries dataset
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries).cache()


def process_messages(df, batch_id):
    # Join with countries and derive the partition column from the timestamp
    enriched = (df
        .join(countries, "country_id", "left")
        .withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd"))
    )

    # A message is corrupted when event_type is null, empty or "NONE"
    is_corrupted = (col("event_type").isNull()) | (col("event_type") == "") | (col("event_type") == "NONE")

    # Cache the enriched batch because it is written twice
    enriched.persist()

    # Write corrupted data
    enriched.filter(is_corrupted).write \
        .partitionBy("date") \
        .mode("append") \
        .parquet("/content/lake/silver/messages_corrupted/data")

    # Write non-corrupted data (exact complement of the corrupted filter)
    enriched.filter(~is_corrupted).write \
        .partitionBy("date") \
        .mode("append") \
        .parquet("/content/lake/silver/messages/data")

    enriched.unpersist()



# Read from bronze layer
df_stream = spark.readStream \
    .schema(message_schema) \
    .parquet("/content/lake/bronze/messages/data/*")


# Process stream
query = df_stream.writeStream \
    .foreachBatch(process_messages) \
    .option("checkpointLocation", "/content/lake/silver/checkpoint") \
    .trigger(processingTime='5 seconds') \
    .start()


# Run for 20 seconds
query.awaitTermination(20)
query.stop()

## Checking data

In [12]:
# Check results
bronze_count = spark.read.schema(message_schema).parquet("/content/lake/bronze/messages/data").count()
silver_corrupted_count = spark.read.parquet("/content/lake/silver/messages_corrupted/data").count()
silver_valid_count = spark.read.parquet("/content/lake/silver/messages/data").count()

print(f"Total messages in bronze layer: {bronze_count}")
print(f"Total corrupted messages in silver layer: {silver_corrupted_count}")
print(f"Total non-corrupted messages in silver layer: {silver_valid_count}")
print(f"Total messages in silver layer: {silver_corrupted_count + silver_valid_count}")

# Assertion to automatically check that the record counts match between the bronze and silver layers
try:
    assert bronze_count == silver_corrupted_count + silver_valid_count, "Mismatch in message counts"
    print("Assertion passed: Message counts match")
except AssertionError as e:
    print(f"Assertion failed: {str(e)}")

Total messages in bronze layer: 74
Total corrupted messages in silver layer: 17
Total non-corrupted messages in silver layer: 57
Total messages in silver layer: 74
Assertion passed: Message counts match


# Challenge 2

- Run business report
- But first, there is a bug in the system which is causing some duplicated messages, we need to exclude these lines from the report

- removing duplicates logic:
  - Identify possible duplicates on message_id, event_type and channel
  - in case of duplicates, consider only the first message (occurrence by timestamp)
  - Ex:
    In table below, the correct message to consider is the second line

```
    message_id | channel | event_type | timestamp
    123        | CHAT    | CREATED    | 10:10:01
    123        | CHAT    | CREATED    | 07:56:45 (first occurrence)
    123        | CHAT    | CREATED    | 08:13:33
```

- After cleaning the data we're able to create the business report
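
The "first occurrence by timestamp" rule can be checked on the example table with a plain-Python sketch. It is an illustration of the rule only, not the Spark implementation; `deduplicate` is a hypothetical helper name, and timestamps are assumed to be zero-padded `HH:MM:SS` strings so that string comparison matches chronological order.

```python
def deduplicate(messages):
    """Keep the earliest message per (message_id, event_type, channel)."""
    first_seen = {}
    for msg in messages:
        key = (msg["message_id"], msg["event_type"], msg["channel"])
        kept = first_seen.get(key)
        # Zero-padded HH:MM:SS strings sort chronologically
        if kept is None or msg["timestamp"] < kept["timestamp"]:
            first_seen[key] = msg
    return list(first_seen.values())


# The three rows from the example table above
rows = [
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "10:10:01"},
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "07:56:45"},
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "08:13:33"},
]

deduped = deduplicate(rows)
print(deduped[0]["timestamp"])  # 07:56:45
```

In Spark, the same rule is usually expressed with `row_number()` over a window partitioned by the three key columns and ordered by `timestamp`.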

In [14]:
# Deduplicate: keep the earliest message per (message_id, event_type, channel)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read from the same data path the silver stream writes to
df = spark.read.format("parquet").load("/content/lake/silver/messages/data")

dedup_window = Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp")
dedup = (df
    .withColumn("row_number", F.row_number().over(dedup_window))
    .filter("row_number = 1")
    .drop("row_number")
)

## Report 1
  - Aggregate data by date, event_type and channel
  - Count number of messages
  - pivot event_type from rows into columns
  - schema expected:
  
```
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-03|    SMS|      4|      4|   1|       1|   5|
|2024-12-03|   CHAT|      3|      7|   5|       8|   4|
|2024-12-03|   PUSH|   NULL|      3|   4|       3|   4|
```
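
To make the expected shape concrete, here is a small plain-Python sketch of the group, count and pivot steps. It is an illustration on made-up rows, not the Spark report; `pivot_counts` is a hypothetical helper name, and `None` stands in for the `NULL` shown when a combination has no messages.

```python
from collections import Counter


def pivot_counts(rows, event_types):
    """Count messages per (date, channel) and spread event_type into columns."""
    counts = Counter((r["date"], r["channel"], r["event_type"]) for r in rows)
    groups = sorted({(date, channel) for date, channel, _ in counts})
    report = []
    for date, channel in groups:
        line = {"date": date, "channel": channel}
        for event_type in event_types:
            # .get returns None for a missing combination, mirroring NULL
            line[event_type] = counts.get((date, channel, event_type))
        report.append(line)
    return report


# Made-up input rows
rows = [
    {"date": "2024-12-03", "channel": "SMS", "event_type": "CLICKED"},
    {"date": "2024-12-03", "channel": "SMS", "event_type": "CLICKED"},
    {"date": "2024-12-03", "channel": "SMS", "event_type": "SENT"},
    {"date": "2024-12-03", "channel": "PUSH", "event_type": "SENT"},
]

report = pivot_counts(rows, ["CLICKED", "SENT"])
for line in report:
    print(line)
```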

In [16]:
# report 1
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read the silver messages
# NOTE: this reassigns `dedup` with the raw silver data, so the deduplication from the earlier cell is not applied here
dedup = spark.read.parquet("/content/lake/silver/messages/data")

# Build Report 1
report1 = (dedup
    .withColumn("date", F.to_date("timestamp"))  # Extract the date from the timestamp column
    .groupBy("date", "channel")
    .pivot("event_type")
    .agg(F.count("*").alias("count"))
    .orderBy("date", "channel")
)

# Add a total column (NULL counts are treated as 0)
report1 = report1.withColumn("total", sum(F.coalesce(report1[c], F.lit(0)) for c in report1.columns if c not in ["date", "channel"]))


# Show the report schema
report1.printSchema()

# Show the first rows of the report
report1.show(truncate=False)

# Save the report in parquet format
report1.write.mode("overwrite").parquet("/content/lake/gold/report1/data")


root
 |-- date: date (nullable = true)
 |-- channel: string (nullable = true)
 |-- CLICKED: long (nullable = true)
 |-- CREATED: long (nullable = true)
 |-- OPEN: long (nullable = true)
 |-- RECEIVED: long (nullable = true)
 |-- SENT: long (nullable = true)
 |-- total: long (nullable = false)

+----------+-------+-------+-------+----+--------+----+-----+
|date      |channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|total|
+----------+-------+-------+-------+----+--------+----+-----+
|2024-12-16|CHAT   |1      |2      |4   |2       |2   |11   |
|2024-12-16|EMAIL  |2      |3      |1   |2       |2   |10   |
|2024-12-16|OTHER  |4      |3      |3   |1       |2   |13   |
|2024-12-16|PUSH   |1      |5      |2   |4       |2   |14   |
|2024-12-16|SMS    |NULL   |5      |1   |2       |1   |9    |
+----------+-------+-------+-------+----+--------+----+-----+



## Report 2

- Identify the most active users by channel (sorted by number of iterations)
- schema expected:

```
+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|   1022|         5|   2|    0|    1|   0|  2|
|   1004|         4|   1|    1|    1|   1|  0|
|   1013|         4|   0|    0|    2|   1|  1|
|   1020|         4|   2|    0|    1|   1|  0|
```
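
The ranking in the expected schema can be sketched in plain Python: count messages per user and channel, add the total as `iterations`, and sort by it in descending order. This is an illustration on made-up rows, not the Spark report; `most_active_users` is a hypothetical helper name, and breaking ties by `user_id` is an assumption made only to keep the order deterministic.

```python
from collections import Counter, defaultdict

CHANNELS = ("CHAT", "EMAIL", "OTHER", "PUSH", "SMS")


def most_active_users(rows):
    """Return one line per user with per-channel counts, most active first."""
    per_user = defaultdict(Counter)
    for r in rows:
        per_user[r["user_id"]][r["channel"]] += 1

    report = []
    for user_id, channel_counts in per_user.items():
        line = {"user_id": user_id, "iterations": sum(channel_counts.values())}
        for channel in CHANNELS:
            line[channel] = channel_counts.get(channel, 0)
        report.append(line)

    # Most active first; ties broken by user_id (assumption, for a stable order)
    report.sort(key=lambda line: (-line["iterations"], line["user_id"]))
    return report


# Made-up input rows
rows = [
    {"user_id": 1022, "channel": "CHAT"},
    {"user_id": 1022, "channel": "CHAT"},
    {"user_id": 1022, "channel": "SMS"},
    {"user_id": 1004, "channel": "EMAIL"},
    {"user_id": 1013, "channel": "PUSH"},
    {"user_id": 1013, "channel": "OTHER"},
]

ranking = most_active_users(rows)
for line in ranking:
    print(line)
```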


In [18]:
# report 2
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read the silver messages
# NOTE: this reassigns `dedup` with the raw silver data, so the deduplication from the earlier cell is not applied here
dedup = spark.read.parquet("/content/lake/silver/messages/data")

# Build Report 2
report2 = (dedup
    .groupBy("user_id", "channel")
    .agg(F.count("*").alias("interactions"))
    .groupBy("user_id")
    .pivot("channel")
    .agg(F.first("interactions"))
    .fillna(0)  # Replace NULL values with 0
)

# Add the 'iterations' column with the total number of interactions per user
# NOTE: rows are not sorted here; add .orderBy(F.desc("iterations")) to list the most active users first
report2 = report2.withColumn("iterations", sum(report2[c] for c in report2.columns if c != "user_id"))

# Reorder the columns so that 'iterations' comes right after 'user_id'
columns_order = ["user_id", "iterations"] + [c for c in report2.columns if c not in ["user_id", "iterations"]]
report2 = report2.select(columns_order)

# Show the report schema
report2.printSchema()

# Show the first rows of the report
report2.show(truncate=False)

# Save the report in parquet format
report2.write.mode("overwrite").parquet("/content/lake/gold/report2/data")


root
 |-- user_id: integer (nullable = true)
 |-- iterations: long (nullable = true)
 |-- CHAT: long (nullable = true)
 |-- EMAIL: long (nullable = true)
 |-- OTHER: long (nullable = true)
 |-- PUSH: long (nullable = true)
 |-- SMS: long (nullable = true)

+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|1005   |1         |0   |1    |0    |0   |0  |
|1031   |1         |0   |0    |0    |0   |1  |
|1030   |2         |0   |0    |1    |1   |0  |
|1019   |2         |1   |0    |1    |0   |0  |
|1008   |1         |0   |0    |0    |1   |0  |
|1047   |1         |0   |0    |0    |0   |1  |
|1021   |1         |0   |0    |1    |0   |0  |
|1026   |2         |0   |0    |0    |1   |1  |
|1028   |3         |1   |1    |0    |1   |0  |
|1032   |2         |1   |1    |0    |0   |0  |
|1010   |1         |0   |1    |0    |0   |0  |
|1050   |1         |0   |0    |1    |0   |0  |
|1035   |2         |1   |1    |0    |0

# Challenge 3

In [None]:
# Theoretical question:

# A new use case requires the message data to be aggregated in near real time
# They want to build a dashboard embedded in the platform website to analyze message data with low latency (a few minutes)
# This application will directly access the data aggregated by the streaming process

# Q1:
- What would be your suggestion to achieve that using Spark Structured Streaming?
Or would you choose a different data processing tool?

- Which storage would you use and why? (database?, data lake?, kafka?)


Spark Structured Streaming is an excellent choice for **near real-time**, low-latency aggregation of message data, for the following reasons:

*   Supports near real-time (micro-batch) event processing, with end-to-end exactly-once guarantees when the source is replayable and the sink is idempotent;
*   Allows for event-time based window aggregations and use of watermarks to handle late data;
*   Easily integrates with various data sources and sinks;
*   Offers good scalability and fault tolerance.

Suggested implementation:


*   Use event-time based window aggregations (e.g., 1-minute windows);
*   Set a short trigger interval (e.g., 30 seconds) for frequent updates;
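*   Use the "append" output mode together with a watermark, so each window is written once, after it is finalized (or the "update" mode if the dashboard should see partial results earlier).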
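
The interaction between windows, the watermark and append mode can be illustrated with a simplified plain-Python simulation. It is a sketch of the idea only, not Spark code: it advances the watermark after every event, whereas Spark advances it once per micro-batch. The 30-second lateness threshold and the name `run_append_mode` are assumptions made for this example.

```python
from collections import defaultdict

WINDOW_SECONDS = 60    # 1-minute tumbling windows
WATERMARK_DELAY = 30   # assumed tolerated lateness, in seconds


def window_start(event_time: int) -> int:
    """Start of the tumbling window that contains event_time."""
    return event_time - event_time % WINDOW_SECONDS


def run_append_mode(events):
    """Emit each window's counts once, when the watermark passes the window end.

    events: iterable of (event_time_seconds, event_type) in arrival order.
    Returns emitted rows as (window_start, event_type, count).
    """
    open_counts = defaultdict(int)  # (window_start, event_type) -> count
    max_event_time = 0
    emitted = []
    for event_time, event_type in events:
        watermark = max_event_time - WATERMARK_DELAY
        start = window_start(event_time)
        if start + WINDOW_SECONDS <= watermark:
            continue  # too late: the window was already finalized
        open_counts[(start, event_type)] += 1
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - WATERMARK_DELAY
        # Finalize every window whose end is at or before the watermark
        for key in sorted(k for k in open_counts if k[0] + WINDOW_SECONDS <= watermark):
            emitted.append((key[0], key[1], open_counts.pop(key)))
    return emitted


# Made-up arrival sequence; (15, "OPEN") arrives after its window was finalized
events = [(10, "OPEN"), (20, "OPEN"), (70, "SENT"), (95, "OPEN"),
          (15, "OPEN"), (130, "SENT"), (200, "OPEN")]
emitted = run_append_mode(events)
print(emitted)
```

The result for a window only appears after the window length plus the watermark delay has elapsed, which is the latency cost of append mode that the dashboard has to tolerate.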



For storage, I would recommend a low-latency NoSQL database such as Apache Cassandra or Amazon DynamoDB. The reasons for this choice are:

*   They offer low-latency reads and writes, essential for serving a real-time dashboard;
*   They support high write rates, suitable for continuous ingestion of aggregated data;
*   They allow fast key-based queries to feed the dashboard embedded in the platform website;
*   They provide good horizontal scalability to handle growing data.


Alternative: If ultra-low latency (sub-second) is critical, consider using Apache Kafka as an intermediate storage layer. This would allow consumers (like the dashboard) to subscribe directly to Kafka topics for real-time updates.