In [None]:
!pip install pyspark
!pip install duckdb

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5bd64a1cbacc4bf43c089f98483d0d65576351dba5f9b58f8c53c1e07926cea6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Mount Google Drive so the data folder under /content/drive can be read from this runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

MessageError: Error: credential propagation was unsuccessful

In [None]:
import numpy as np
import pandas as pd

In [None]:
from pyspark.sql.functions import explode, col, create_map, collect_list, concat_ws, countDistinct, avg, row_number, count, sum as _sum, max
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# Local Spark session using every available core; getOrCreate() hands back the
# already-running session if there is one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark_DuckDB_Project")
    .getOrCreate()
)

import duckdb

# Writable DuckDB database that lives only in memory.
conn = duckdb.connect(database=':memory:', read_only=False)

## **READING DATA**

In [None]:
import os
from functools import reduce

# Returns the session that is already running if one exists (the appName is then ignored).
spark = SparkSession.builder \
    .appName("Movie Classification Project") \
    .getOrCreate()

# Path to the folder containing the data on Google Drive
path_to_data_folder = "/content/drive/My Drive/Big Data/Data/"


def _read_csv(file_name):
    """Read one headered CSV from the data folder (no schema inference: every column is a string)."""
    # os.path.join avoids the "//" produced by appending "/name" to a folder that already ends in "/".
    return spark.read.option("header", "true").csv(os.path.join(path_to_data_folder, file_name))


# The training data is split over train-1.csv ... train-8.csv. unionByName matches columns by name;
# plain union() matches them by position and would silently misalign a file whose header order differs.
df_train_combined = reduce(
    lambda left, right: left.unionByName(right),
    (_read_csv(f"train-{i}.csv") for i in range(1, 9)),
)

# Held-out splits.
df_validation = _read_csv("validation_hidden.csv")
df_test = _read_csv("test_hidden.csv")

# Credit files: json_files[0] is directing.json, json_files[1] is writing.json.
json_files = [
    os.path.join(path_to_data_folder, "directing.json"),
    os.path.join(path_to_data_folder, "writing.json"),
]
df_json_writing = spark.read.json(json_files[1])
# NOTE(review): directing.json is read with multiLine=True but writing.json is not; presumably the two
# files have different layouts (one JSON document vs JSON Lines) — confirm against the files.
df_json_directing = spark.read.option("multiLine", True).json(json_files[0])

## **EDA**


In [None]:
# Quick look at the first rows of the writer credits and of every CSV split.
for frame in (df_json_writing, df_train_combined, df_validation, df_test):
    frame.show()

In [None]:
# How many distinct writers appear in the writing credits.
number_of_unique_writers = df_json_writing.select("writer").dropDuplicates().count()
number_of_unique_writers

## **COMBINING DATASETS**

In [None]:
# Writing credits keyed by the same movie id column ("tconst") that the CSV splits use.
df_json_writing_renamed = df_json_writing.withColumnRenamed("movie", "tconst")

# Number of distinct writers credited on each movie.
df_writers_per_movie = (
    df_json_writing_renamed
    .groupBy("tconst")
    .agg(countDistinct("writer").alias("num_writers"))
)

# Number of distinct movies each writer is credited on.
df_movies_per_writer = (
    df_json_writing_renamed
    .groupBy("writer")
    .agg(countDistinct("tconst").alias("num_movies"))
)

# For each movie, the mean of its writers' movie counts.
df_movie_writer_info = df_json_writing_renamed.join(df_movies_per_writer, "writer")
df_avg_movies_per_writer = (
    df_movie_writer_info
    .groupBy("tconst")
    .agg(avg("num_movies").alias("avg_movies_per_writer"))
)



In [None]:
# Writer experience: for each writing credit, the number of distinct dated movies the writer is credited
# on up to and including that movie's start year; then summarised per movie as max and mean.
from pyspark.sql.functions import when, max as spark_max
from pyspark.sql.types import IntegerType

# Every known movie (train without its label, plus validation and test) counts towards experience.
df_train_combined_no_label = df_train_combined.drop("label")
combined_df = df_train_combined_no_label.unionByName(df_validation).unionByName(df_test)
df_json_writing_renamed = df_json_writing.withColumnRenamed("movie", "tconst")

experience_df = (
    df_json_writing_renamed
    .select("tconst", "writer")
    .dropDuplicates()  # a repeated (movie, writer) credit must not count twice
    .join(combined_df, "tconst", "left")
    # startYear is a string column in which the literal "\N" marks a missing year. Ordered as raw strings,
    # "\N" sorts after every real year, so undated credits received the writer's whole-career count.
    # Cast to int: years now order numerically and "\N" becomes null.
    .withColumn(
        "startYear",
        when(col("startYear") == "\\N", None).otherwise(col("startYear")).cast(IntegerType()),
    )
)

# Distinct dated movies per writer per year. Undated credits are left out: a null year would sort first
# and be added into every later year's running total.
movies_per_writer_per_year = (
    experience_df
    .filter(col("startYear").isNotNull())
    .groupBy("writer", "startYear")
    .agg(countDistinct("tconst").alias("movies_this_year"))
)

# Running total over all of a writer's years up to and including the current one.
windowSpec = (
    Window.partitionBy("writer")
    .orderBy("startYear")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)
cumulative_movies = movies_per_writer_per_year.withColumn(
    "experience", _sum("movies_this_year").over(windowSpec)
)

# Attach experience to each credit. Undated credits find no match (null never equals null in a join)
# and keep a null experience, which the per-movie max/avg aggregates ignore.
result_df = experience_df.join(
    cumulative_movies.select("writer", "startYear", "experience"),
    on=["writer", "startYear"],
    how="left",
)

max_experience_per_movie = result_df.groupBy("tconst").agg(spark_max("experience").alias("max_experience"))
avg_experience_per_movie = result_df.groupBy("tconst").agg(avg("experience").alias("avg_experience"))

In [None]:
# Attach the writer features. df_train is derived from df_train_combined without reassigning
# df_train_combined, so re-running this cell cannot join the experience columns a second time
# (a duplicate "max_experience"/"avg_experience" column would make later references ambiguous).
df_train = (
    df_train_combined
    .join(max_experience_per_movie, "tconst", "left")
    .join(avg_experience_per_movie, "tconst", "left")
    .join(df_writers_per_movie, on="tconst", how="left")
)

# NOTE(review): validation/test only receive num_writers, and these self-reassignments are still not
# safe to re-run; they are kept because later cells depend on these exact names.
df_validation = df_validation.join(df_writers_per_movie, 'tconst', 'left')
df_test = df_test.join(df_writers_per_movie, 'tconst', 'left')

df_train.show()

## **MODIFYING DATA**

In [None]:
# The CSVs are read without a schema, so turn the two numeric columns into integers.
for numeric_col in ("numVotes", "runtimeMinutes"):
    df_train = df_train.withColumn(numeric_col, col(numeric_col).cast("int"))

In [None]:
from pyspark.sql.functions import median as spark_median

# Fill missing numVotes with the median of the non-null training values.
# Spark's exact `median` aggregate skips nulls and averages the two middle values for an even count
# (the same number np.median gives), without collecting the whole column to the driver.
numvote_median = df_train.select(spark_median("numVotes")).first()[0]
if numvote_median is None:
    # Previously this silently produced NaN and filled the column with a meaningless value.
    raise ValueError("numVotes is null in every training row; there is no median to impute with")

df_train = df_train.fillna({"numVotes": numvote_median})

assert df_train.filter(col("numVotes").isNull()).count() == 0, "numVotes still has nulls after imputation"

In [None]:
from pyspark.sql.functions import median as spark_median

# Fill missing runtimeMinutes with the median of the non-null training values.
# Spark's exact `median` aggregate skips nulls and averages the two middle values for an even count
# (the same number np.median gives), without collecting the whole column to the driver.
runtime_median = df_train.select(spark_median("runtimeMinutes")).first()[0]
if runtime_median is None:
    # Previously this silently produced NaN and filled the column with a meaningless value.
    raise ValueError("runtimeMinutes is null in every training row; there is no median to impute with")

df_train = df_train.fillna({"runtimeMinutes": runtime_median})

assert df_train.filter(col("runtimeMinutes").isNull()).count() == 0, "runtimeMinutes still has nulls after imputation"

## **ACME FORMAT AND COMBINING TITLES TO FIND JACCARD SIMILARITY**


In [None]:
# Normalise both title columns (lower-case, drop punctuation and control characters, trim), then fall
# back to primaryTitle wherever originalTitle is missing.

from pyspark.sql.functions import trim, lower, regexp_replace, udf, coalesce, when
from pyspark.sql.types import ArrayType, StringType
import re

# Java regexes, written as raw strings so Python passes the backslash escapes through untouched
# (a plain "[^\w\s]" only works because Python tolerates the invalid escape sequence). "(?U)" turns on
# Java's UNICODE_CHARACTER_CLASS: without it \w is ASCII-only, so accented and non-Latin letters were
# deleted and a non-English originalTitle could be erased to an empty string.
_TITLE_PUNCTUATION_RE = r"(?U)[^\w\s]"
_TITLE_CONTROL_CHARS_RE = r"[\x00-\x1F]+"


def _clean_title(column_name):
    """Return a Spark Column holding the normalised version of string column `column_name`."""
    # The literal "\N" marks missing years in this data; treat it as null here too in case the title
    # columns use it, instead of letting punctuation removal turn it into the title "n".
    raw = when(col(column_name) == "\\N", None).otherwise(col(column_name))
    without_punctuation = regexp_replace(lower(raw), _TITLE_PUNCTUATION_RE, "")
    without_control = regexp_replace(without_punctuation, _TITLE_CONTROL_CHARS_RE, "")
    # Trim last so whitespace left behind by removed punctuation is stripped as well.
    return trim(without_control)


df_train = (
    df_train
    .withColumn("primaryTitle", _clean_title("primaryTitle"))
    .withColumn("originalTitle", _clean_title("originalTitle"))
    # Replace a missing original title with the primary title.
    .withColumn("originalTitle", coalesce("originalTitle", "primaryTitle"))
)

df_train.show()

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import re


def jaccard_similarity(str1, str2):
    """Jaccard similarity between the word sets of two titles.

    Each title is lower-cased, stripped of characters that are neither word characters nor
    whitespace, and split on whitespace; None counts as an empty title. Returns
    |A & B| / |A | B|. When neither title contains any word, the two are treated as identical
    and 1.0 is returned instead of dividing by zero.
    """
    words_str1 = set(re.sub(r"[^\w\s]", "", (str1 or "").lower()).split())
    words_str2 = set(re.sub(r"[^\w\s]", "", (str2 or "").lower()).split())
    union = words_str1 | words_str2
    if not union:
        return 1.0
    return len(words_str1 & words_str2) / len(union)


# Spark UDF wrapper. It is deliberately not applied to df_train here; to add the feature, run:
#   df_train = df_train.withColumn(
#       "titleJaccardSimilarity",
#       jaccard_similarity_udf(df_train["primaryTitle"], df_train["originalTitle"]))
jaccard_similarity_udf = udf(jaccard_similarity, FloatType())

## **MORE FEATURE ENGINEERING**

In [None]:
from pyspark.sql.functions import col, log, length, lit, trim, lower, coalesce, when
from pyspark.sql.types import IntegerType

# Fixed reference year: movieAge does not change with the date the notebook is re-run.
current_year = 2024

df_train = (
    df_train
    # The literal "\N" marks a missing year; make it a real null before casting to int.
    .withColumn("startYear", when(col("startYear") == "\\N", None).otherwise(col("startYear")).cast(IntegerType()))
    .withColumn("endYear", when(col("endYear") == "\\N", None).otherwise(col("endYear")).cast(IntegerType()))
    # Age from the start year, or from the end year when the start year is missing.
    .withColumn("movieAge", lit(current_year) - coalesce(col("startYear"), col("endYear")))
    # log(1 + votes) compresses the heavy-tailed vote counts and stays finite at 0 votes.
    .withColumn("logNumVotes", log(col("numVotes") + 1))
    .withColumn("titleLength", length(col("primaryTitle")))
    # Pairwise interaction features.
    .withColumn("runtimeVotesInteraction", col("runtimeMinutes") * col("logNumVotes"))
    .withColumn("ageRuntimeInteraction", col("movieAge") * col("runtimeMinutes"))
)


In [None]:
# Engineered features next to the raw columns they were derived from.
df_train.select(
    "tconst", "primaryTitle", "originalTitle",
    "runtimeMinutes", "numVotes", "num_writers",
    "movieAge", "logNumVotes", "titleLength", "runtimeVotesInteraction",
    "avg_experience", "max_experience",
).show(truncate=False)

In [None]:
from pyspark.sql.functions import when, count, avg, expr, stddev

# For each candidate feature, how many training rows are null or exactly zero.
checked_columns = ["runtimeMinutes", "num_writers", "numVotes", "movieAge", "avg_experience", "max_experience"]
null_or_zero_counts = []
for checked_name in checked_columns:
    is_missing = col(checked_name).isNull() | (col(checked_name) == 0)
    null_or_zero_counts.append(count(when(is_missing, checked_name)).alias(checked_name))

missing_or_zero = df_train.select(null_or_zero_counts)
missing_or_zero.show()

## **GRADIENT BOOSTING MODEL**

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when, lower, trim, col

# Encode the label as 1/0. Accept both the raw "True"/"False" strings and an already-encoded 1/0:
# otherwise re-running this cell maps every label to 0, because lower(trim(1)) is "1", not "true".
label_text = lower(trim(col("label").cast("string")))
df_train = df_train.withColumn("label", when(label_text.isin("true", "1"), 1).otherwise(0))

# Model features. VectorAssembler's default handleInvalid="error" rejects rows with nulls in these.
inputs = ["runtimeMinutes", "num_writers", "numVotes", "movieAge"]
assembler = VectorAssembler(inputCols=inputs, outputCol="features")

# Hold out 20% of the labelled rows; the fixed seed makes the split reproducible.
(trainingData, testData) = df_train.randomSplit([0.8, 0.2], seed=42)

gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
pipeline = Pipeline(stages=[assembler, gbt])
model = pipeline.fit(trainingData)

predictions = model.transform(testData)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

# Evaluate on the held-out split.
test_auc = evaluator.evaluate(predictions)
print(f"Test Area Under ROC: {test_auc}")

# Training AUC, to compare against the held-out AUC as an overfitting check.
training_predictions = model.transform(trainingData)
train_auc = evaluator.evaluate(training_predictions)
print(f"Training Area Under ROC: {train_auc}")


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One evaluator, re-pointed at each metric through a param-map override. The "weighted*" metrics and
# "f1" average the per-class scores weighted by how often each class occurs.
multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
metric_param = multiclass_evaluator.metricName

precision = multiclass_evaluator.evaluate(predictions, {metric_param: "weightedPrecision"})
print(f"Precision: {precision}")

recall = multiclass_evaluator.evaluate(predictions, {metric_param: "weightedRecall"})
print(f"Recall: {recall}")

f1 = multiclass_evaluator.evaluate(predictions, {metric_param: "f1"})
print(f"F1 Score: {f1}")


## **TEST AND VALIDATION**



In [None]:
from pyspark.sql.functions import median as spark_median

# Prepare the hidden test set with the same steps that were applied to df_train.
df_test = (
    df_test
    .withColumn("numVotes", col("numVotes").cast("int"))
    .withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))
)

# Impute with the *training* medians, not medians of the hidden set, so test rows are filled the same
# way as the rows the model was fitted on. df_train's own nulls were filled with its int-truncated
# median, which leaves the integer part of its median unchanged, so these are the training fill values.
train_numvote_median, train_runtime_median = df_train.select(
    spark_median("numVotes"), spark_median("runtimeMinutes")
).first()
df_test = df_test.fillna({"numVotes": train_numvote_median, "runtimeMinutes": train_runtime_median})

# Title normalisation. Raw strings hand the \w, \s and \x escapes to Spark's Java regex unchanged;
# "(?U)" makes Java's \w Unicode-aware so accented and non-Latin letters are kept.
for title_col in ("primaryTitle", "originalTitle"):
    # The literal "\N" missing-value marker becomes null instead of the title "n".
    raw_title = when(col(title_col) == "\\N", None).otherwise(col(title_col))
    no_punctuation = regexp_replace(lower(raw_title), r"(?U)[^\w\s]", "")
    df_test = df_test.withColumn(title_col, trim(regexp_replace(no_punctuation, r"[\x00-\x1F]+", "")))

# Replace a missing original title with the primary title.
df_test = df_test.withColumn("originalTitle", coalesce("originalTitle", "primaryTitle"))

df_test = (
    df_test
    # The literal "\N" marks a missing year; make it a real null before casting to int.
    .withColumn("startYear", when(col("startYear") == "\\N", None).otherwise(col("startYear")).cast(IntegerType()))
    .withColumn("endYear", when(col("endYear") == "\\N", None).otherwise(col("endYear")).cast(IntegerType()))
    .withColumn("movieAge", lit(current_year) - coalesce(col("startYear"), col("endYear")))
    .withColumn("logNumVotes", log(col("numVotes") + 1))
    .withColumn("titleLength", length(col("primaryTitle")))
    .withColumn("runtimeVotesInteraction", col("runtimeMinutes") * col("logNumVotes"))
    .withColumn("ageRuntimeInteraction", col("movieAge") * col("runtimeMinutes"))
)

# Check for missing or zero values in the model inputs.
missing_or_zero_test = df_test.select([
    count(when(col(c).isNull() | (col(c) == 0), c)).alias(c)
    for c in ["runtimeMinutes", "num_writers", "numVotes", "movieAge"]
])

missing_or_zero_test.show()


In [None]:
from pyspark.sql.functions import median as spark_median

# Prepare the hidden validation set with the same steps that were applied to df_train.
df_validation = (
    df_validation
    .withColumn("numVotes", col("numVotes").cast("int"))
    .withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))
)

# Impute with the *training* medians, not medians of the hidden set, so validation rows are filled the
# same way as the rows the model was fitted on. df_train's own nulls were filled with its int-truncated
# median, which leaves the integer part of its median unchanged, so these are the training fill values.
train_numvote_median, train_runtime_median = df_train.select(
    spark_median("numVotes"), spark_median("runtimeMinutes")
).first()
df_validation = df_validation.fillna({"numVotes": train_numvote_median, "runtimeMinutes": train_runtime_median})

# Title normalisation. Raw strings hand the \w, \s and \x escapes to Spark's Java regex unchanged;
# "(?U)" makes Java's \w Unicode-aware so accented and non-Latin letters are kept.
for title_col in ("primaryTitle", "originalTitle"):
    # The literal "\N" missing-value marker becomes null instead of the title "n".
    raw_title = when(col(title_col) == "\\N", None).otherwise(col(title_col))
    no_punctuation = regexp_replace(lower(raw_title), r"(?U)[^\w\s]", "")
    df_validation = df_validation.withColumn(title_col, trim(regexp_replace(no_punctuation, r"[\x00-\x1F]+", "")))

# Replace a missing original title with the primary title.
df_validation = df_validation.withColumn("originalTitle", coalesce("originalTitle", "primaryTitle"))

df_validation = (
    df_validation
    # The literal "\N" marks a missing year; make it a real null before casting to int.
    .withColumn("startYear", when(col("startYear") == "\\N", None).otherwise(col("startYear")).cast(IntegerType()))
    .withColumn("endYear", when(col("endYear") == "\\N", None).otherwise(col("endYear")).cast(IntegerType()))
    .withColumn("movieAge", lit(current_year) - coalesce(col("startYear"), col("endYear")))
    .withColumn("logNumVotes", log(col("numVotes") + 1))
    .withColumn("titleLength", length(col("primaryTitle")))
    .withColumn("runtimeVotesInteraction", col("runtimeMinutes") * col("logNumVotes"))
    .withColumn("ageRuntimeInteraction", col("movieAge") * col("runtimeMinutes"))
)

# Check for missing or zero values in the model inputs.
missing_or_zero_test = df_validation.select([
    count(when(col(c).isNull() | (col(c) == 0), c)).alias(c)
    for c in ["runtimeMinutes", "num_writers", "numVotes", "movieAge"]
])

missing_or_zero_test.show()


In [None]:
# Score the prepared hidden test set; "label" holds each prediction as the string 'True' or 'False'.
test_predictions = (
    model.transform(df_test)
    .withColumn("label", when(col("prediction") == 1, "True").otherwise("False"))
)


In [None]:
# Score the prepared hidden validation set; "label" holds each prediction as the string 'True' or 'False'.
val_predictions = (
    model.transform(df_validation)
    .withColumn("label", when(col("prediction") == 1, "True").otherwise("False"))
)


In [None]:
from pyspark.sql.functions import monotonically_increasing_id

output_path_test = "test_predictions.csv"
output_path_val = "val_predictions.csv"

_SOURCE_ROW = "_source_row"


def _labels_in_file_order(scored, csv_name):
    """Return the "label" column of `scored`, one row per line of `csv_name`, in that file's row order.

    The written output carries no id column, so its rows must line up with the input file. The joins
    applied earlier to df_test / df_validation may reorder rows (e.g. a sort-merge join), so re-read the
    source file, tag each row with its position, and reattach the predictions by tconst.
    """
    source_rows = (
        spark.read.option("header", "true").csv(f"{path_to_data_folder}/{csv_name}")
        .select("tconst")
        # For a plain file scan these ids increase in file order (they are unique, not consecutive).
        .withColumn(_SOURCE_ROW, monotonically_increasing_id())
    )
    # One prediction per tconst so a repeated id cannot duplicate output rows.
    labels = scored.select("tconst", "label").dropDuplicates(["tconst"])
    return (
        source_rows.join(labels, "tconst", "left")
        .orderBy(_SOURCE_ROW)
        .select("label")
    )


selected_columns = _labels_in_file_order(test_predictions, "test_hidden.csv")
selected_columns_val = _labels_in_file_order(val_predictions, "validation_hidden.csv")

# Save without headers. coalesce(1) concatenates the sorted partitions without a shuffle, so each
# output directory holds a single part file in source order.
selected_columns.coalesce(1).write.mode("overwrite").csv(output_path_test, header=False)
selected_columns_val.coalesce(1).write.mode("overwrite").csv(output_path_val, header=False)