<a href="https://colab.research.google.com/github/anaferreira744/DE-DP-ADF/blob/main/final_challenges.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!rm -rf /content/lake

# Setting up PySpark

In [None]:
%pip install pyspark



# Context
Message events arrive from the platform's message broker (Kafka, Pub/Sub, Kinesis, ...).
You need to process the data according to the requirements.

Message schema:
- timestamp
- value
- event_type
- message_id
- country_id
- user_id



# Challenge 1

Step 1
- Change the existing producer
	- Change parquet location to "/content/lake/bronze/messages/data"
	- Add checkpoint (/content/lake/bronze/messages/checkpoint)
	- Delete /content/lake/bronze/messages and reprocess data
	- For reprocessing, run the streaming for at least 1 minute, then stop it

Step 2
- Implement a new streaming job that reads messages from the bronze layer and splits the result into two locations
	- "messages_corrupted"
		- logic: event_type is null, empty or equal to "NONE"
		- extra logic: add country name by joining messages with the countries dataset
		- partition by "date" (extracted from timestamp)
		- location: /content/lake/silver/messages_corrupted/data

	- "messages"
		- logic: not corrupted data
		- extra logic: add country name by joining messages with the countries dataset
		- partition by "date" (extracted from timestamp)
		- location: /content/lake/silver/messages/data

	- technical requirements
		- add checkpoint (choose location)
		- use an explicit schema (StructType)
		- set trigger interval to 5 seconds
		- run streaming for at least 20 seconds, then stop it

	- alternatives
		- implement a single streaming job with foreach/foreachBatch logic to write into two locations
		- implement two streaming jobs, one for messages and another for messages_corrupted
		- (pay attention to the paths and checkpoints)


	- check results
		- the number of messages in the bronze layer should match the sum of messages + messages_corrupted in the silver layer
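
The split rule from Step 2 can be checked on a handful of rows before it is wired into a stream. The sketch below is a plain-Python model of that rule, not the streaming job itself. It assumes the corruption check applies to the `event_type` column from the message schema; `COUNTRIES`, `is_corrupted`, `split_rows` and the sample rows are illustrative names and data, and the dictionary lookup stands in for a left join against the countries dataset.

```python
from datetime import datetime

# Hypothetical lookup standing in for the countries dataset.
COUNTRIES = {2000: "Brazil", 2001: "Portugal"}


def is_corrupted(event_type):
    """Corrupted means event_type is null, empty, or the literal "NONE"."""
    return event_type is None or event_type == "" or event_type == "NONE"


def split_rows(rows):
    """Enrich each row with country and date, then route it to one of two lists."""
    valid, corrupted = [], []
    for row in rows:
        enriched = dict(row)
        # Left-join semantics: an unknown country_id yields None, the row is kept.
        enriched["country"] = COUNTRIES.get(row["country_id"])
        # Partition column: the calendar date of the event timestamp.
        enriched["date"] = row["timestamp"].date().isoformat()
        (corrupted if is_corrupted(row["event_type"]) else valid).append(enriched)
    return valid, corrupted


rows = [
    {"timestamp": datetime(2024, 12, 11, 21, 56, 1), "event_type": "OPEN", "country_id": 2000},
    {"timestamp": datetime(2024, 12, 11, 21, 56, 2), "event_type": "", "country_id": 2001},
    {"timestamp": datetime(2024, 12, 11, 21, 56, 3), "event_type": "NONE", "country_id": 2000},
    {"timestamp": datetime(2024, 12, 11, 21, 56, 4), "event_type": None, "country_id": 9999},
]

valid, corrupted = split_rows(rows)
# Every input row lands in exactly one output, so the counts must add up.
print(len(valid), len(corrupted), len(rows))
```

Because each row goes to exactly one list, the "check results" requirement (bronze count equals valid plus corrupted) holds by construction in this model. The streaming job has to preserve the same property.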

In [None]:
%pip install faker



# Producer

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from faker import Faker
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Test streaming').getOrCreate()
sc = spark.sparkContext

fake = Faker()
messages = [fake.uuid4() for _ in range(50)]

def enrich_data(df, messages=messages):
  fake = Faker()
  new_columns = {
      'event_type': F.lit(fake.random_element(elements=('OPEN', 'RECEIVED', 'SENT', 'CREATED', 'CLICKED', '', 'NONE'))),
      'message_id': F.lit(fake.random_element(elements=messages)),
      'channel': F.lit(fake.random_element(elements=('CHAT', 'EMAIL', 'SMS', 'PUSH', 'OTHER'))),
      'country_id': F.lit(fake.random_int(min=2000, max=2015)),
      'user_id': F.lit(fake.random_int(min=1000, max=1050)),
  }
  df = df.withColumns(new_columns)
  return df

def insert_messages(df: DataFrame, batch_id):
  enrich = enrich_data(df)
  enrich.write.mode("append").format("parquet").save("/content/lake/bronze/messages/data")

# read stream
df_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# write stream
query = (df_stream.writeStream
.outputMode('append')
.trigger(processingTime='1 seconds')
.option("checkpointLocation", "/content/lake/bronze/messages/checkpoint")
.foreachBatch(insert_messages)
.start()
)


query.awaitTermination(60)


False

In [None]:
query.isActive

True

In [None]:
query.stop()

In [None]:
df = spark.read.format("parquet").load("/content/lake/bronze/messages/data/*")
df.show()
df.count()

+--------------------+-----+----------+--------------------+-------+----------+-------+
|           timestamp|value|event_type|          message_id|channel|country_id|user_id|
+--------------------+-----+----------+--------------------+-------+----------+-------+
|2024-12-11 21:56:...|    6|  RECEIVED|897004f2-01ba-415...|  EMAIL|      2003|   1002|
|2024-12-11 21:56:...|   49|  RECEIVED|8047a735-0310-403...|  OTHER|      2006|   1013|
|2024-12-11 21:56:...|   11|  RECEIVED|1e381dd3-81b1-4f7...|   CHAT|      2000|   1034|
|2024-12-11 21:57:...|   54|   CLICKED|8047a735-0310-403...|  EMAIL|      2008|   1038|
|2024-12-11 21:56:...|   47|   CLICKED|024b470e-46af-4f7...|  EMAIL|      2006|   1018|
|2024-12-11 21:56:...|   53|   CREATED|c3883efb-4464-4b6...|  OTHER|      2002|   1016|
|2024-12-11 21:56:...|    0|   CREATED|0cf9eec4-6142-4d8...|  OTHER|      2000|   1015|
|2024-12-11 21:57:...|   62|   CLICKED|1e381dd3-81b1-4f7...|  OTHER|      2001|   1015|
|2024-12-11 21:56:...|   36|   C

64

# Additional datasets

In [None]:
countries = [
    {"country_id": 2000, "country": "Brazil"},
    {"country_id": 2001, "country": "Portugal"},
    {"country_id": 2002, "country": "Spain"},
    {"country_id": 2003, "country": "Germany"},
    {"country_id": 2004, "country": "France"},
    {"country_id": 2005, "country": "Italy"},
    {"country_id": 2006, "country": "United Kingdom"},
    {"country_id": 2007, "country": "United States"},
    {"country_id": 2008, "country": "Canada"},
    {"country_id": 2009, "country": "Australia"},
    {"country_id": 2010, "country": "Japan"},
    {"country_id": 2011, "country": "China"},
    {"country_id": 2012, "country": "India"},
    {"country_id": 2013, "country": "South Korea"},
    {"country_id": 2014, "country": "Russia"},
    {"country_id": 2015, "country": "Argentina"}
]

countries = spark.createDataFrame(countries)

# Streaming Messages x Messages Corrupted

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql import functions as F


# Explicit schema for the bronze data
schema = StructType([
    StructField("event_type", StringType(), True),
    StructField("message_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("country_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])



# Write each micro-batch to two locations (valid and corrupted data)
def split(df: DataFrame, batch_id):

    # Add the country name; a left join keeps messages with an unknown country_id
    df = df.join(countries, on="country_id", how="left")

    # Corrupted data: event_type is null, empty or "NONE"
    is_corrupted = (
        (F.col("event_type").isNull()) |
        (F.col("event_type") == "") |
        (F.col("event_type") == "NONE")
    )

    # Write the corrupted data
    df.filter(is_corrupted).write.mode("append").format("parquet") \
        .partitionBy("date") \
        .save("/content/lake/silver/messages_corrupted/data")

    # Write the valid data: everything that is not corrupted
    df.filter(~is_corrupted).write.mode("append").format("parquet") \
        .partitionBy("date") \
        .save("/content/lake/silver/messages/data")

# Read the bronze layer as a stream
df_stream = spark.readStream.format("parquet") \
    .schema(schema) \
    .load("/content/lake/bronze/messages/data/*")

# Extract the partition date from the timestamp (the country name is added inside split)
df_enriched = df_stream.withColumn("date", F.to_date("timestamp"))

# Configure the stream to split the data and write to the appropriate paths
query = (df_enriched.writeStream
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", "/content/lake/silver/checkpoint")
    .foreachBatch(split)
    .start()
)

# Run the stream for at least 20 seconds
query.awaitTermination(20)



False

In [None]:
query.stop()
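
The second alternative listed in Challenge 1, one streaming job per output, could look roughly like the sketch below. It is not the approach used in this notebook. The checkpoint locations under `/content/lake/silver/checkpoints/`, the names `SINKS`, `CORRUPTED_SQL` and `start_silver_queries` are assumptions made for illustration, and the function expects a streaming DataFrame that already has the `date` column, plus the static countries DataFrame.

```python
SILVER = "/content/lake/silver"

# Each query gets its own output path and its own checkpoint directory
# (checkpoint locations are hypothetical; any two distinct paths would do).
SINKS = {
    "messages": {
        "path": f"{SILVER}/messages/data",
        "checkpoint": f"{SILVER}/checkpoints/messages",
    },
    "messages_corrupted": {
        "path": f"{SILVER}/messages_corrupted/data",
        "checkpoint": f"{SILVER}/checkpoints/messages_corrupted",
    },
}

# Corruption rule written as a SQL predicate; IS NULL keeps it from evaluating to NULL.
CORRUPTED_SQL = "event_type IS NULL OR event_type = '' OR event_type = 'NONE'"


def start_silver_queries(df_enriched, countries):
    """Start one parquet file-sink query per output and return both handles."""
    # Stream-static left join: adds the country name, keeps unknown country_ids.
    joined = df_enriched.join(countries, on="country_id", how="left")
    outputs = {
        "messages": joined.filter(f"NOT ({CORRUPTED_SQL})"),
        "messages_corrupted": joined.filter(CORRUPTED_SQL),
    }
    queries = {}
    for name, frame in outputs.items():
        queries[name] = (
            frame.writeStream
            .format("parquet")
            .outputMode("append")
            .partitionBy("date")
            .trigger(processingTime="5 seconds")
            .option("path", SINKS[name]["path"])
            .option("checkpointLocation", SINKS[name]["checkpoint"])
            .start()
        )
    return queries
```

Calling `start_silver_queries(df_enriched, countries)` returns a dictionary with both query handles, each of which can be stopped with `stop()` after the required 20 seconds. The two queries track their progress independently, so the bronze versus silver count check is only meaningful once both have consumed the same bronze files.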

## Checking data

In [None]:
# Count messages in Bronze Layer (messages in /content/lake/bronze/messages/data/*)
bronze_df = spark.read.parquet("/content/lake/bronze/messages/data/*")
bronze_count = bronze_df.count()

# Count messages in Silver Layer (valid messages in /content/lake/silver/messages/data and corrupted messages in /content/lake/silver/messages_corrupted/data)
valid_messages_df = spark.read.parquet("/content/lake/silver/messages/data")
corrupted_messages_df = spark.read.parquet("/content/lake/silver/messages_corrupted/data")

valid_messages_count = valid_messages_df.count()
corrupted_messages_count = corrupted_messages_df.count()

# Calculate total messages in Silver Layer (valid + corrupted)
silver_total_count = valid_messages_count + corrupted_messages_count

# Perform the check
if bronze_count == silver_total_count:
    print("The counts match: Bronze layer count is equal to the sum of valid and corrupted messages in the Silver layer.")
else:
    print("The counts do not match!")
    print(f"Bronze Layer Count: {bronze_count}")
    print(f"Silver Layer Count (Valid + Corrupted): {silver_total_count}")

The counts match: Bronze layer count is equal to the sum of valid and corrupted messages in the Silver layer.


# Challenge 2

- Run the business report
- But first: a bug in the system is producing duplicated messages, and these lines must be excluded from the report

- Deduplication logic:
  - Identify possible duplicates on message_id, event_type and channel
  - in case of duplicates, keep only the first message (earliest timestamp)
  - Ex:
    In the table below, the correct message to keep is the second line

```
    message_id | channel | event_type | timestamp
    123        | CHAT    | CREATED    | 10:10:01
    123        | CHAT    | CREATED    | 07:56:45 (first occurrence)
    123        | CHAT    | CREATED    | 08:13:33
```

- After cleaning the data, we can create the business report
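
The deduplication rule can be verified against the three-row example above with a few lines of plain Python. This is only a sketch of the rule, with `first_occurrences` as an illustrative name. In Spark, the same rule is commonly expressed with a window partitioned by the three key columns and ordered by timestamp.

```python
def first_occurrences(rows):
    """Keep the earliest row per (message_id, event_type, channel)."""
    earliest = {}
    for row in rows:
        key = (row["message_id"], row["event_type"], row["channel"])
        # Same-day "HH:MM:SS" strings sort chronologically as plain strings.
        if key not in earliest or row["timestamp"] < earliest[key]["timestamp"]:
            earliest[key] = row
    return list(earliest.values())


# The example table from the challenge description.
rows = [
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "10:10:01"},
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "07:56:45"},
    {"message_id": "123", "channel": "CHAT", "event_type": "CREATED", "timestamp": "08:13:33"},
]

deduped = first_occurrences(rows)
print(deduped)
```

Only the 07:56:45 row survives, which matches the "first occurrence" marked in the table.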

In [None]:
# dedup data
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.format("parquet").load("/content/lake/silver/messages/data")

# Keep only the earliest row per (message_id, event_type, channel)
w = Window.partitionBy("message_id", "event_type", "channel").orderBy("timestamp")
dedup = df.withColumn("row_number", F.row_number().over(w)).filter("row_number = 1").drop("row_number")

## Report 1
  - Aggregate data by date, event_type and channel
  - Count number of messages
  - pivot event_type from rows into columns
  - schema expected:
  
```
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-03|    SMS|      4|      4|   1|       1|   5|
|2024-12-03|   CHAT|      3|      7|   5|       8|   4|
|2024-12-03|   PUSH|   NULL|      3|   4|       3|   4|
```
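
To make the expected shape concrete, here is a small plain-Python model of the aggregation and pivot, assuming rows that carry `date`, `channel` and `event_type`. The function name `pivot_event_counts` and the sample rows are illustrative. A combination that never occurs is reported as `None`, which corresponds to the `NULL` cells in the expected schema.

```python
from collections import Counter


def pivot_event_counts(rows):
    """Count messages per (date, channel) with one column per event_type."""
    counts = Counter((r["date"], r["channel"], r["event_type"]) for r in rows)
    event_types = sorted({r["event_type"] for r in rows})
    groups = sorted({(r["date"], r["channel"]) for r in rows})
    report = []
    for date, channel in groups:
        line = {"date": date, "channel": channel}
        for event_type in event_types:
            # .get returns None for a missing combination, mirroring NULL.
            line[event_type] = counts.get((date, channel, event_type))
        report.append(line)
    return report


rows = [
    {"date": "2024-12-03", "channel": "SMS", "event_type": "SENT"},
    {"date": "2024-12-03", "channel": "SMS", "event_type": "SENT"},
    {"date": "2024-12-03", "channel": "SMS", "event_type": "CLICKED"},
    {"date": "2024-12-03", "channel": "PUSH", "event_type": "OPEN"},
]

report = pivot_event_counts(rows)
for line in report:
    print(line)
```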

In [None]:
# 1. Group the deduplicated data by 'date', 'event_type' and 'channel' and count the messages
aggregated_df = dedup.groupBy("date", "event_type", "channel").agg(
    F.count("*").alias("message_count")
)

# 2. Pivot 'event_type' so that each event type becomes a column
pivoted_df = aggregated_df.groupBy("date", "channel").pivot("event_type").agg(
    F.sum("message_count").alias("message_count")
)

# 3. Show the results
pivoted_df.show()

+----------+-------+-------+-------+----+--------+----+
|      date|channel|CLICKED|CREATED|OPEN|RECEIVED|SENT|
+----------+-------+-------+-------+----+--------+----+
|2024-12-11|   PUSH|   NULL|      1|   1|    NULL|   3|
|2024-12-11|    SMS|   NULL|      3|   1|       2|   1|
|2024-12-11|  EMAIL|      2|   NULL|   4|       1|NULL|
|2024-12-11|  OTHER|      1|      2|   3|       1|   4|
|2024-12-11|   CHAT|      4|   NULL|NULL|       1|   5|
+----------+-------+-------+-------+----+--------+----+




## Report 2

- Identify the most active users by channel (sorted by number of iterations)
- schema expected:

```
+-------+----------+----+-----+-----+----+---+
|user_id|iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------+----+-----+-----+----+---+
|   1022|         5|   2|    0|    1|   0|  2|
|   1004|         4|   1|    1|    1|   1|  0|
|   1013|         4|   0|    0|    2|   1|  1|
|   1020|         4|   2|    0|    1|   1|  0|
```
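
The expected layout can be modelled in plain Python as follows. This is a sketch under the assumption that `iterations` is the sum of the five channel columns. `CHANNELS`, `most_active_users` and the sample rows are illustrative, and ties are broken by `user_id` only to make the order deterministic.

```python
from collections import Counter, defaultdict

CHANNELS = ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"]


def most_active_users(rows):
    """One line per user: total iterations plus a count per channel, most active first."""
    per_user = defaultdict(Counter)
    for row in rows:
        per_user[row["user_id"]][row["channel"]] += 1
    report = []
    for user_id, channel_counts in per_user.items():
        line = {"user_id": user_id}
        # A Counter returns 0 for channels the user never used.
        line["iterations"] = sum(channel_counts[c] for c in CHANNELS)
        for channel in CHANNELS:
            line[channel] = channel_counts[channel]
        report.append(line)
    # Sort by total descending, then by user_id for a stable order.
    report.sort(key=lambda line: (-line["iterations"], line["user_id"]))
    return report


rows = [
    {"user_id": 1022, "channel": "CHAT"},
    {"user_id": 1022, "channel": "CHAT"},
    {"user_id": 1022, "channel": "SMS"},
    {"user_id": 1004, "channel": "EMAIL"},
    {"user_id": 1013, "channel": "PUSH"},
    {"user_id": 1013, "channel": "OTHER"},
]

ranking = most_active_users(rows)
for line in ranking:
    print(line)
```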


In [None]:
from pyspark.sql import functions as F

# 1. Group the deduplicated data by 'user_id' and 'channel', then count the messages per user for each channel
user_activity_df = dedup.groupBy("user_id", "channel").agg(
    F.count("*").alias("iterations")
)

# 2. Pivot the data to have each channel as a separate column
pivot_df = user_activity_df.groupBy("user_id").pivot("channel", ["CHAT", "EMAIL", "OTHER", "PUSH", "SMS"]).agg(
    F.sum("iterations").alias("iterations")
)

# 3. Fill null values with 0 (as users may not have messages in some channels)
pivot_df = pivot_df.fillna(0)

# 4. Calculate the total iterations (sum of messages across all channels for each user)
pivot_df = pivot_df.withColumn("total_iterations",
                               F.col("CHAT") + F.col("EMAIL") + F.col("OTHER") + F.col("PUSH") + F.col("SMS"))

# 5. Reorganize columns to place 'total_iterations' in the second position
pivot_df = pivot_df.select(
    "user_id", "total_iterations", "CHAT", "EMAIL", "OTHER", "PUSH", "SMS"
)

# 6. Sort by the total number of iterations in descending order
sorted_df = pivot_df.orderBy(F.col("total_iterations"), ascending=False)

# 7. Show the results
sorted_df.show()


+-------+----------------+----+-----+-----+----+---+
|user_id|total_iterations|CHAT|EMAIL|OTHER|PUSH|SMS|
+-------+----------------+----+-----+-----+----+---+
|   1016|               2|   1|    0|    1|   0|  0|
|   1034|               2|   1|    0|    1|   0|  0|
|   1028|               2|   1|    1|    0|   0|  0|
|   1032|               2|   1|    1|    0|   0|  0|
|   1002|               2|   1|    1|    0|   0|  0|
|   1015|               2|   0|    0|    2|   0|  0|
|   1007|               2|   0|    0|    1|   0|  1|
|   1049|               2|   0|    0|    0|   2|  0|
|   1038|               2|   1|    1|    0|   0|  0|
|   1013|               2|   0|    0|    2|   0|  0|
|   1009|               2|   1|    1|    0|   0|  0|
|   1031|               1|   0|    0|    1|   0|  0|
|   1019|               1|   0|    0|    0|   0|  1|
|   1046|               1|   0|    0|    0|   1|  0|
|   1047|               1|   0|    0|    0|   0|  1|
|   1021|               1|   0|    0|    0|   


# Challenge 3

In [None]:
# Theoretical question:

# A new use case requires the message data to be aggregated in near real time.
# They want to build a dashboard embedded in the platform website to analyze message data with low latency (a few minutes).
# This application will directly access the data aggregated by the streaming process.

# Q1:
# - What would be your suggestion to achieve that using Spark Structured Streaming?
#   Or would you choose a different data processing tool?
#
# - Which storage would you use and why? (database?, data lake?, Kafka?)

