#### PySpark Project Documentation: Movie Revenue Prediction Pipeline

##### This step-by-step guide walks beginners through building a machine learning pipeline in PySpark using a movie dataset. 

##### The objective is to predict a movie's revenue category (High or Low) from features such as ratings, views, and awards.

Initialize Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyFirstSparkApp") \
    .master("local[*]") \
    .getOrCreate()


Load Dataset

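Read the training CSV into a Spark DataFrame. With only the header option set, Spark loads every column as a string (see the schema printed below), so the cleaning steps that follow cast each column explicitly: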

In [14]:
df = spark.read.option('header','true').csv('train.csv')
df.show(9)

+--------------------+-----------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+------+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+----------------+
|               title|    country|              genres|            language|writer_count|title_adaption|censor_rating|release_date|runtime|dvd_release_date|users_votes|comments| likes|overall_views|dislikes|ratings_imdb|ratings_tomatoes|ratings_metacritic|special_award|awards_win|awards_nomination|revenue_category|
+--------------------+-----------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+------+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+----------------+
|Pooh's Heffalump ...|        USA|Animation, Come

In [15]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- country: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- language: string (nullable = true)
 |-- writer_count: string (nullable = true)
 |-- title_adaption: string (nullable = true)
 |-- censor_rating: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- dvd_release_date: string (nullable = true)
 |-- users_votes: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- overall_views: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- ratings_imdb: string (nullable = true)
 |-- ratings_tomatoes: string (nullable = true)
 |-- ratings_metacritic: string (nullable = true)
 |-- special_award: string (nullable = true)
 |-- awards_win: string (nullable = true)
 |-- awards_nomination: string (nullable = true)
 |-- revenue_category: string (nullable = true)



Data Cleaning

a. Clean Rating Columns

Extract numeric values from mixed rating formats:

In [16]:
# i. Rating columns mix formats such as 7.1/10, 64/100 and 80%; keep the leading number and cast it to float
from pyspark.sql.functions import regexp_extract, col

df = df.withColumn("ratings_imdb", regexp_extract("ratings_imdb", r"([\d\.]+)", 1).cast("float"))
df = df.withColumn("ratings_metacritic", regexp_extract("ratings_metacritic", r"([\d\.]+)", 1).cast("float"))
df = df.withColumn("ratings_tomatoes", regexp_extract("ratings_tomatoes", r"([\d\.]+)", 1).cast("float"))
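To see what the pattern `([\d\.]+)` keeps from each format, the plain-Python sketch below mimics `regexp_extract(col, pattern, 1)` followed by a float cast, using `re.search` for the first match. Spark returns an empty string when the pattern does not match, and casting that to float yields null when ANSI mode is disabled (the default before Spark 4.0). The helper name and the `"N/A"` sample are hypothetical.

```python
import re

# Same pattern as the notebook: the first run of digits and dots
PATTERN = r"([\d\.]+)"


def extract_first_number(value):
    """Mimic regexp_extract(value, PATTERN, 1).cast("float").

    Returns None wherever Spark would produce null.
    """
    if value is None:
        return None  # null input stays null
    match = re.search(PATTERN, value)
    extracted = match.group(1) if match else ""  # Spark returns "" on no match
    try:
        return float(extracted)
    except ValueError:
        # "" (no match) or a lone "." cannot be parsed; Spark's cast gives null
        return None


for sample in ["7.1/10", "64/100", "80%", "N/A", None]:
    print(repr(sample), "->", extract_first_number(sample))
```

Only the leading number survives, so `64/100` becomes 64.0 and `80%` becomes 80.0; the later `ratings` formula divides those two by 10 to match IMDb's 0-10 scale.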

b. Clean Runtime and Count Columns

In [17]:
# ii. Extract the digits from runtime (dropping the "min" suffix) and cast to int
df = df.withColumn("runtime", regexp_extract("runtime", r"(\d+)", 1).cast("int"))

# iii. Handle votes and views with commas
from pyspark.sql.functions import regexp_replace
df = df.withColumn("users_votes", regexp_replace("users_votes", ",", "").cast("int"))
df = df.withColumn("overall_views", regexp_replace("overall_views", ",", "").cast("int"))


c. Parse and Extract Date

In [18]:
from pyspark.sql.functions import to_date, year

# Step 1: Convert the string to a proper date format
df = df.withColumn("dvd_release_date_parsed", to_date("dvd_release_date", "dd-MMM-yy"))

# Step 2: Extract the year from the parsed DVD release date
# (release_year therefore holds the DVD year, not the theatrical release_date)
df = df.withColumn("release_year", year("dvd_release_date_parsed"))

# Show result
df.show(6)

+--------------------+-----------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+----------------+-----------------------+------------+
|               title|    country|              genres|            language|writer_count|title_adaption|censor_rating|release_date|runtime|dvd_release_date|users_votes|comments|likes|overall_views|dislikes|ratings_imdb|ratings_tomatoes|ratings_metacritic|special_award|awards_win|awards_nomination|revenue_category|dvd_release_date_parsed|release_year|
+--------------------+-----------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+------------------+-------------+----------+--------------

d. Show Summary Statistics of the Dataset

In [19]:
df.describe().show()

+-------+--------------------+--------------------+-----------------+--------------------+-----------------+--------------+-------------+------------+------------------+----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+----------------+-----------------+
|summary|               title|             country|           genres|            language|     writer_count|title_adaption|censor_rating|release_date|           runtime|dvd_release_date|       users_votes|          comments|             likes|    overall_views|          dislikes|      ratings_imdb|  ratings_tomatoes|ratings_metacritic|      special_award|       awards_win| awards_nomination|revenue_category|     release_year|
+-------+--------------------+--------------------+-----------------+--------------------+-----------------+--------------+-------------+---

Type Casting and Feature Engineering

In [20]:
from pyspark.sql.types import IntegerType

df = df.withColumn('writer_count', df['writer_count'].cast(IntegerType()))
df = df.withColumn('likes', df['likes'].cast(IntegerType()))
df = df.withColumn('dislikes', df['dislikes'].cast(IntegerType()))
df = df.withColumn('special_award', df['special_award'].cast(IntegerType()))
df = df.withColumn('awards_win', df['awards_win'].cast(IntegerType()))
df = df.withColumn('awards_nomination', df['awards_nomination'].cast(IntegerType()))

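# Combined rating on IMDb's 0-10 scale: Metacritic (out of 100) and Rotten
# Tomatoes (%) are divided by 10. If any of the three is null, the result is
# null, which the missing-value step below replaces with 0.
df = df.withColumn(
    'ratings',
    (df['ratings_imdb'] + df['ratings_metacritic']/10 + df['ratings_tomatoes']/10) / 3
)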
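A worked instance of the `ratings` formula, using sample scores in the three formats (7.1/10, 64/100, 80%), written in plain Python. Because Spark arithmetic returns null when any operand is null, a movie missing one rating gets a null `ratings`, which the next step turns into 0. The second function is a hypothetical alternative that averages only the ratings that are present; it is not what the notebook does.

```python
def combined_rating(imdb, metacritic, tomatoes):
    """Mirror of the notebook's 'ratings' formula on a 0-10 scale.

    Like Spark arithmetic, any missing (None) input makes the result None.
    """
    if None in (imdb, metacritic, tomatoes):
        return None
    return (imdb + metacritic / 10 + tomatoes / 10) / 3


def combined_rating_available(imdb, metacritic, tomatoes):
    """Hypothetical alternative (not in the notebook): average only the ratings present."""
    scaled = [imdb,
              None if metacritic is None else metacritic / 10,
              None if tomatoes is None else tomatoes / 10]
    present = [v for v in scaled if v is not None]
    return sum(present) / len(present) if present else None


print(combined_rating(7.1, 64, 80))              # (7.1 + 6.4 + 8.0) / 3, about 7.167
print(combined_rating(7.1, None, 80))            # None, later filled with 0 in the notebook
print(combined_rating_available(7.1, None, 80))  # (7.1 + 8.0) / 2, about 7.55
```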

Handle Missing Values

In [21]:
# Fill numeric nulls with 0 in one call, restricted to the numeric columns
from pyspark.sql.types import NumericType

numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
df = df.fillna(0, subset=numeric_cols)

# Fill nulls in string (categorical) columns with "Unknown"
df = df.fillna("Unknown")

Feature Engineering and ML Pipeline

a. Label Encoding

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Step 1: Encode the target column (revenue_category) as a numeric 'label' column
indexer = StringIndexer(inputCol='revenue_category', outputCol='label')
df = indexer.fit(df).transform(df)


In [23]:
df.show(4)

+--------------------+-----------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+----------------+-----------------------+------------+------------------+-----+
|               title|    country|              genres|            language|writer_count|title_adaption|censor_rating|release_date|runtime|dvd_release_date|users_votes|comments|likes|overall_views|dislikes|ratings_imdb|ratings_tomatoes|ratings_metacritic|special_award|awards_win|awards_nomination|revenue_category|dvd_release_date_parsed|release_year|           ratings|label|
+--------------------+-----------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+--------

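After StringIndexer is applied to the target column, High is encoded as 0 and Low as 1. By default StringIndexer assigns indices in descending order of label frequency, so this mapping is learned from the training data rather than fixed.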

In [24]:
# Step 2 (optional, not executed): one-hot encode indexed categorical features
# (not the label). This would first need StringIndexer stages that produce
# 'title_idx' and 'censor_idx', which this notebook does not create.
# encoders = [
#     OneHotEncoder(inputCol='title_idx', outputCol='title_org'),
#     OneHotEncoder(inputCol='censor_idx', outputCol='censor_org')
# ]

b. Vector Assembler

In [25]:
# Step 3: Numeric feature columns (text and categorical columns are not used)
feature_cols = [
    'writer_count', 'runtime', 'users_votes',
    'likes', 'overall_views', 'dislikes', 'special_award',
    'awards_win', 'awards_nomination', 'ratings', 'release_year'
]

# Step 4: Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

Model Training and Evaluation

a. Train/Test Split and Train Model

In [26]:
# Step 5: Define model
model = RandomForestClassifier(labelCol="label", featuresCol="features")
# Step 6 (not executed): the stages could instead be bundled in a Pipeline, e.g.
# pipeline = Pipeline(stages=[indexer, assembler, model])

# Step 7: Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [27]:
model = model.fit(train_data)

prediction = model.transform(test_data)

b. Evaluate Model

In [28]:
# Area under the ROC curve on the held-out test split
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(prediction)
print(auc)

0.9254191817572105


In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

accuracy = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(prediction, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(prediction, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(prediction, {evaluator.metricName: "f1"})

print(f"Accuracy: {accuracy:.3f}")
print(f"Precision: {precision:.3f}")
print(f"Recall: {recall:.3f}")
print(f"F1 Score: {f1:.3f}")


Accuracy: 0.846
Precision: 0.847
Recall: 0.846
F1 Score: 0.846


Load and Predict on New Data

Load the new (unlabeled) data and apply the same cleaning and feature engineering steps used on the training data:

In [30]:
data = spark.read.option('header','true').csv('test.csv')
data.show(9)

+--------------------+------------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-------+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+
|               title|     country|              genres|            language|writer_count|title_adaption|censor_rating|release_date|runtime|dvd_release_date|users_votes|comments|  likes|overall_views|dislikes|ratings_imdb|ratings_tomatoes|ratings_metacritic|special_award|awards_win|awards_nomination|
+--------------------+------------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-------+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+
|            Delhi-6 |       India|       Comedy, Drama|      Hindi, English|         3.0|    

In [31]:
from pyspark.sql.functions import regexp_extract, regexp_replace, to_date, year

# i. Rating columns mix formats such as 7.1/10, 64/100 and 80%; keep the leading number and cast it to float
data = data.withColumn("ratings_imdb", regexp_extract("ratings_imdb", r"([\d\.]+)", 1).cast("float"))
data = data.withColumn("ratings_metacritic", regexp_extract("ratings_metacritic", r"([\d\.]+)", 1).cast("float"))
data = data.withColumn("ratings_tomatoes", regexp_extract("ratings_tomatoes", r"([\d\.]+)", 1).cast("float"))

# ii. Extract the digits from runtime (dropping the "min" suffix) and cast to int
data = data.withColumn("runtime", regexp_extract("runtime", r"(\d+)", 1).cast("int"))

# iii. Handle votes and views with commas
data = data.withColumn("users_votes", regexp_replace("users_votes", ",", "").cast("int"))
data = data.withColumn("overall_views", regexp_replace("overall_views", ",", "").cast("int"))

# Step 1: Convert the string to a proper date format
data = data.withColumn("dvd_release_date_parsed", to_date("dvd_release_date", "dd-MMM-yy"))

# Step 2: Extract the year from the parsed DVD release date
# (release_year therefore holds the DVD year, not the theatrical release_date)
data = data.withColumn("release_year", year("dvd_release_date_parsed"))

# Show result
data.show(6)


+--------------------+------------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+-----------------------+------------+
|               title|     country|              genres|            language|writer_count|title_adaption|censor_rating|release_date|runtime|dvd_release_date|users_votes|comments|likes|overall_views|dislikes|ratings_imdb|ratings_tomatoes|ratings_metacritic|special_award|awards_win|awards_nomination|dvd_release_date_parsed|release_year|
+--------------------+------------+--------------------+--------------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+-----------------------+---

In [32]:
from pyspark.sql.types import IntegerType, NumericType

# Cast columns to IntegerType
data = data.withColumn('writer_count', data['writer_count'].cast(IntegerType()))
data = data.withColumn('likes', data['likes'].cast(IntegerType()))
data = data.withColumn('dislikes', data['dislikes'].cast(IntegerType()))
data = data.withColumn('special_award', data['special_award'].cast(IntegerType()))
data = data.withColumn('awards_win', data['awards_win'].cast(IntegerType()))
data = data.withColumn('awards_nomination', data['awards_nomination'].cast(IntegerType()))

# Create the 'ratings' column
data = data.withColumn(
    'ratings',
    (data['ratings_imdb'] + data['ratings_metacritic']/10 + data['ratings_tomatoes']/10) / 3
)

# Fill numeric nulls with 0 in one call, restricted to the numeric columns
numeric_cols = [f.name for f in data.schema.fields if isinstance(f.dataType, NumericType)]
data = data.fillna(0, subset=numeric_cols)

# Fill nulls in string (categorical) columns with "Unknown"
data = data.fillna("Unknown")

In [33]:
feature_cols = [
    'writer_count', 'runtime', 'users_votes',
    'likes', 'overall_views', 'dislikes', 'special_award',
    'awards_win', 'awards_nomination', 'ratings', 'release_year'
]

# Step 4: Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(data)

In [34]:
predict_data = model.transform(data)

In [35]:
predict_data.show(2)

+-------------------+-------+-------------+--------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+--------+------------+----------------+------------------+-------------+----------+-----------------+-----------------------+------------+-----------------+--------------------+--------------------+--------------------+----------+
|              title|country|       genres|      language|writer_count|title_adaption|censor_rating|release_date|runtime|dvd_release_date|users_votes|comments|likes|overall_views|dislikes|ratings_imdb|ratings_tomatoes|ratings_metacritic|special_award|awards_win|awards_nomination|dvd_release_date_parsed|release_year|          ratings|            features|       rawPrediction|         probability|prediction|
+-------------------+-------+-------------+--------------+------------+--------------+-------------+------------+-------+----------------+-----------+--------+-----+-------------+-

In [36]:
predict_data.groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  289|
|       1.0|  311|
+----------+-----+



In [38]:
from pyspark.sql.functions import when

predict_data = predict_data.withColumn(
    'revenue_category',
    when(predict_data['prediction'] == 0, 'High').otherwise('Low')
)

Export the predictions to CSV

In [None]:
test_data_to_pandas = predict_data.select('title', 'revenue_category').toPandas()
test_data_to_pandas.to_csv('final_submission.csv', index=False)



In [1]:
spark.stop()
