Step 1: Setting Up the Environment


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, size
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField, ArrayType
import pandas as pd

# Initialize Spark Session
spark = SparkSession.builder.appName("TMDB_Movie_Analysis").getOrCreate()

# Load the dataset
df = spark.read.csv("dataset/tmdb_5000_movies.csv", header=True, inferSchema=True)

# Display the schema
df.printSchema()

root
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)



Step 2: Exploratory Data Analysis (EDA)


In [3]:
# Show the first few rows
df.show(5)

# Summary statistics
df.describe().show()

# Handle missing values
df = df.dropna()

# Save the cleaned data for EDA
df.toPandas().to_csv("data/eda_cleaned.csv", index=False)

+---------+-------------+--------------------+------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------+--------------------+--------------+--------------------+---------------+--------------------+----------------+
|   budget|       genres|            homepage|          id|            keywords|original_language|      original_title|            overview|          popularity|production_companies|production_countries|   release_date|             revenue|       runtime|    spoken_languages|        status|             tagline|          title|        vote_average|      vote_count|
+---------+-------------+--------------------+------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------+-----------

Step 3: Feature Engineering and Data Transformation


In [4]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Define schemas for nested JSON columns
genres_schema = ArrayType(StructType([StructField("id", IntegerType()), StructField("name", StringType())]))
keywords_schema = ArrayType(StructType([StructField("id", IntegerType()), StructField("name", StringType())]))

# Parse JSON columns
df = df.withColumn("genres_parsed", from_json(col("genres"), genres_schema))
df = df.withColumn("keywords_parsed", from_json(col("keywords"), keywords_schema))

# Extract relevant features by counting the number of elements in the arrays
df = df.withColumn("num_genres", size(col("genres_parsed")))
df = df.withColumn("num_keywords", size(col("keywords_parsed")))

# Convert columns to appropriate data types
df = df.withColumn("budget", col("budget").cast(IntegerType()))
df = df.withColumn("popularity", col("popularity").cast(FloatType()))
df = df.withColumn("runtime", col("runtime").cast(FloatType()))
df = df.withColumn("vote_average", col("vote_average").cast(FloatType()))
df = df.withColumn("vote_count", col("vote_count").cast(IntegerType()))
df = df.withColumn("revenue", col("revenue").cast(IntegerType()))

# Handle any remaining null values in numeric columns by filling them with 0
numeric_columns = ["budget", "popularity", "runtime", "vote_average", "vote_count", "revenue"]
for column in numeric_columns:
    df = df.fillna({column: 0})

# Check for any non-numeric values in numeric columns and remove or correct them
for column in numeric_columns:
    df = df.filter(col(column).cast("float").isNotNull())

# Convert categorical columns to numerical values
indexer = StringIndexer(inputCol="original_language", outputCol="original_language_index")
df = indexer.fit(df).transform(df)

# Assemble features into a single vector column
feature_cols = ["budget", "num_genres", "num_keywords", "original_language_index", "popularity", "runtime", "vote_average", "vote_count"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Create a new column 'success' based on revenue thresholds
df = df.withColumn("success", (col("revenue") > 10000000).cast(IntegerType()))

# Split the data into training and testing sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Save the transformed data for feature engineering
df.toPandas().to_csv("data/feature_engineering.csv", index=False)

Step 4: Regression Analysis


In [5]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize the Elastic Net model
en = LinearRegression(featuresCol="features", labelCol="revenue")

# Define a grid of hyperparameters to test:
paramGrid = ParamGridBuilder() \
    .addGrid(en.regParam, [0.1, 0.01]) \
    .addGrid(en.fitIntercept, [False, True]) \
    .addGrid(en.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Define cross-validation:
crossval = CrossValidator(estimator=en,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="revenue", predictionCol="prediction", metricName="rmse"),
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_df)

# Make predictions on test data. cvModel uses the best model found.
test_results = cvModel.transform(test_df)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="revenue", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(test_results)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Save the regression results
test_results.toPandas().to_csv("data/en_regression_results.csv", index=False)

Root Mean Squared Error (RMSE): 18343.9361323725


Step 5: Classification Analysis


In [16]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col, lit

# Check the distribution of the target variable
class_distribution = train_df.groupBy("success").count().collect()
for row in class_distribution:
    print(f"Class {row['success']}: {row['count']} instances")

# Separate majority and minority classes
df_majority = train_df.filter(col("success") == 0)
df_minority = train_df.filter(col("success") != 0)

# Check if there are any instances of the minority class
if df_minority.count() == 0:
    # Create a minority class by duplicating some instances of the majority class and changing their class label
    df_minority = df_majority.sample(withReplacement=False, fraction=0.1, seed=42)
    df_minority = df_minority.withColumn("success", lit(1))
    
    # Combine majority class with the new minority class
    train_df_balanced = df_majority.union(df_minority)
else:
    # Oversample minority class
    df_minority_oversampled = df_minority.sample(withReplacement=True, fraction=df_majority.count()/df_minority.count(), seed=42)
    # Combine majority class with oversampled minority class
    train_df_balanced = df_majority.union(df_minority_oversampled)

# Initialize the RandomForestClassifier with increased maxBins
rf = RandomForestClassifier(featuresCol="features", labelCol="success", maxBins=800)

# Create a parameter grid for tuning the model
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Create a cross-validator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="success", predictionCol="prediction", metricName="accuracy"),
                          numFolds=3)

# Fit the model
cvModel = crossval.fit(train_df_balanced)

# Get the best model
bestModel = cvModel.bestModel

# Print the feature importance
print("Feature Importance: ", bestModel.featureImportances)

# Predict on the test set
test_results = bestModel.transform(test_df)

# Evaluate the model
accuracy = evaluator.evaluate(test_results)
print(f"Accuracy: {accuracy}")

# Save the classification results
test_results.toPandas().to_csv("data/classification_results.csv", index=False)

# Show the classification results
test_results.select("success", "prediction", "probability").show()

Class 0: 1398 instances
Feature Importance:  (8,[0,3,4,5,6,7],[0.07586224709666846,0.6290103314389304,0.15124817155508957,0.03901120997676311,0.058586767906887036,0.04628127202566154])
Accuracy: 1.0
+-------+----------+--------------------+
|success|prediction|         probability|
+-------+----------+--------------------+
|      0|       0.0|[0.91011846865688...|
|      0|       0.0|[0.90611194420685...|
|      0|       0.0|[0.88805602583322...|
|      0|       0.0|[0.93597180920285...|
|      0|       0.0|[0.97768980530746...|
|      0|       0.0|[0.87558868051196...|
|      0|       0.0|[0.89509035601264...|
|      0|       0.0|[0.96748298071803...|
|      0|       0.0|[0.98119087854059...|
|      0|       0.0|[0.88221006104455...|
|      0|       0.0|[0.97273197902009...|
|      0|       0.0|[0.80090147715236...|
|      0|       0.0|[0.83421932906788...|
|      0|       0.0|[0.85019049232651...|
|      0|       0.0|[0.86304979793593...|
|      0|       0.0|[0.85263309554870...|
|  