# Big Data Processing Project #

## Project Part I: Batch Processing ##
**Q.1.** Load data

In [0]:
# Define the names of the new catalog, schema & volume
catalog = "Big_data_project"
schema = "IMDB_schema"
volume = "IMDB_volume"
# Create the catalog, schema & volume
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume}")

# Download data from website
IMDb_URL = "https://datasets.imdbws.com/"
data_name = ["name.basics.tsv.gz", "title.akas.tsv.gz", "title.basics.tsv.gz", "title.crew.tsv.gz", "title.episode.tsv.gz", "title.principals.tsv.gz", "title.ratings.tsv.gz"]
for name in data_name:
  data_url = f"{IMDb_URL}{name}"
  target_path = f"/Volumes/{catalog}/{schema}/{volume}/{name}"
  dbutils.fs.cp(data_url, target_path)
  print(f"Data {name} downloaded to {target_path}")

In [0]:
# Load each IMDb file into a Spark DataFrame.
# The files are gzipped TSVs with a header row and "\N" as the null marker,
# so every table (including title.episode and title.ratings) is read with the same options.
def read_imdb_tsv(file_name):
    return (spark.read.format("csv")
        .option("header", "true")
        .option("sep", "\t")
        .option("nullValue", "\\N")
        .load(f"/Volumes/{catalog}/{schema}/{volume}/{file_name}"))

df_name = read_imdb_tsv(data_name[0])        # name.basics
df_title = read_imdb_tsv(data_name[1])       # title.akas
df_basic = read_imdb_tsv(data_name[2])       # title.basics
df_crew = read_imdb_tsv(data_name[3])        # title.crew
df_episode = read_imdb_tsv(data_name[4])     # title.episode
df_principals = read_imdb_tsv(data_name[5])  # title.principals
df_ratings = read_imdb_tsv(data_name[6])     # title.ratings

df_name.limit(5).display()
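The `nullValue` option is what turns IMDb's `\N` placeholder into a real null. As a minimal plain-Python illustration of the same convention (not part of the Spark pipeline), the standard `csv` module can parse a tab-separated sample and map `\N` to `None`; the sample rows and the helper name `read_tsv_with_nulls` are made up.

```python
import csv
import io

# Made-up sample in the IMDb TSV layout: header row, tab separators, "\N" for missing values
SAMPLE = (
    "nconst\tprimaryName\tbirthYear\tdeathYear\n"
    "nm0000001\tPerson A\t1899\t1987\n"
    "nm0000002\tPerson B\t\\N\t\\N\n"
)

def read_tsv_with_nulls(text, null_marker="\\N"):
    """Parse TSV text into dicts, replacing the null marker with None."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    return [{k: (None if v == null_marker else v) for k, v in row.items()} for row in reader]

rows = read_tsv_with_nulls(SAMPLE)
print(rows[1]["birthYear"])  # None
```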

In [0]:
df_name.printSchema()

In [0]:
from pyspark.sql.functions import col

df_name = (df_name
    .withColumn('birthYear', col('birthYear').cast('integer'))
    .withColumn('deathYear', col('deathYear').cast('integer'))
)

# Check the schema to confirm the change
df_name.printSchema()

**Q.2.** How many people are there in the data set in total?

In [0]:
# Count distinct person identifiers (nconst) in the name table
nbr_prs = df_name.select('nconst').distinct().count()
print(f"There are {nbr_prs} people in the data set.")

**Q.3.** What is the earliest year of birth?

In [0]:
# Find the earliest year of birth 
df_name.select('birthYear').filter(col("birthYear").isNotNull()).orderBy('birthYear', ascending=False).limit(2).show()

The earliest year of birth is 2025

**Q.4.** How many years ago was this person born?

This person was born this year.

**Q.5.** Using only the data in the data set, determine whether this date of birth is correct.

No, the birth date is not correct.

**Q.6.** Explain the reasoning for the answer in a code comment or new markdown cell.

The birth date is not correct: this person was supposedly born this year, yet already has a listed profession, which is not plausible.
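The reasoning above is a simple consistency rule: a person who already lists a profession cannot plausibly have been born in the current year. A minimal sketch of that rule in plain Python; the function name and the `min_age` threshold of 1 year are assumptions for illustration, not IMDb conventions.

```python
from typing import Optional

def is_implausible_birth_year(birth_year: Optional[int], professions: Optional[str],
                              reference_year: int, min_age: int = 1) -> bool:
    """Flag a person who lists a profession but would be younger than `min_age`.

    `min_age` is an assumed threshold, not something defined by IMDb.
    """
    if birth_year is None or not professions:
        return False  # nothing to check without both fields
    return reference_year - birth_year < min_age

# Made-up records: (birthYear, primaryProfession)
print(is_implausible_birth_year(2025, "actress", reference_year=2025))         # True
print(is_implausible_birth_year(1990, "actor,producer", reference_year=2025))  # False
```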

In [0]:
df_name.filter(col("birthYear").isNotNull()).orderBy('birthYear', ascending=False).limit(2).show()

**Q.7.** What is the most recent date of birth?

In [0]:

# Find the most recent date of birth, excluding the implausible 2025 entries discussed above
print("DataFrame result:")
df_name.filter(col('birthYear').isNotNull()).filter(col("birthYear") < 2025).orderBy('birthYear', ascending=False).limit(2).show()


The most recent date of birth is 2024

**Q.8.** What percentage of the people do not have a listed date of birth?

In [0]:
per_of_no_birth = df_name.filter(col("birthYear").isNull()).count() / df_name.count()
print(f"{per_of_no_birth*100:.2f}% of people have no birth year.")


**Q.9.** What is the length of the longest "short" after 1900?

In [0]:
# Define the appropriate data type
df_basic = (df_basic.withColumn('endYear', col('endYear').cast('integer')) \
    .withColumn('startYear', col('startYear').cast('integer')) \
    .withColumn('runtimeMinutes', col('runtimeMinutes').cast('integer')) \
    .withColumn('isAdult', col('isAdult').cast('boolean')))

df_basic.printSchema()

In [0]:
# Find the longest run time "short"
longest_short = df_basic.filter(col("runtimeMinutes").isNotNull()) \
    .filter(col("titleType")=='short') \
    .filter(col("startYear")>1900) \
    .orderBy('runtimeMinutes', ascending=False).limit(1)

print("DataFrame result:")
longest_short.show() 

# Collect runtime value
longest_short_row = longest_short.collect()[0]
runtime = longest_short_row['runtimeMinutes']
print(f"The length of the longest short film after 1900 is {runtime} minutes")


**Q.10.** What is the length of the shortest "movie" after 1900?

In [0]:
# Find the shortest movie after 1900
shortest_movie = df_basic.filter(col("runtimeMinutes").isNotNull()) \
    .filter(col("titleType")=='movie') \
    .filter(col("startYear")>1900) \
    .orderBy('runtimeMinutes', ascending=True).limit(1)

print("DataFrame result:")
shortest_movie.show()

# collect the runtime value
shortest_movie_row = shortest_movie.collect()[0]
runtime = shortest_movie_row['runtimeMinutes']
print(f"The length of the shortest movie after 1900 is {runtime} minutes")

**Q.11.** List all of the genres represented.

In [0]:
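from pyspark.sql.functions import explode, split

# "genres" holds up to three comma-separated values per title, so explode it before taking distinct values
genre_rows = df_basic.select(explode(split(col('genres'), ',')).alias('genre')).distinct().orderBy('genre').collect()
genres = [row['genre'] for row in genre_rows]
print(f"Those are the genres represented in the data set: {', '.join(genres)}")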
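In `title.basics`, `genres` stores up to three comma-separated values per title, so taking distinct raw strings returns combinations such as `Comedy,Drama` rather than individual genres. The plain-Python sketch below (made-up sample values, hypothetical helper name) shows the split-then-deduplicate step that `split`, `explode` and `distinct` perform in Spark:

```python
def distinct_genres(genre_strings):
    """Split comma-separated genre strings and return the sorted set of single genres."""
    genres = set()
    for value in genre_strings:
        if value is None:
            continue  # titles with no genre (null after the \N mapping)
        genres.update(g.strip() for g in value.split(",") if g.strip())
    return sorted(genres)

sample = ["Comedy,Drama", "Drama", None, "Comedy,Romance,Short"]
print(distinct_genres(sample))  # ['Comedy', 'Drama', 'Romance', 'Short']
```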

**Q.12.** What is the highest rated comedy "movie" in the dataset? Note, if there is a tie, the tie shall be broken by the movie with the most votes.

In [0]:
df_ratings.limit(5).display()
df_ratings.printSchema()

In [0]:
# Cast the df_ratings columns to the appropriate data types
df_ratings = df_ratings.withColumn('averageRating', col('averageRating').cast('float')) \
    .withColumn('numVotes', col('numVotes').cast('integer'))

df_ratings.printSchema()

In [0]:
# Find the highest rated "comedy" movie
# Joining on the column name keeps a single tconst column (needed for the crew join below)
highest_rated_comedy = (df_basic.join(df_ratings, on='tconst')
    .filter(col("titleType") == 'movie')
    .filter(col("genres").contains('Comedy'))
    .orderBy(['averageRating', 'numVotes'], ascending=False)
    .limit(5))

print("DataFrame result:")
display(highest_rated_comedy)

# Collect once and read the titles of the two tied movies
top_comedies = highest_rated_comedy.collect()
print(f"The highest rated comedy movies, with the same score and the same number of votes, are: {top_comedies[0]['primaryTitle']} and {top_comedies[1]['primaryTitle']}")
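The tie-break rule (highest `averageRating` first, then most `numVotes`) is a sort on two descending keys, which is what `orderBy([...], ascending=False)` expresses. A plain-Python sketch with made-up titles and values:

```python
movies = [
    {"primaryTitle": "Movie A", "averageRating": 9.8, "numVotes": 12},
    {"primaryTitle": "Movie B", "averageRating": 10.0, "numVotes": 5},
    {"primaryTitle": "Movie C", "averageRating": 10.0, "numVotes": 8},
]

# Sort by rating, then by votes, both descending (negate the numeric keys)
ranked = sorted(movies, key=lambda m: (-m["averageRating"], -m["numVotes"]))
print([m["primaryTitle"] for m in ranked])  # ['Movie C', 'Movie B', 'Movie A']
```

Python's `sorted` is stable, so rows still tied on both keys keep their input order; Spark's `orderBy` gives no such guarantee for ties, so two movies with the same score and the same number of votes can come back in either order.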


**Q.13.** Who was the director of the movie?

In [0]:
# Find the directors of those movies by joining with the crew table
highest_rated_comedyM = (highest_rated_comedy.join(df_crew, on='tconst', how='inner')
    .filter(col("directors").isNotNull())
    .orderBy(["averageRating", "numVotes"], ascending=False))

print("DataFrame result:")
print("Highest rated comedy")
highest_rated_comedyM.display()

# Collect once; "directors" can hold several comma-separated nconst IDs
top_movies = highest_rated_comedyM.collect()
director_ids1 = top_movies[0]['directors'].split(',')
director_ids2 = top_movies[1]['directors'].split(',')

# Get the director names from the collected IDs
director_name1 = df_name.filter(col("nconst").isin(director_ids1)).select('primaryName')
director_name2 = df_name.filter(col("nconst").isin(director_ids2)).select('primaryName')

print("Director names")
director_name1.display()
director_name2.display()

names1 = ", ".join(row['primaryName'] for row in director_name1.collect())
names2 = ", ".join(row['primaryName'] for row in director_name2.collect())
print(f'The director of the highest rated comedy movie "O La La" is {names1}')
print(f'The director of the second highest rated comedy movie (same score and same number of votes), "Space Melody", is {names2}')

**Q.14.** List, if any, the alternate titles for the movie.

These highest rated comedy movies do not have any alternate titles.
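Alternate titles are stored in `title.akas` (loaded above as `df_title`), where `titleId` holds the same identifier as `tconst`. One way to verify this answer is a lookup by ID that ignores the row flagged as the original title; in Spark that corresponds to filtering `df_title` with `col('titleId').isin(...)`. A plain-Python sketch with made-up records and a hypothetical helper name:

```python
def alternate_titles(akas_rows, title_id):
    """Return alternate titles for one title ID, skipping the row marked as the original title."""
    return [
        row["title"]
        for row in akas_rows
        if row["titleId"] == title_id and row.get("isOriginalTitle") != "1"
    ]

# Made-up title.akas-style records (IDs and titles are illustrative only)
akas = [
    {"titleId": "tt0000001", "title": "Example", "isOriginalTitle": "1"},
    {"titleId": "tt0000001", "title": "Exemple", "isOriginalTitle": "0"},
    {"titleId": "tt0000002", "title": "Other", "isOriginalTitle": "1"},
]
print(alternate_titles(akas, "tt0000001"))  # ['Exemple']
print(alternate_titles(akas, "tt0000002"))  # []
```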

## Project Part II: Streaming Processing ##

1. Data Ingestion (Prerequisite)

Before running this analysis, you must execute the separate ingestion notebook: wikimedia_get_raw_stream_data.

**Purpose**: This notebook establishes a connection to the Wikimedia EventStreams API.

**Function**: It captures live JSON events for all English Wikipedia edits and persists them into a Unity Catalog Volume as raw files.

**Action**: Ensure that the ingestion notebook has been running for at least 10–15 minutes to provide a sufficient data sample for the filters.
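The ingestion notebook is not reproduced here. As a hedged sketch of the parsing step such a notebook needs: EventStreams serves Server-Sent Events, with each event's JSON payload on a `data:` line. The hypothetical helper below keeps only English Wikipedia (`wiki == "enwiki"`) events; it assumes one JSON object per `data:` line and leaves out the HTTP connection to `https://stream.wikimedia.org/v2/stream/recentchange` so that it runs offline.

```python
import json

def parse_enwiki_events(sse_lines):
    """Extract JSON payloads from SSE 'data:' lines and keep English Wikipedia events."""
    events = []
    for line in sse_lines:
        if not line.startswith("data:"):
            continue  # skip 'event:' and 'id:' fields, comments and blank separator lines
        payload = line[len("data:"):].strip()
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            continue  # ignore partial or malformed payloads
        if event.get("wiki") == "enwiki":
            events.append(event)
    return events

# Made-up SSE lines in the EventStreams style
lines = [
    "event: message",
    'data: {"wiki": "enwiki", "title": "Star Wars", "bot": false}',
    'data: {"wiki": "frwiki", "title": "Star Wars", "bot": false}',
    "",
]
print([e["title"] for e in parse_enwiki_events(lines)])  # ['Star Wars']
```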

2. Stream Processing & Filtering

Once the raw data is available, this notebook performs the following processing steps:

**Schema Definition**: We apply a strict StructType schema to the raw JSON to ensure data integrity during the readStream process.

**Entity Selection**: We track five arbitrarily chosen entities (Drama, Star Wars, Johnny Depp, Game of Thrones and Thriller), plus a set of comedy movie titles extracted from the IMDb dataset, within the global edit stream.

**Filtering Logic**: We use Regex (rlike) and namespace filtering to isolate edits belonging to our entities while excluding "noise" (such as internal Wikipedia talk pages or category maintenance).
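The filtering logic can be sketched in plain Python with the standard `re` module: keep main-namespace pages (`namespace == 0`) whose title matches any entity, letting each underscore match either a space or an underscore, since page titles can be written either way. The `(?i)` flag and `|` alternation behave the same in Python's `re` and in the Java regex engine behind Spark's `rlike`. The event records below are made up.

```python
import re

entities = ["Drama", "Star_Wars", "Johnny_Depp", "Game_of_Thrones", "Thriller"]

# Escape each entity, then let every underscore match a space or an underscore
entity_regex = re.compile(
    "(?i)" + "|".join(re.escape(e).replace("_", "[ _]") for e in entities)
)

def keep_event(event):
    """Keep main-namespace edits whose title mentions one of the tracked entities."""
    title = event.get("title") or ""
    return event.get("namespace") == 0 and entity_regex.search(title) is not None

# Made-up events
events = [
    {"title": "Star Wars: Example", "namespace": 0},
    {"title": "Talk:Star Wars", "namespace": 1},
    {"title": "Johnny_Depp", "namespace": 0},
    {"title": "Opera", "namespace": 0},
]
kept = [e["title"] for e in events if keep_event(e)]
print(kept)  # ['Star Wars: Example', 'Johnny_Depp']
```

Filtering on `namespace` rather than on a `:` in the title keeps articles such as "Star Wars: Example" that legitimately contain a colon, while still dropping talk and user pages.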

3. Metrics and Alerting

The structured stream is split into two distinct outputs (Sinks):

**Simple Metrics**: Windowed aggregations that count the number of edits per page (and per bot flag) in 10-minute windows, stored in Delta tables.

**Alerting System**: A secondary stream that keeps only automated (bot) edits. These "alerts" are routed to a separate destination to mimic a real-world notification or auditing system.
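The 10-minute metric uses tumbling windows: each event falls into exactly one bucket, whose start is the event's Unix timestamp rounded down to a multiple of 600 seconds (Spark's `window` aligns windows to the Unix epoch unless a `startTime` is given). A plain-Python sketch of both outputs, using made-up events:

```python
from collections import Counter

WINDOW_SECONDS = 10 * 60  # 10-minute tumbling window

def window_start(ts):
    """Start (Unix seconds) of the tumbling window containing ts."""
    return ts - ts % WINDOW_SECONDS

# Made-up events with Unix-second timestamps, as in the stream's "timestamp" field
events = [
    {"title": "Star Wars", "timestamp": 1_700_000_000, "bot": False},
    {"title": "Star Wars", "timestamp": 1_700_000_300, "bot": True},
    {"title": "Thriller", "timestamp": 1_700_000_700, "bot": False},
]

# Simple metric: edit count per (title, window start)
edit_counts = Counter((e["title"], window_start(e["timestamp"])) for e in events)

# Alert stream: bot edits routed to a separate list
alerts = [e for e in events if e["bot"]]

print(edit_counts[("Star Wars", 1_699_999_800)])  # 2
print(len(alerts))                                # 1
```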

### Get Raw stream data ###

In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    BooleanType, LongType, MapType
)
from pyspark.sql.functions import window, col, when, count, lit
from datetime import datetime

In [0]:
# project data catalog defined and created in <placeholder> notebook
catalog = 'wikimedia_db'

# db_schema containing unprocessed/streaming data
uc_schema_raw_events = 'raw_events'

# raw data is saved in a temp volume named by date (yy_mm_dd)
raw_events_volume_time = datetime.now()
raw_events_volume =  f"events_tmp_{raw_events_volume_time.strftime('%y_%m_%d')}"
raw_data_path = f'/Volumes/{catalog}/{uc_schema_raw_events}/{raw_events_volume}'

# db schema for checkpointing streaming tables
db_schema_checkpoints = 'checkpoints'
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db_schema_checkpoints}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.cleaned_events")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.gold_events")


In [0]:

# Meta schema (nested)
meta_schema = StructType([
    StructField("uri", StringType(), True),
    StructField("request_id", StringType(), True),
    StructField("id", StringType(), True),
    StructField("dt", StringType(), True),
    StructField("domain", StringType(), True),
    StructField("stream", StringType(), True)
])

# Length schema (nested)
length_schema = StructType([
    StructField("old", IntegerType(), True),
    StructField("new", IntegerType(), True)
])

# Revision schema (nested)
revision_schema = StructType([
    StructField("old", LongType(), True),
    StructField("new", LongType(), True)
])

# Main recent change schema
recentchange_schema = StructType([
    StructField("$schema", StringType(), True),
    StructField("meta", meta_schema, True),
    StructField("id", LongType(), True),
    StructField("type", StringType(), True),
    StructField("namespace", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("comment", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("user", StringType(), True),
    StructField("bot", BooleanType(), True),
    StructField("minor", BooleanType(), True),
    StructField("patrolled", BooleanType(), True),
    StructField("length", length_schema, True),
    StructField("revision", revision_schema, True),
    StructField("server_url", StringType(), True),
    StructField("server_name", StringType(), True),
    StructField("wiki", StringType(), True),
    StructField("parsedcomment", StringType(), True),
])


In [0]:
# Read data from a file
streamingInputDF = (
    spark
    .readStream
    .schema(recentchange_schema)
    .option("maxFilesPerTrigger", 1)
    .json(raw_data_path)     # now dynamically resolved
)

# temp volume for checkpoint storage
volume = 'tmp_streamingInputDF'
volume_path = f'/Volumes/{catalog}/{db_schema_checkpoints}/{volume}'
volume_name = f'{catalog}.{db_schema_checkpoints}.{volume}'

# drop old temp volume and recreate
spark.sql(f"DROP VOLUME IF EXISTS {volume_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name}")

# Display the streaming dataframe
streamingInputDF.display(checkpointLocation=volume_path)

### Get your entities ###

In [0]:
from pyspark.sql.functions import col, window

# 1. Define 5 high-activity entities
# Simple keywords catch many related pages. Titles in the stream are written with spaces
# (e.g. "Star Wars"), so each underscore becomes a "space or underscore" character class.
project_entities = ["Drama", "Star_Wars", "Johnny_Depp", "Game_of_Thrones", "Thriller"]
entity_pattern = "(?i)" + "|".join(e.replace("_", "[ _]") for e in project_entities)

# 2. Filter the stream, keeping only main-namespace articles (namespace 0)
df_active_entities = streamingInputDF.filter(
    (col("namespace") == 0) &            # Excludes Talk:, User:, Category:, ... pages
    col("title").rlike(entity_pattern)   # Matches any of the 5 entities
)

# 3. Simple metric: count edits per page every 10 minutes
streamingCountDF = (df_active_entities
    .groupBy("title", window(col("timestamp").cast("timestamp"), "10 minutes"))
    .count())

# Count bot vs. non-bot edits every 10 minutes
streamingCountbotDF = (df_active_entities
    .groupBy("bot", window(col("timestamp").cast("timestamp"), "10 minutes"))
    .count())

# 4. Alert metric: route bot edits to a separate sink
df_alerts = df_active_entities.filter(col("bot") == True)

In [0]:
# Setup paths
# temp volume for checkpoint storage
volume = 'final_project_streamingDF'
volume_path = f'/Volumes/{catalog}/{db_schema_checkpoints}/{volume}'
volume_name = f'{catalog}.{db_schema_checkpoints}.{volume}'

# drop old temp volume and recreate
spark.sql(f"DROP VOLUME IF EXISTS {volume_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name}")

# Define sub-paths
output_title_path = f"{volume_path}/data_title"
checkpoint_title_path = f"{volume_path}/checkpoints_title"

output_bot_path = f"{volume_path}/data_bot"
checkpoint_bot_path = f"{volume_path}/checkpoints_bot"

output_path = f"{volume_path}/data"
checkpoint_path = f"{volume_path}/checkpoints"

# Start the Metrics Stream
count_title_metrics = (streamingCountDF.writeStream
    .format("delta")
    .outputMode("complete")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_title_path)
    .start(output_title_path))

count_bot_metrics = (streamingCountbotDF.writeStream
    .format("delta")
    .outputMode("complete")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_bot_path)
    .start(output_bot_path))


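# Start the Alert Stream
query_alerts = (df_alerts.writeStream
    .format("delta")  # Separate Delta location, mimicking a different downstream system
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .start(output_path))

# Wait for the availableNow runs to finish before the output tables are read
for query in (count_title_metrics, count_bot_metrics, query_alerts):
    query.awaitTermination()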



In [0]:
# Display data
print('Title metrics (number of edits per page and 10-minute window)')
display(spark.read.format("delta").load(output_title_path))
print('Bot metrics (number of bot vs. non-bot edits per 10-minute window)')
display(spark.read.format("delta").load(output_bot_path))
print('Alert stream (all bot edits)')
display(spark.read.format("delta").load(output_path))


In [0]:
# Check if the data folders are being created right now
display(dbutils.fs.ls(output_title_path))
# Check that the stream has 'consumed' the raw data
display(dbutils.fs.ls(f"{checkpoint_title_path}/offsets"))

Next, we track edits in the Wikimedia stream for titles taken from the IMDb dataset.


**Comedy series and movies**

In [0]:
import re

##--- Get comedy titles from the IMDb data ---##
df_comedy = (df_basic
    .filter(col("genres").contains("Comedy"))
    .filter(col("startYear") > 2000)
    .filter(col("titleType") == 'movie')
    .select("primaryTitle")
    .distinct()
    .limit(100)  # Take a reasonably large sample
)
comedy_list = [row['primaryTitle'] for row in df_comedy.collect()
               if row['primaryTitle'] and row['primaryTitle'].strip()]

# Escape regex metacharacters in each word (titles may contain "?", "(", "." ...)
# and let spaces match either a space or an underscore
cleaned_comedy_list = [
    r"\b" + "[ _]".join(re.escape(word) for word in title.split()) + r"\b"
    for title in comedy_list
]

# Combine into a single case-insensitive alternation
comedy_pattern = "(?i)" + "|".join(cleaned_comedy_list)

# --- SIMPLE METRIC (edit counts) ---
# Keep main-namespace articles (namespace 0) whose title matches a comedy title,
# then count edits per page in 10-minute windows
streamingCommedyCountsDF = (streamingInputDF
    .filter(col("title").rlike(comedy_pattern) & (col("namespace") == 0))
    .groupBy(
        col("title"),  # group by page title
        window(col("timestamp").cast("timestamp"), "10 minutes")
    )
    .count())

# temp volume for checkpoint storage
volume = 'tmp_streamingDF'
volume_path = f'/Volumes/{catalog}/{db_schema_checkpoints}/{volume}'
volume_name = f'{catalog}.{db_schema_checkpoints}.{volume}'

# drop old temp volume and recreate
spark.sql(f"DROP VOLUME IF EXISTS {volume_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name}")


# Define sub-paths
data_output_path = f"{volume_path}/data"
checkpoint_path = f"{volume_path}/checkpoints"

# Write the comedy metrics and wait for the availableNow run to finish before reading the output
streamingCommedyCountsDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("complete") \
    .trigger(availableNow=True) \
    .start(data_output_path) \
    .awaitTermination()
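Titles taken from IMDb can contain regex metacharacters such as `.`, `?` or `(`. Joined into a pattern unescaped, they can match unintended pages or make the pattern invalid (Java's regex engine behind `rlike` would likewise reject an unbalanced parenthesis). A plain-Python sketch with made-up titles showing the difference `re.escape` makes:

```python
import re

title = "Mr. Bean"  # made-up example title containing a "."
unescaped = re.compile("(?i)" + title)
escaped = re.compile("(?i)" + re.escape(title))

print(bool(unescaped.search("MrX Bean")))  # True: unescaped "." matches any character
print(bool(escaped.search("MrX Bean")))    # False: escaped "." only matches a literal dot

try:
    re.compile("Ghost (Part 1")  # made-up title with an unbalanced parenthesis
except re.error as err:
    print("invalid pattern:", err)
```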


In [0]:
# Check if the data folders are being created right now
display(dbutils.fs.ls(data_output_path))

# Check that the stream has 'consumed' the raw data
display(dbutils.fs.ls(f"{checkpoint_path}/offsets"))

# Check if your Metrics actually saved data
display(spark.read.format("delta").load(data_output_path))

Verify whether the matching Wikipedia pages appear in the raw data.

In [0]:
# Load the raw data statically for testing (same schema as the stream)
df_raw_test = spark.read.schema(recentchange_schema).json(raw_data_path)

# Check how many rows exist in the raw data in total
total_raw = df_raw_test.count()
print(f"Total rows in raw source: {total_raw}")

# Apply the comedy regex to main-namespace articles (rlike, not contains: the pattern is a regex)
df_test_matches = df_raw_test.filter(
    col("title").rlike(comedy_pattern) &
    (col("namespace") == 0)
)

# Show findings
match_count = df_test_matches.count()
print(f"Total rows matching the comedy pattern: {match_count}")

if match_count > 0:
    print("Matches found! Sample titles:")
    df_test_matches.select("title").distinct().show(5, False)
else:
    print("⚠️ No matches found. The filter might be too strict, or no edits were made to those pages.")

In [0]:
# Clean-up Job

print(f"Scanning raw event volumes in schema: {catalog}.{uc_schema_raw_events}")

# 1. Get list of volumes
volumes_df = spark.sql(f"SHOW VOLUMES IN {catalog}.{uc_schema_raw_events}")

volumes_df.show()   

# 2. Loop through returned volumes
for row in volumes_df.collect():
    volume_name = row['volume_name']   

    if "events_tmp" in volume_name:
        full_path = f"/Volumes/{catalog}/{uc_schema_raw_events}/{volume_name}"
        print(f"Deleting old raw volume: {full_path}")
        dbutils.fs.rm(full_path, recurse=True)

print("Clean-up job finished.")


### Project Documentation: Analysis & Observations ###

While the streaming pipeline is fully functional and configured to track the chosen entities (Drama, Star Wars, Johnny Depp, Game of Thrones and Thriller, plus the comedy titles collected from the IMDb dataset), the final output tables may contain only a few rows. This "low activity" effect is inherent to real-time event processing: within a short capture window, only a small share of all English Wikipedia edits touch these specific pages, so many micro-batches contain no matching events.

**Technical Validation**

The implementation was validated by:
- Checking the `_delta_log` directory, which confirms that the Delta transactions are committing successfully.
- Verifying that the checkpointLocation is tracking offsets, proving the stream is "listening" for events even when none match the filters.
- Routing "Bot" activity to separate files, demonstrating the functional logic of the Alerting System.