#Big Data Project<br>
####Made by:<br>
* Paul ROUXEL
* Victor JOUET
* Antoine LOPEZ

In [0]:
%pip install pywikibot
%pip install requests-sse
%restart_python

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


##Import Libraries

In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    BooleanType, LongType, MapType
)
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQuery

from typing import List

import os
import shutil
import requests
from concurrent.futures import ThreadPoolExecutor

import json
from pywikibot.comms.eventstreams import EventStreams
from datetime import datetime, timedelta
import time
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    LongType,
    BooleanType
)

##Import Data

Data source: https://datasets.imdbws.com/

Databricks cluster configuration

In [0]:
catalog = 'imdb_db'
uc_schema_raw_events = 'raw_events'
raw_data_volume = 'imdb_raw' 
raw_data_path = f'/Volumes/{catalog}/{uc_schema_raw_events}/{raw_data_volume}'
db_schema_checkpoints = 'checkpoints'

In [0]:
# Catalog creation
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") 

# Schemas creation
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{uc_schema_raw_events}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db_schema_checkpoints}")

# Volume creation
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{uc_schema_raw_events}.{raw_data_volume}")

DataFrame[]

In [0]:
# IMDB files
BASE_URL = "https://datasets.imdbws.com/"
FILE_NAMES: List[str] = [
    "name.basics.tsv.gz",
    "title.akas.tsv.gz",
    "title.basics.tsv.gz",
    "title.crew.tsv.gz",
    "title.episode.tsv.gz",
    "title.principals.tsv.gz",
    "title.ratings.tsv.gz"
]

In [0]:
# Volume
dbutils.fs.mkdirs(raw_data_path)

DBFS_PATH = raw_data_path


Download from extern URL to the DBFS

In [0]:
# DBFS_PATH = /Volumes/imdb_db/raw_events/imdb_raw
LOCAL_TMP_PATH = "/tmp/imdb_downloads"
os.makedirs(LOCAL_TMP_PATH, exist_ok=True)

In [0]:
def download_and_copy(file_name: str) -> str | None:
    """Télécharge un fichier et le copie vers le Volume UC."""
    source_url = BASE_URL + file_name
    local_file_path = os.path.join(LOCAL_TMP_PATH, file_name)
    dbfs_file_path = DBFS_PATH + "/" + file_name 
    
    try:
        # Download file to local temporaty file system
        with requests.get(source_url, stream=True, timeout=300) as r:
            r.raise_for_status()
            with open(local_file_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192 * 1024):
                    f.write(chunk)
        
        # Copy file to DBFS
        dbutils.fs.cp(f"file:{local_file_path}", dbfs_file_path, recurse=True)
        print(f"Succès: {file_name} copié vers {dbfs_file_path}")

        os.remove(local_file_path)
        
    except Exception as e:
        print(f"Error while loading {file_name}: {e}")
        print(f"URL: {source_url}")
        return file_name
    
    return None

In [0]:
# Parallel download
failed_files = []
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(download_and_copy, FILE_NAMES)
    failed_files = [res for res in results if res is not None]

shutil.rmtree(LOCAL_TMP_PATH, ignore_errors=True)

if failed_files:
    print(" The following files could not be loaded :")
    for f in failed_files:
        print(f"{f} - load it manually to {DBFS_PATH}")
else:
    print(f"Files correctly load to {DBFS_PATH}")

Error while loading title.episode.tsv.gz: (java.lang.SecurityException) Cannot use com.databricks.backend.daemon.driver.WorkspaceLocalFileSystem - local filesystem access is forbidden

JVM stacktrace:
java.lang.SecurityException
	at com.databricks.hadoop.safety.FileSystemCallSiteAllowlist.assertCallSiteIsAllowed(FileSystemCallSiteAllowlist.java:51)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at com.databricks.backend.daemon.dbutils.FSUtils.$anonfun$withCpSafetyChecks$2(DBUtilsCore.scala:220)
	at com.databricks.backend.daemon.dbutils.FSUtils.withFsSafetyCheck(DBUtilsCore.scala:210)
	at com.databricks.backend.daemon.dbutils.FSUtils.$anonfun$withCpSafetyChecks$1(DBUtilsCore.scala:217)
	at com.databricks.backend.daemon.dbutils.FSUtils.withFsSafetyCheck(DBUtilsCore.scala:210)
	at com.databricks.backend.daemon.dbutils.FSUtils.withCpSafetyChecks(DBUtilsCore.scala:217)
	at com.databricks.backend.daemon.dbutils.FSUtils

At this point we see that we can't download the files from Internet so we need to add them by hand to make our notebook able to access it.

To correctly load the data in to our project (on Databricks):
* Open the catalog (Ctrl + Alt + C or click on the three forms at the right of the widow)
* Extend imdb_db, raw_events
* On the imdb_raw click on the three vertical dots 
* Then click on Upload to Volume
* Insert the 7 files (gzipped) download from the link


Reading Gziped TSV files in Spark Dataframes

In [0]:
def read_imdb_dataset(file_name: str):
    """
    Read Gzipped TSV file from DBFS into a Sprak DF
    """
    full_path = DBFS_PATH +"/"+ file_name
    df_name = file_name.replace(".tsv.gz", "").replace(".", "_") 
    
    try:
        df = (spark.read
              .option("header", "true")     # Utilise la première ligne comme en-tête
              .option("delimiter", "\t")    # Spécifie le délimiteur de tabulation (TSV)
              .option("inferSchema", "true")# Infère les types de données (pratique pour l'exploration)
              .csv(full_path))              # Charge le fichier (Spark gère le .gz)
        
        # Create a temporary view for easy SQL access
        df.createOrReplaceTempView(df_name) 
        return df_name, df
        
    except Exception as e:
        print(f"Error while loading {file_name}: {e}")
        return None, None

1. Load data

In [0]:
imdb_dataframes = {}

for file_name in FILE_NAMES:
    df_name, df = read_imdb_dataset(file_name)
    if df_name:
        imdb_dataframes[df_name] = df

print(f"Number of Spark DF ready: {len(imdb_dataframes)}")

Number of Spark DF ready: 7


## Datasets Exploration

In [0]:
for df in imdb_dataframes:
    print(f"Schema of {df}:")
    print(imdb_dataframes[df].schema)
    print("\n")

Schema of name_basics:
StructType([StructField('nconst', StringType(), True), StructField('primaryName', StringType(), True), StructField('birthYear', StringType(), True), StructField('deathYear', StringType(), True), StructField('primaryProfession', StringType(), True), StructField('knownForTitles', StringType(), True)])


Schema of title_akas:
StructType([StructField('titleId', StringType(), True), StructField('ordering', IntegerType(), True), StructField('title', StringType(), True), StructField('region', StringType(), True), StructField('language', StringType(), True), StructField('types', StringType(), True), StructField('attributes', StringType(), True), StructField('isOriginalTitle', IntegerType(), True)])


Schema of title_basics:
StructType([StructField('tconst', StringType(), True), StructField('titleType', StringType(), True), StructField('primaryTitle', StringType(), True), StructField('originalTitle', StringType(), True), StructField('isAdult', IntegerType(), True), Struct

2. How many total people in data set?

In [0]:
# Calculation of the number of people
num_people = imdb_dataframes['name_basics'].count()
print(f"Number of people in the dataset: {num_people}")

Number of people in the dataset: 14925446


3. What is the earliest year of birth?

Let's clean birthYear, some value are missing then replaced by '\N' so we need to cast everything in 'int' to find the exact earliest birthYear

In [0]:
# Cleaning of birthYear
df_names = imdb_dataframes["name_basics"].filter(
    F.regexp_extract(
        F.col("birthYear"),
        "^[0-9]+$",
        0
    ) != ""
).withColumn(
    "birthYear_int",
    F.col("birthYear").cast("int")
)

In [0]:
earliest_birth_year = df_names.agg(
    F.min("birthYear_int")
).collect()[0][0]

print(f"3. The earliest year of birth: {earliest_birth_year}")

3. The earliest year of birth: 4


4. How many years ago was this person born?

In [0]:
current_year = datetime.now().year
age = current_year - earliest_birth_year if earliest_birth_year else "N/A"

print(f"4. This person is born {age} years ago")

4. This person is born 2021 years ago


5. Using only the data in the data set, determine if this date of birth correct.

We will take the raw data and just check if the minimal value is the same found above

In [0]:
# Show minimal birthYear values for inspection
min_birth_year = imdb_dataframes['name_basics'].agg(
    F.min("birthYear")
).collect()[0][0]


if earliest_birth_year == min_birth_year:
    print(f"{earliest_birth_year} is also the minimum birthYear in the dataset.")
else:
    print(f"{earliest_birth_year} is NOT the minimum birthYear in the dataset (minimum is {min_birth_year}).")

4 is NOT the minimum birthYear in the dataset (minimum is 100).


In [0]:
display(imdb_dataframes['name_basics'].select("birthYear").distinct())

birthYear
1561
1546
1735
1628
2005
61
1494
1788
2025
1970


We have a different number from what we calculate first. But it's not really surprising it's because the dataset from internet is not clean so if some data is missing or is simply not clean it might not find it as minimal.
By running a quick display we can see that some date are below 100 so the result without cleaning might be the correct one.

6. Explain the reasoning for the answer in a code comment or new markdown cell.

#Give a good explanation

7. What is the most recent date of birth?

In [0]:
earliest_birth_year = df_names.agg(
    F.max("birthYear_int")
).collect()[0][0]

print(f"7. The most recent year of birth: {earliest_birth_year}")

7. The most recent year of birth: 2025


8. What percentage of the people do not have a listed date of birth?

In [0]:
total_rows = imdb_dataframes['name_basics'].count()
null_birth_rows = imdb_dataframes['name_basics'].filter(
    (F.col("birthYear").isNull()) | (F.col("birthYear") == "\\N")
).count()

percentage_no_birth_date = (null_birth_rows / total_rows) * 100 if total_rows > 0 else 0

print(f"8. Percentage of people without birthday listed is : {percentage_no_birth_date:.2f}%")

8. Percentage of people without birthday listed is : 95.57%


In [0]:
df_titles_clean = imdb_dataframes['title_basics'].filter(
    (F.col("runtimeMinutes") != "\\N") &
    (F.col("runtimeMinutes").isNotNull()) &
    (F.col("startYear") != "\\N") &
    (F.col("startYear").isNotNull())
).withColumn(
    "runtimeMinutes_int",
    F.expr("try_cast(runtimeMinutes as int)")
).withColumn(
    "startYear_int",
    F.expr("try_cast(startYear as int)")
).filter(
    (F.col("runtimeMinutes_int").isNotNull()) &
    (F.col("startYear_int") > 1900)
)

9. What is the length of the longest "short" after 1900?

In [0]:
# Find the longest 'short' runtime
longest_short_runtime = df_titles_clean.filter(
    F.col("titleType") == "short"
).agg(
    F.max("runtimeMinutes_int")
).collect()[0][0]

print(f"9. The longest short after 1900: {longest_short_runtime} minutes")

9. The longest short after 1900: 1311 minutes


10. What is the length of the shortest "movie" after 1900? 

In [0]:
shortest_movie_runtime = df_titles_clean.filter(
    F.col("titleType") == "movie"
).agg(
    F.min("runtimeMinutes_int")
).collect()[0][0]

print(f"10. The shortest movie after 1900 is : {shortest_movie_runtime} minutes")

10. The shortest movie after 1900 is : 1 minutes


11. List of all of the genres represented.

In [0]:
all_genres = imdb_dataframes['title_basics'].select(
    F.explode(F.split(F.col("genres"), ","))
).distinct().filter(F.col("col") != "\\N").collect()

genre_list = [row['col'] for row in all_genres]

print(f"11. List of all represented gender: {genre_list}")

11. List of all represented gender: ['Adventure', 'Horror', 'Thriller', 'History', 'Romance', 'Western', 'Comedy', 'Fantasy', 'Film-Noir', 'Adult', 'Short', 'Crime', 'Animation', 'Talk-Show', 'Mystery', 'Biography', 'Reality-TV', 'Sci-Fi', 'Game-Show', 'Family', 'War', 'Documentary', 'Musical', 'Drama', 'News', 'Music', 'Action', 'Sport']


In [0]:
df_rated_titles = imdb_dataframes['title_basics'].alias("b").join(
    imdb_dataframes['title_ratings'].alias("r"),
    F.col("b.tconst") == F.col("r.tconst"),
    "inner"
).filter(
    (F.col("b.titleType") == "movie") & 
    (F.col("b.genres").like("%Comedy%")) 
)

12. What is the highest rated comedy "movie" in the dataset? Note, if there is a tie, the tie shall be broken by the movie with the most votes .


In [0]:
best_comedy_movie = df_rated_titles.orderBy(
    F.col("averageRating").desc(), 
    F.col("numVotes").desc()
).select("b.tconst", "b.primaryTitle").limit(1).collect()

if best_comedy_movie:
    best_tconst = best_comedy_movie[0]['tconst']
    best_title = best_comedy_movie[0]['primaryTitle']
    print(f"10. The best comic movie is : {best_title} (tconst: {best_tconst})")
else:
    best_tconst = None
    print("12. No comic movie were found with the grade.")

10. The best comic movie is : O La La (tconst: tt8458418)


13. Who was the director of the movie?


In [0]:
if best_tconst:
    # 1. Join title_crew and name_basics
    df_crew_names = imdb_dataframes['title_crew'].alias("tc").join(
        imdb_dataframes['name_basics'].alias("nb"),
        F.split(F.col("tc.directors"), ",").contains(F.col("nb.nconst")),
        "inner"
    ).filter(F.col("tc.tconst") == best_tconst)

    # 2. Get director names
    director_nconsts_row = imdb_dataframes['title_crew'].filter(F.col("tconst") == best_tconst).select("directors").collect()

    if director_nconsts_row and director_nconsts_row[0]['directors'] != '\\N':
        director_nconsts = director_nconsts_row[0]['directors'].split(',')
        
        directors_df = imdb_dataframes['name_basics'].filter(F.col("nconst").isin(director_nconsts)).select("primaryName").collect()
        director_names = [row['primaryName'] for row in directors_df]
        
        print("-" * 50)
        print(f"13. Director of '{best_title}': {', '.join(director_names)}")
    else:
        print("13. Director not listed")
else:
    print("13. Director not found because no movie was found question 12")

--------------------------------------------------
13. Director of 'O La La': Sripad Pai


14. List, if any, the alternate titles for the movie.

In [0]:
if best_tconst:
    # Join title_akas (alternate titles) using titleId
    alternate_titles = imdb_dataframes['title_akas'].filter(
        F.col("titleId") == best_tconst
    ).select("title").distinct().collect()
    
    aka_titles = [row['title'] for row in alternate_titles]
    
    print("-" * 50)
    print(f"14. Alternate title for the movie '{best_title}':")
    if aka_titles:
        for title in aka_titles:
            print(f" - {title}")
    else:
        print(" - No title alternate found")
else:
    print("14. Alternate titles not found because no movie was found question 12")

--------------------------------------------------
14. Alternate title for the movie 'O La La':
 - O La La


##Stream Processing

Choose any five entities from the data set. These can be specific movies, actors, crews, etc, or more abstract concepts such as specific genres, etc. The main criteria is that the entities chosen must have a trackable wiki page. Set up a stream processing job that will track events for the chosen entities from the wikimedia Events Platform. These tracking jobs should provide some simple metrics. These metrics should be stored in a database or file (depending on the platform used). At least one of the metrics should be of the "alert" type (meaning some event that would require further action. For instance imagine wanting to be notified each time a specific user makes a change. Capture this "alert" and mimic an alerting system by routing these events to a different file/database.) These tables/data do not need to be shared, but the structure of the output should be clearly noted in the code and/or markdown cells. Additionally, a brief explanation/overview of this section should be provided in a separate markdown cell or in the project readme.

###Définition des Entités et Configuration

Path management

In [0]:
# Where the python script will write raw JSONs (and Spark will read from)
ingestion_volume = 'wiki_stream_buffer'
RAW_INGESTION_PATH = f'/Volumes/{catalog}/{uc_schema_raw_events}/{ingestion_volume}/'

# Where Spark will write the alerts
alert_volume = 'wiki_alerts'
ALERT_PATH = f'/Volumes/{catalog}/{uc_schema_raw_events}/{alert_volume}/critical_events/'

# Where Spark will write the Metrics Table (Delta)
METRICS_TABLE_PATH = f'{catalog}.{uc_schema_raw_events}.wiki_entity_metrics'

# Checkpoints
checkpoint_volume = 'wiki_pipeline'
CHECKPOINT_PATH = f'/Volumes/{catalog}/{db_schema_checkpoints}/{checkpoint_volume}/'

Cleaning of raw files

In [0]:
dbutils.fs.rm(CHECKPOINT_PATH, True)
dbutils.fs.rm(RAW_INGESTION_PATH, True)

True

In [0]:
# Create Objects
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{uc_schema_raw_events}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db_schema_checkpoints}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{uc_schema_raw_events}.{ingestion_volume}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{uc_schema_raw_events}.{alert_volume}") 
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{db_schema_checkpoints}.{checkpoint_volume}")


DataFrame[]

In [0]:
# Ensure directories exist
dbutils.fs.mkdirs(RAW_INGESTION_PATH)
dbutils.fs.mkdirs(ALERT_PATH)
dbutils.fs.mkdirs(CHECKPOINT_PATH)

True

Entities to tracks

In [0]:
ENTITIES_TO_TRACK = [
    "Dune",         # Film
    "Tom Hanks",    # Actor
    "Comedy",       # Genre
    "Breaking Bad", # TV Series
    "France"        # Country
]
print(f"Tracking entities: {ENTITIES_TO_TRACK}")

Tracking entities: ['Dune', 'Tom Hanks', 'Comedy', 'Breaking Bad', 'France']


Schema definition

In [0]:
# Meta schema (nested)
meta_schema = StructType([
    StructField("uri", StringType(), True),
    StructField("request_id", StringType(), True),
    StructField("id", StringType(), True),
    StructField("dt", StringType(), True),
    StructField("domain", StringType(), True),
    StructField("stream", StringType(), True)
])

# Length schema (nested)
length_schema = StructType([
    StructField("old", IntegerType(), True),
    StructField("new", IntegerType(), True)
])

# Revision schema (nested)
revision_schema = StructType([
    StructField("old", LongType(), True),
    StructField("new", LongType(), True)
])

# Main recent change schema
recentchange_schema = StructType([
    StructField("$schema", StringType(), True),
    StructField("meta", meta_schema, True),
    StructField("id", LongType(), True),
    StructField("type", StringType(), True),
    StructField("namespace", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("comment", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("user", StringType(), True),
    StructField("bot", BooleanType(), True),
    StructField("minor", BooleanType(), True),
    StructField("patrolled", BooleanType(), True),
    StructField("length", length_schema, True),
    StructField("revision", revision_schema, True),
    StructField("server_url", StringType(), True),
    StructField("server_name", StringType(), True),
    StructField("wiki", StringType(), True),
    StructField("parsedcomment", StringType(), True),
])

### Ingestion - Python Script to fetch Wiki Events and write to Volume

In [0]:
# Configuration
output_path = RAW_INGESTION_PATH 
print(f"Starting ingestion to: {output_path}")

# Duration of the ingestion
duration_minutes = 2
stop_time = datetime.now() + timedelta(minutes=duration_minutes)

Starting ingestion to: /Volumes/imdb_db/raw_events/wiki_stream_buffer/


Initilaize Stream

In [0]:
stream = EventStreams(streams=['recentchange'])

stream.register_filter(type='edit') 

print(f"Listening for events regarding: {ENTITIES_TO_TRACK}")
print(f"Will stop automatically at: {stop_time}")

Listening for events regarding: ['Dune', 'Tom Hanks', 'Comedy', 'Breaking Bad', 'France']
Will stop automatically at: 2025-12-14 21:44:24.686783


Pre-Filtering
To make the Spark job effective, we assume we only want to save events related to our specific list (or everything if you prefer).
Since specific entities like "Tom Hanks" are edited rarely, 
We will save everything so the pipeline processes data, but mark the ones in our list.

In [0]:
try:
    count = 0
    # Loop to get streaming data
    for change in stream:
        if datetime.now() > stop_time:
            print("Time limit reached. Stopping ingestion.")
            break
            
        # Get the title
        title = change.get('title', '')

        # Extract ID for filename
        revision_id = change.get('revision', {}).get('new', 'unknown')
        if revision_id == 'unknown':
            revision_id = change.get('id', datetime.now().timestamp())
            
        file_name = f"{output_path}/event_{revision_id}.json"
        
        # Write event to file
        with open(file_name, 'w') as f:
            json.dump(change, f)
            
        count += 1
        if count % 50 == 0:
            print(f"Ingested {count} events...")

except Exception as e:
    print(f"Stream stopped or error occurred: {e}")

print(f"Ingestion finished. Total files written: {count}")

Ingested 50 events...
Ingested 100 events...
Ingested 150 events...
Ingested 200 events...
Ingested 250 events...
Ingested 300 events...
Ingested 350 events...
Ingested 400 events...
Ingested 450 events...
Ingested 500 events...
Ingested 550 events...
Ingested 600 events...
Ingested 650 events...
Ingested 700 events...
Ingested 750 events...
Ingested 800 events...
Ingested 850 events...
Ingested 900 events...
Ingested 950 events...
Ingested 1000 events...
Ingested 1050 events...
Ingested 1100 events...
Ingested 1150 events...
Ingested 1200 events...
Ingested 1250 events...
Ingested 1300 events...
Ingested 1350 events...
Ingested 1400 events...
Ingested 1450 events...
Ingested 1500 events...
Ingested 1550 events...
Ingested 1600 events...
Ingested 1650 events...
Ingested 1700 events...
Ingested 1750 events...
Ingested 1800 events...
Ingested 1850 events...
Ingested 1900 events...
Ingested 1950 events...
Time limit reached. Stopping ingestion.
Ingestion finished. Total files written: 199

###Spark Structured Streaming Processing

Read Stream from the Volume (instead of "rate")

In [0]:
raw_stream_df = (
    spark.readStream
    .format("json")
    .schema(recentchange_schema)
    .option("maxFilesPerTrigger", 10)
    .load(RAW_INGESTION_PATH)
)

Filter & Prepare Data

In [0]:
processed_stream_df = raw_stream_df.filter(
    F.col("title").isin(ENTITIES_TO_TRACK)
).select(
    F.col("title").alias("entity"),
    F.col("timestamp"),
    F.col("user"),
    F.col("bot"),
    F.col("server_name"),
    F.col("comment")
).withColumn(
    "event_time", F.col("timestamp").cast("timestamp")
)

Define Alert Logic

In [0]:
# Alert if: Entity is "Dune" AND it was edited by a Bot (mimicking the Target Code logic)
alert_stream_df = processed_stream_df.filter(
    (F.col("entity") == "Dune") & 
    (F.col("bot") == True)
)

Define Metrics Logic

In [0]:
# Count events per entity every 10 minutes
metrics_stream_df = processed_stream_df.withWatermark("event_time", "10 minutes").groupBy(
    F.window(F.col("event_time"), "10 minutes"),
    F.col("entity")
).count().withColumnRenamed("count", "event_count")


### Writing the streams

Read Stream from volume

In [0]:
raw_stream_df = (
    spark.readStream
    .format("cloudFiles") 
    .option("cloudFiles.format", "json")
    .schema(recentchange_schema) 
    .option("cloudFiles.schemaLocation", f"{CHECKPOINT_PATH}autoload_schema")
    .load(RAW_INGESTION_PATH)
)

Filtering the target entity

In [0]:
# Cleaning and projecting of the timestamps for the watermark
processed_stream_df = raw_stream_df.select(
    F.col("title"),
    F.col("bot"),
    F.col("user"),
    F.to_timestamp(F.col("meta.dt"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").alias("event_time") 
).filter(
    F.col("title").isin(ENTITIES_TO_TRACK) 
).withColumn(
    "entity", F.col("title")
)

Defining the alert and metric logic

In [0]:
df_alert = processed_stream_df.filter(
    (F.col("entity") == "Dune") & (F.col("bot") == True)
).select(
    "event_time", "entity", "user", "bot", F.lit("CRITICAL: Bot edit on DUNE page").alias("alert_message")
)

metrics_stream_df = processed_stream_df.withWatermark("event_time", "10 minutes").groupBy(
    F.window(F.col("event_time"), "10 minutes"),
    F.col("entity")
).count().withColumnRenamed("count", "event_count")

### Stream writing

In [0]:
# Alert writing
query_alerts: StreamingQuery = (
    df_alert.writeStream
    .format("json")
    .outputMode("append")
    .option("checkpointLocation", f"{CHECKPOINT_PATH}alerts")
    .option("path", ALERT_PATH)
    .queryName("wiki_alerts_writer")
    .trigger(availableNow=True)
    .start()
)

# Metrics writing
query_metrics: StreamingQuery = (
    metrics_stream_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", f"{CHECKPOINT_PATH}metrics")
    .trigger(availableNow=True)
    .toTable(METRICS_TABLE_PATH) 
)

print("Streaming jobs started, the cell will wait the end of the micro-batch")


Démarrage de la lecture des fichiers JSON depuis: /Volumes/imdb_db/raw_events/wiki_stream_buffer/

Démarrage des requêtes de streaming en mode AvailableNow...


### Synchronisation

In [0]:
query_alerts.awaitTermination()
print("Alert Stream done")

query_metrics.awaitTermination()
print("Metrics Stream donc")

print("\nmicro-batch is done. The result are ready for the verification")

### Results verification

Checking metrics table

In [0]:
try:
    spark.sql(f"SELECT * FROM {METRICS_TABLE_PATH} ORDER BY window DESC").show(truncate=False)
except Exception as e:
    print(f"Error in the reading of the delta table: {e}")


Checking alert system files

In [0]:
files = dbutils.fs.ls(ALERT_PATH)
print(f"Found {len(files)} alert files.")

if len(files) > 0:
    print("Sample Alert Content:")
    alert_read_schema = StructType([
        StructField("event_time", StringType(), True),
        StructField("entity", StringType(), True),
        StructField("user", StringType(), True),
        StructField("bot", BooleanType(), True)
    ])
    
    (spark.read
        .schema(alert_read_schema)
        .json(ALERT_PATH)
        .show(5, truncate=False)
    )
else:
    print("No alerts generated yet (La condition 'Dune' + 'Bot' n'a pas été satisfaite dans ce lot de données).")

--- 1. Checking Metrics Table (Delta) ---
+------+------+-----------+
|window|entity|event_count|
+------+------+-----------+
+------+------+-----------+


--- 2. Checking Alert System (Files) ---
Found 1353 alert files.
Sample Alert Content:
+----------+------+----+---+
|event_time|entity|user|bot|
+----------+------+----+---+
+----------+------+----+---+





###Clean the Volume of Raw Ingestion

Run this code in case of error in the stream

In [0]:
# Delete the json files that have been processed.
#print(f"Cleaning of raw files in : {RAW_INGESTION_PATH}")

#try:
#    files_to_delete = dbutils.fs.ls(RAW_INGESTION_PATH)
#    count = 0
#    for file_info in files_to_delete:
#        if file_info.name.endswith(".json"):
#            dbutils.fs.rm(file_info.path)
#            count += 1
#    
#    print(f"Cleaning done {count} JSON files deleted")
#    
#except Exception as e:
#    print(f"Error : {e}")

Nettoyage des fichiers bruts dans : /Volumes/imdb_db/raw_events/wiki_stream_buffer/
✅ Nettoyage terminé. 1875 fichiers JSON supprimés du Volume d'ingestion.




### Check Results