# Final Project

## Team : Arthur YOUNOUSSOV and Soufiane SAIDI

The data files were uploaded manually to a Databricks Volume at /Volumes/workspace/default/bigdataproject/ because of their large size.  

In addition, the free version of Databricks did not let us reach the URL and download the data directly; that would have required the premium version. 

How to run the code: replace `base_path` with the path to your own Databricks Volume, and the notebook should run as is.

In [0]:

base_path = "/Volumes/workspace/default/bigdataproject/"


path_names = f"{base_path}name.basics.tsv"
path_akas = f"{base_path}title.akas.tsv"
path_basics = f"{base_path}title.basics.tsv"
path_crew = f"{base_path}title.crew.tsv"
path_episode = f"{base_path}title.episode.tsv"
path_principals = f"{base_path}title.principals.tsv"
path_ratings = f"{base_path}title.ratings.tsv"
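
Before running the remaining cells, it can help to confirm that all seven files are present under `base_path`. The following is a minimal sketch, assuming the Volume is reachable as a regular filesystem path from the driver (the notebook already writes to it with `open()` later on). The helper name `find_missing_files` and the `IMDB_FILES` list are hypothetical and not part of the original notebook.

```python
import os

# File names taken from the path variables defined above.
IMDB_FILES = [
    "name.basics.tsv",
    "title.akas.tsv",
    "title.basics.tsv",
    "title.crew.tsv",
    "title.episode.tsv",
    "title.principals.tsv",
    "title.ratings.tsv",
]

def find_missing_files(base_path, file_names=IMDB_FILES):
    """Return the file names that do not exist under base_path."""
    return [
        name for name in file_names
        if not os.path.isfile(os.path.join(base_path, name))
    ]
```

Calling `find_missing_files(base_path)` should return an empty list once every file has been uploaded; any name it returns still needs to be added to the Volume.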

# Question 2 : How many total people in data set ? 

In [0]:

df_names = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("inferSchema", "true") \
    .option("nullValue", "\\N") \
    .load(path_names)

# We count the rows
total_people = df_names.count()

print(f"Total number of people in the Dataset : {total_people}")

Output : Total number of people in the Dataset : 14953819

# Question 3 & 7 : What is the earliest year of birth? What is the most recent date of birth?

In [0]:
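from pyspark.sql.functions import col
# Alias Spark's min/max so they do not shadow Python's built-in functions
from pyspark.sql.functions import min as spark_min, max as spark_max

stats = df_names.select(
    spark_min(col("birthYear")).alias("earliest"),
    spark_max(col("birthYear")).alias("most_recent")
).first()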

earliest_year = int(stats["earliest"])
most_recent_year = int(stats["most_recent"])

print(f"Earliest birth year: {earliest_year}")
print(f"Most recent birth year: {most_recent_year}")



output :
- Earliest birth year: 4
- Most recent birth year: 2025

# Question 4 : How many years ago was this person born?

In [0]:
years_ago = 2025 - earliest_year
print(f"This person was born {years_ago} years ago.")


output : This person was born 2021 years ago.

# Question 5 : Using only the data in the data set, determine if this date of birth correct.

In [0]:
print("\nThe person that was born the earliest :")
df_names.filter(col("birthYear") == earliest_year).show(truncate=False)

# Question 6 : Explain the reasoning for the answer in a code comment or new markdown cell. 

We filtered the dataframe for the row where `birthYear` equals the `earliest_year` found above, then displayed the entire row.

This value is clearly an outlier: nobody credited in this dataset should have been born that long ago.
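
The outlier argument can be made more concrete with a simple plausibility check that uses only columns available in the dataset: `deathYear` from name.basics and the `startYear` of a person's titles from title.basics. The sketch below is illustrative only. The function name `birth_year_flags` and the 120-year `max_lifespan` limit are assumptions, and the example values are made up rather than taken from the actual row.

```python
def birth_year_flags(birth_year, death_year=None, first_title_year=None,
                     max_lifespan=120):
    """Return a list of reasons why a birth year looks suspicious.

    An empty list means none of the checks fired; it does not prove
    that the birth year is correct.
    """
    flags = []
    if death_year is not None and death_year < birth_year:
        flags.append("death precedes birth")
    if death_year is not None and death_year - birth_year > max_lifespan:
        flags.append("lifespan exceeds max_lifespan")
    if (first_title_year is not None
            and first_title_year - birth_year > max_lifespan):
        flags.append("first title is more than max_lifespan years after birth")
    return flags

# Illustrative values only (not read from the dataset)
print(birth_year_flags(4, first_title_year=1950))
print(birth_year_flags(1950, death_year=2020, first_title_year=1975))
```

A flagged row is only a candidate for review: it may be a data-entry error, or a historical figure credited for work adapted long after their death.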

In [0]:
from pyspark.sql.functions import col

df_basics = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("nullValue", "\\N") \
    .load(path_basics) \
    .withColumn("runtimeMinutes", col("runtimeMinutes").cast("int")) \
    .withColumn("startYear", col("startYear").cast("int"))

df_basics.show(10)


# Question 8 : What percentage of the people do not have a listed date of birth?

In [0]:
# We count all people in the dataset, then those with a missing birth year, and derive the percentage of missing values
total_people = df_names.count()
missing_birth_count = df_names.filter(col("birthYear").isNull()).count()

percentage_missing = (missing_birth_count / total_people) * 100

print(f"Percentage of people with no birth year: {percentage_missing:.2f}%")

output : Percentage of people with no birth year: 95.58%

# Question 9 : What is the length of the longest "short" after 1900?

In [0]:
from pyspark.sql.functions import max
# We filter for shorts, after 1900, and then find max runtime
longest_short = df_basics.filter(
    (col("titleType") == "short") & 
    (col("startYear") > 1900)
).select(max("runtimeMinutes")).first()[0]

print(f"Longest short film after 1900: {longest_short} minutes")

output : Longest short film after 1900: 1311 minutes

# Question 10 : What is the length of the shortest "movie" after 1900?

In [0]:
from pyspark.sql.functions import min

# We first filter for movies, after 1900, and then find min runtime
shortest_movie = df_basics.filter(
    (col("titleType") == "movie") & 
    (col("startYear") > 1900)
).select(min("runtimeMinutes")).first()[0]

print(f"Shortest movie after 1900: {shortest_movie} minutes")

output : Shortest movie after 1900: 1 minutes

# Question 11 : List of all of the genres represented.

In [0]:
from pyspark.sql.functions import split, explode

# We split the genres string into a list like for example ["Action", "Comedy"]
# We then get distinct values and sort them
unique_genres = df_basics.select(explode(split(col("genres"), ",")).alias("genre")) \
    .select("genre") \
    .distinct() \
    .sort("genre")

display(unique_genres)

output : 
- Action
- Adult
- Adventure
- Animation
- Biography
- Comedy
- Crime
- Documentary
- Drama
- Family
- Fantasy
- Film-Noir
- Game-Show
- History
- Horror
- Music
- Musical
- Mystery
- News
- Reality-TV
- Romance
- Sci-Fi
- Short
- Sport
- Talk-Show
- Thriller
- War
- Western
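
To make the split/explode step easier to follow, here is a plain-Python sketch of the same idea on a few made-up genre strings. The function name `unique_genres_from_rows` is hypothetical; it only mimics what `explode(split(...))`, `distinct()` and `sort()` do together in the Spark cell above.

```python
def unique_genres_from_rows(genre_strings):
    """Mimic explode(split(genres, ",")) + distinct + sort on plain strings."""
    genres = set()
    for value in genre_strings:
        if value is None:
            # Spark's explode produces no rows for a null array,
            # so missing genres are skipped here as well.
            continue
        genres.update(value.split(","))
    return sorted(genres)

# Made-up sample values, for illustration only
sample = ["Action,Comedy", "Comedy,Drama", None, "Short"]
print(unique_genres_from_rows(sample))
# ['Action', 'Comedy', 'Drama', 'Short']
```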

# Question 12 : What is the highest rated comedy "movie"?

In [0]:
from pyspark.sql.functions import col, desc

# We load the Ratings data
df_ratings = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("nullValue", "\\N") \
    .load(path_ratings) \
    .withColumn("averageRating", col("averageRating").cast("double")) \
    .withColumn("numVotes", col("numVotes").cast("int"))

# We join basics (Movies) with Ratings
# We filter for 'movie' type and 'Comedy' genre
# We then sort by rating and votes
best_comedy = df_basics.join(df_ratings, "tconst") \
    .filter(col("titleType") == "movie") \
    .filter(col("genres").contains("Comedy")) \
    .orderBy(col("averageRating").desc(), col("numVotes").desc()) \
    .first()

best_comedy_id = best_comedy['tconst']

print(f"Title: {best_comedy['primaryTitle']}")
print(f"Rating: {best_comedy['averageRating']}")
print(f"Votes: {best_comedy['numVotes']}")
print(f"ID: {best_comedy_id}")

output : 
- Title: Space Melody
- Rating: 10.0
- Votes: 6
- ID: tt32752452
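
The ordering used above (rating first, then number of votes as a tie-breaker) can be illustrated on plain Python dictionaries. This is a minimal sketch with made-up rows; the helper name `best_rated` is hypothetical.

```python
def best_rated(rows):
    """Return the row with the highest rating; ties go to the row with more votes."""
    return max(rows, key=lambda row: (row["averageRating"], row["numVotes"]))

# Made-up sample rows, for illustration only
sample_rows = [
    {"primaryTitle": "Film A", "averageRating": 9.5, "numVotes": 1200},
    {"primaryTitle": "Film B", "averageRating": 10.0, "numVotes": 6},
    {"primaryTitle": "Film C", "averageRating": 10.0, "numVotes": 5},
]
print(best_rated(sample_rows)["primaryTitle"])  # Film B
```

As the 6-vote result above shows, this ordering lets a title with very few votes reach the top of the ranking.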

# Question 13 : Who was the director of the movie?

In [0]:
# We load Crew and Names data
df_crew = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("nullValue", "\\N") \
    .load(path_crew)

df_names = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("nullValue", "\\N") \
    .load(path_names)

# Director ID(s): first() returns None if the movie has no row in the crew table
director_row = df_crew.filter(col("tconst") == best_comedy_id).first()
director_ids = director_row['directors'] if director_row else None

if director_ids:
    # There can be multiple directors for a same movie, that is why we create a list
    ids_list = director_ids.split(",")
    
    print(f"Director ID(s): {ids_list}")
    
    # we search these ids in the names table
    directors = df_names.filter(col("nconst").isin(ids_list)) \
                        .select("primaryName") \
                        .collect()
    
    print("Director(s):")
    for row in directors:
        print(f"- {row['primaryName']}")
else:
    print("No director listed.")

output : 
- Director ID(s): ['nm4492923']
- Director(s):
- Leonardo Thimo
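
The `directors` field holds either a missing value or a comma-separated list of IDs. The sketch below shows one way to parse it defensively in plain Python. The helper name `parse_id_list` is hypothetical, and the two-ID string is a made-up example.

```python
def parse_id_list(field):
    """Split a comma-separated ID field; return [] for missing or empty values."""
    if not field:
        return []
    return [part.strip() for part in field.split(",") if part.strip()]

print(parse_id_list("nm4492923"))            # ['nm4492923']
print(parse_id_list("nm0000001,nm0000002"))  # ['nm0000001', 'nm0000002']
print(parse_id_list(None))                   # []
```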

# Question 14 : List, if any, the alternate titles for the movie.

In [0]:
# We load Alternate Titles data
df_akas = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("nullValue", "\\N") \
    .load(path_akas)

print(f"Alternate titles for '{best_comedy['primaryTitle']}':")

df_akas.filter(col("titleId") == best_comedy_id) \
    .select("title", "region", "language") \
    .show(truncate=False)

output : 
- Space Melody
- H Melwdia Tou Diastimatos 
- Leonardo Thimo's Space Melody
- Space Melody


## Stream Processing: Wikipedia Edit Monitor

For this part, we set up a real-time stream-processing pipeline designed to consume events from the **Wikimedia Events Platform**. 

**Chosen Entities:**
1.  **Comedy** (Genre)
2.  **The Godfather** (Movie)
3.  **Leonardo DiCaprio** (Actor)
4.  **Christopher Nolan** (Director)
5.  **Avatar** (Movie)


We listen to the recentchange stream. If an edit occurs on a page matching one of our entities:
1.  **Metrics:** We log the event (timestamp, user, title) to a standard metrics file (wiki_metrics.json).
2.  **Alerts:** If the edit is "suspicious" or significant (change in length > 500 characters), we route this specific event to a separate alert file (wiki_alerts.json) to mimic a high-priority notification system.
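
The two routing rules above can be summarized as a single decision function. This is a minimal sketch: the function name `route_event` and the constants are hypothetical, and the event is assumed to carry `title` and `length.old` / `length.new` fields.

```python
WATCHED = ["Comedy", "The Godfather", "Leonardo DiCaprio",
           "Christopher Nolan", "Avatar"]
ALERT_THRESHOLD = 500  # characters, as stated above

def route_event(event, watched=WATCHED, threshold=ALERT_THRESHOLD):
    """Return 'ignore', 'metric', or 'alert' for one change event.

    'alert' means the event goes to the alert file in addition to
    being logged as a metric.
    """
    title = event.get("title", "").lower()
    if not any(entity.lower() in title for entity in watched):
        return "ignore"
    length = event.get("length", {})
    size_change = length.get("new", 0) - length.get("old", 0)
    return "alert" if abs(size_change) > threshold else "metric"

# Made-up events, for illustration only
print(route_event({"title": "Random Page"}))                                     # ignore
print(route_event({"title": "Comedy", "length": {"old": 1000, "new": 1100}}))    # metric
print(route_event({"title": "Avatar 2", "length": {"old": 1000, "new": 1600}}))  # alert
```

Because the match is a case-insensitive substring test, a page such as "Avatar 2" also counts as a match for "Avatar".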

In [0]:
%pip install sseclient-py

In [0]:
import json
import time
import random
import os

# CONFIGURATION
watching_entities = ["Comedy", "The Godfather", "Leonardo DiCaprio", "Christopher Nolan", "Avatar"]
base_path = "/Volumes/workspace/default/bigdataproject/"
metrics_file = f"{base_path}wiki_metrics.json"
alerts_file = f"{base_path}wiki_alerts.json"

# Buffer lists to store data in memory before writing
metrics_buffer = []
alerts_buffer = []

print(f"Monitoring start for: {watching_entities}")
print("--- Listening for events (SIMULATION MODE) ---")

# We simulate data
def simulate_wiki_stream():
    users = ["WikiBot", "JohnDoe", "Editor99", "Anonymous"]
    titles = ["Random Page", "Comedy Club", "Avatar 2", "The Godfather Part III", "History of Jazz"]
    
    # Generation of 20 simulated events
    for i in range(20):
        time.sleep(0.5) 
        
        # Randomly decide if this event matches our watched entities
        is_target = random.choice([True, False])
        title = random.choice(watching_entities) if is_target else random.choice(titles)
        
        old_len = random.randint(1000, 5000)
        new_len = old_len + random.randint(-600, 600)
        
        yield {
            "title": title,
            "user": random.choice(users),
            "timestamp": time.time(),
            "length": {"old": old_len, "new": new_len},
            "server_url": "https://en.wikipedia.org"
        }

# Stream processing
try:
    for change in simulate_wiki_stream():
        page_title = change.get('title', '')
        
        # FILTER LOGIC: Check if title contains one of our entities
        if any(entity.lower() in page_title.lower() for entity in watching_entities):
            
            user = change.get('user')
            timestamp = change.get('timestamp')
            diff_len = change.get('length', {}).get('new', 0) - change.get('length', {}).get('old', 0)
            
            data_record = {
                "entity_match": page_title,
                "user": user,
                "timestamp": timestamp,
                "size_change": diff_len
            }
            
            metrics_buffer.append(data_record)
            print(f"[METRIC] Match found: {page_title}")

            # ALERT LOGIC: Trigger if size change > 500 chars
            if abs(diff_len) > 500:
                alert_record = data_record.copy()
                alert_record["alert_type"] = "LARGE_EDIT_SIMULATED"
                alerts_buffer.append(alert_record)
                print(f"!!! [ALERT] Large change on {page_title} !!!")

    # Final write
    print("\nWriting files to Volume...")
    
    # Writing metrics
    with open(metrics_file, "w") as f:
        for record in metrics_buffer:
            f.write(json.dumps(record) + "\n")
    
    # Writing alerts
    with open(alerts_file, "w") as f:
        for record in alerts_buffer:
            f.write(json.dumps(record) + "\n")
            
    print(f"Success! Files created:\n- {metrics_file}\n- {alerts_file}")

except Exception as e:
    print(f"Unexpected error: {e}")

output : 
- [METRIC] Match found: Comedy
- [METRIC] Match found: Comedy
- [METRIC] Match found: Christopher Nolan
- [METRIC] Match found: The Godfather
- !!! [ALERT] Large change on The Godfather !!!
- [METRIC] Match found: The Godfather
- [METRIC] Match found: Comedy Club
- [METRIC] Match found: Avatar
- [METRIC] Match found: Avatar
- [METRIC] Match found: The Godfather
- !!! [ALERT] Large change on The Godfather !!!
- [METRIC] Match found: Comedy
- [METRIC] Match found: Avatar 2
- [METRIC] Match found: Comedy Club
- [METRIC] Match found: Comedy
- !!! [ALERT] Large change on Comedy !!!
- 
- Writing files to Volume...
- Success! Files created:
- /Volumes/workspace/default/bigdataproject/wiki_metrics.json
- /Volumes/workspace/default/bigdataproject/wiki_alerts.json

### As we can see, the code ran successfully and created two files: one with metrics and one with alerts
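
To check the contents of the two files, they can be read back line by line, since each line holds one JSON object. The sketch below demonstrates this on a temporary file with made-up records; the helper name `read_json_lines` is hypothetical.

```python
import json
import os
import tempfile

def read_json_lines(path):
    """Load a file that holds one JSON object per line."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]

# Round-trip demo on a temporary file with made-up records
records = [
    {"entity_match": "Avatar", "user": "ExampleUser", "size_change": 12},
    {"entity_match": "Comedy", "user": "ExampleUser", "size_change": -530},
]
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.json")
    with open(demo_path, "w") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    loaded = read_json_lines(demo_path)

print(loaded == records)  # True
```

In the notebook, `read_json_lines(metrics_file)` and `read_json_lines(alerts_file)` would load the real output. Since Spark's JSON reader expects line-delimited JSON by default, `spark.read.json(metrics_file)` should work on these files as well.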

### Why we performed this Stream Processing simulation

The goal of this section was to demonstrate the ability to process **live, unbounded data** as opposed to static files. 

1. **Real-time Monitoring**: We chose 5 specific entities (movies, people, and a genre) to track how their Wikipedia pages are being updated in real time.
2. **Filtering & Metrics**: Instead of saving every single Wikipedia edit (which would be millions of rows), our code acts as a filter. It only keeps data relevant to our project, saving these as "Metrics" for further analysis.
3. **Alerting System**: We implemented a "threshold-based alert." If an edit is unusually large (over 500 characters), it is automatically routed to a separate "Alerts" file. This mimics a professional system where an engineer would be notified of potential vandalism or major breaking news.

4. **Technical Adaptation (IMPORTANT)**: Because the free Databricks tier restricts outbound network access from the cluster, we could not open a live connection to stream.wikimedia.org. We therefore developed a **Data Simulator** that generates events with the same fields our pipeline reads from a Wikimedia `recentchange` event (`title`, `user`, `timestamp`, `length.old`, `length.new`, `server_url`). This shows that our filtering, alerting, and storage logic works end to end on that subset of the schema.
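
With network access, the simulator could be replaced by the live stream, which is delivered as server-sent events (SSE): each event carries its JSON payload on `data:` lines and ends with a blank line. Libraries such as `sseclient-py` (installed above) handle this parsing. The sketch below only illustrates the idea on hard-coded sample lines; the function name `iter_sse_json` and the sample payload are made up.

```python
import json

def iter_sse_json(lines):
    """Yield one parsed JSON object per server-sent event.

    Only `data:` lines are used; other fields (event:, id:) and
    comment lines starting with ":" are ignored in this sketch.
    """
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            data_parts.append(line[len("data:"):].lstrip(" "))
        elif line == "" and data_parts:
            # A blank line marks the end of the current event
            yield json.loads("\n".join(data_parts))
            data_parts = []

# Made-up sample lines, for illustration only
sample_lines = [
    ": keep-alive comment",
    "event: message",
    'data: {"title": "Avatar", "user": "ExampleUser", '
    '"length": {"old": 1000, "new": 1700}}',
    "",
]
events = list(iter_sse_json(sample_lines))
print(events[0]["title"])  # Avatar
```

Each dictionary yielded this way has the same shape as the events produced by `simulate_wiki_stream()`, so the filtering and alert logic could consume it unchanged.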