In [6]:
import os
import uuid

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
import seaborn as sns
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, when, lit
from pyspark.sql.functions import regexp_replace

In [2]:
# Obtain the Spark session for this notebook; getOrCreate() reuses an
# already-active session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("IMDBAnalysis")
    .getOrCreate()
)

### Load dataset

In [3]:

# Read the CSV with its first row as the header and let Spark infer column types.
# NOTE(review): Spark's default CSV escape character is a backslash; if any
# quoted text field in this file escapes quotes by doubling them, columns may
# shift for those rows — confirm against the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("imdb_top_1000.csv")
)

### Objective: Analyze movie trends (ratings, genres, runtime) and cluster movies based on ratings and runtime

### 1. Metadata

In [4]:
# Dataset size and column names, taken from the frame as loaded.
num_entries = df.count()
features = df.columns

# describe() summary (count / mean / stddev / min / max) for the rating column.
rating_stats = df.select("IMDB_Rating").describe().toPandas()

# Runtime carries a " min" suffix and Gross uses comma thousands separators,
# so both are text at this point. Running describe() on the raw text cannot
# give a meaningful mean/stddev and compares min/max as strings. Parse them to
# numbers inside the projection (leaving `df` itself untouched) and keep the
# original column names so the report headings are unchanged.
runtime_stats = (
    df.select(
        regexp_replace(col("Runtime"), " min", "").cast("integer").alias("Runtime")
    )
    .describe()
    .toPandas()
)
gross_stats = (
    df.select(
        regexp_replace(col("Gross"), ",", "").cast("double").alias("Gross")
    )
    .describe()
    .toPandas()
)

### 2. Data preprocessing

In [7]:
# Clean Runtime: strip the " min" suffix and convert to integer.
# Values that still fail to parse become null after the cast.
df = df.withColumn("Runtime", regexp_replace(col("Runtime"), " min", "").cast("integer"))

# Clean Gross: strip thousands separators and convert to a 64-bit double.
# A 32-bit "float" only has about 7 significant decimal digits, so large
# whole-number amounts would be rounded; "double" keeps them exact.
df = df.withColumn("Gross", regexp_replace(col("Gross"), ",", "").cast("double"))

### 3. Nontrivial analysis with Spark MLlib (KMeans clustering)

In [8]:
# Keep only rows where both clustering inputs are present, so the assembler
# never sees a null.
df_clean = df.dropna(subset=["IMDB_Rating", "Runtime"])

# Pack the two inputs into one vector column. This goes to "raw_features" so
# that the standardised vector below can take the "features" name that KMeans
# and ClusteringEvaluator read by default.
assembler = VectorAssembler(inputCols=["IMDB_Rating", "Runtime"], outputCol="raw_features")
df_assembled = assembler.transform(df_clean)

# KMeans uses Euclidean distance, so a feature measured in minutes would
# swamp one measured in rating points. Standardise each feature to zero mean
# and unit variance so both contribute to the cluster assignment.
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withMean=True,
    withStd=True,
)
df_vector = scaler.fit(df_assembled).transform(df_assembled)

In [9]:
# Fit a three-cluster KMeans with a fixed seed, then label every row of the
# input with its assigned cluster (added as the "prediction" column).
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(df_vector)
predictions = model.transform(df_vector)

# Score the clustering using the evaluator's default settings.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)

### 4. Additional analysis: Genre trends

In [10]:
# Average rating and title count per distinct Genre value, best-rated first.
# NOTE(review): rows are grouped on the full Genre string; if one field lists
# several genres, each combination becomes its own group — confirm intended.
genre_groups = df.groupBy("Genre")
genre_summary = genre_groups.agg(
    avg("IMDB_Rating").alias("avg_rating"),
    count("*").alias("movie_count"),
)
genre_df = genre_summary.orderBy(col("avg_rating").desc())

In [11]:
# Pull the columns needed for plotting and export onto the driver as pandas frames.
cluster_columns = ["Series_Title", "IMDB_Rating", "Runtime", "prediction"]
predictions_pd = predictions.select(*cluster_columns).toPandas()
genre_pd = genre_df.toPandas()

### 5. Visualizations

In [12]:
# Plot 1: scatter of rating vs. runtime, coloured by assigned cluster.
# The figure is written to disk and then closed, so nothing renders inline.
fig, ax = plt.subplots(figsize=(10, 6))
sns.scatterplot(
    data=predictions_pd,
    x="IMDB_Rating",
    y="Runtime",
    hue="prediction",
    palette="deep",
    ax=ax,
)
ax.set_title("Movie Clusters by IMDB Rating and Runtime")
fig.savefig("rating_runtime_clusters.png")
plt.close(fig)


In [13]:
# Plot 2: horizontal bars of average rating for the first ten rows of the
# genre summary. The figure is written to disk and then closed.
top_genres = genre_pd.head(10)
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(data=top_genres, x="avg_rating", y="Genre", ax=ax)
ax.set_title("Top 10 Genres by Average IMDB Rating")
fig.savefig("genre_ratings.png")
plt.close(fig)

In [14]:
# 6. Save metadata and results

# Build the plain-text report line by line. The empty first and last entries
# give the text its leading and trailing newline.
report_lines = [
    "",
    "Dataset Metadata:",
    f"- Number of entries: {num_entries}",
    f"- Features: {', '.join(features)}",
    "- IMDB Rating stats:",
    rating_stats.to_string(),
    "- Runtime stats:",
    runtime_stats.to_string(),
    "- Gross stats:",
    gross_stats.to_string(),
    "",
]
metadata = "\n".join(report_lines)

with open("metadata.txt", "w") as report_file:
    report_file.write(metadata)

# Export the per-title cluster assignments without the pandas index column.
predictions_pd.to_csv("clustering_results.csv", index=False)

# Everything needed is now in pandas or on disk, so release the Spark session.
spark.stop()