In [2]:
import glob
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
# Match all the csv files
file_paths = glob.glob('train-*.csv')
spark = SparkSession.builder.appName("BigData").getOrCreate()
# Read data
train_data = pd.concat([pd.read_csv(file) for file in file_paths], ignore_index=True)

directing_data = pd.read_json('directing.json')
writing_data = pd.read_json('writing.json')
# Combine the columns
# train_data = pd.merge(train_data, directing_data, left_on='tconst', right_on='movie', how='left')
# train_data = pd.merge(train_data, writing_data, left_on='tconst', right_on='movie', how='left')
# train_data.drop(['movie_x', 'movie_y'], axis=1, inplace=True)

# Check the first few lines
train_data.to_csv("trained_data.csv")
train_data = spark.read.csv("trained_data.csv", header=True, inferSchema=True)
# Select columns to keep, excluding the first two columns
columns_to_keep = train_data.columns[2:]

# Create a new DataFrame with the desired columns
train_data = train_data.select(*[col(column) for column in columns_to_keep])
# Show the statistic information
train_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
| true| 3990|
|false| 3969|
+-----+-----+



# Preprocessing


In [3]:
# Combine the year of the movie
#def preprocessing
train_data = train_data.withColumn(
    "ReleaseYear",
    when(col("endYear") != "\\N", col("endYear")).otherwise(col("startYear"))
)
train_data = train_data.drop("startYear", "endYear", "originalTitle")
train_data = train_data.dropna()
for column_name in ["primaryTitle", "originalTitle"]:
    train_data = train_data.filter(col(column_name).isNotNull() & (col(column_name).cast("string") == col(column_name)))
   
train_data.show()


+---------+--------------------+--------------+--------+-----+-----------+
|   tconst|        primaryTitle|runtimeMinutes|numVotes|label|ReleaseYear|
+---------+--------------------+--------------+--------+-----+-----------+
|tt0010600|            The Doll|            66|  1898.0| true|       1919|
|tt0011841|       Way Down East|           145|  5376.0| true|       1920|
|tt0012494|             Déstiny|            97|  5842.0| true|       1921|
|tt0015163|       The Navigator|            59|  9652.0| true|       1924|
|tt0016220|The Phantom of th...|            93| 17887.0| true|       1925|
|tt0016630|     Báttling Bútlér|            77|  3285.0| true|       1926|
|tt0024184|   The Invisible Man|            71| 33562.0| true|       1933|
|tt0024216|           King Kong|           100| 83177.0| true|       1933|
|tt0030341|   The Lady Vanishes|            96| 50707.0| true|       1938|
|tt0032156|The Story of the ...|           143|  3600.0| true|       1939|
|tt0033022|         Sáps 

# Feature Engineering


In [4]:
from pyspark.sql.functions import avg, log1p, round
from pyspark.ml.feature import Bucketizer
from pyspark.sql import DataFrame

# Get basic information
def get_information(data):
    # Average value calculation
    avg_runtime = data.select(avg("runtimeMinutes")).collect()[0][0]
    avg_numVotes = data.select(avg("numVotes")).collect()[0][0]
    avg_ReleaseYear = data.select(avg("ReleaseYear")).collect()[0][0]
    
    
    return int(avg_runtime), int(avg_numVotes), int(avg_ReleaseYear)

# Print the result
avg_runtime, avg_numVotes, avg_ReleaseYear = get_information(train_data)
# Create a binary feature to indicate whether it is a new movie
train_data = train_data.withColumn("isNewRelease", (train_data["releaseYear"] <= avg_runtime).cast("int"))

# Calculate the quantiles
def calculate_quantiles(data: DataFrame, column: str, quantiles: list):

    # Convert quantiles to summary statistics
    statistics = ["{}%".format(int(quantile * 100)) for quantile in quantiles]

    # Use summary() function to calculate quantiles
    summary_df = data.select(column).summary(*statistics)

    # Convert summary DataFrame to Pandas DataFrame for easier manipulation
    summary_df_pd = summary_df.toPandas()

    # Extract quantile values from Pandas DataFrame
    quantile_values = {}
    for quantile in quantiles:
        quantile_index = int(quantile * len(summary_df_pd)) - 1
        quantile_value = summary_df_pd.iloc[quantile_index][column]
        quantile_values[quantile] = quantile_value

    return quantile_values

# Calculate quantiles
train_data = train_data.withColumn("runtimeMinutes", train_data["runtimeMinutes"].cast("float"))

quantiles = [0,0.25, 0.5, 0.75]
column = "runtimeMinutes"
quantile_values = calculate_quantiles(train_data, column, quantiles)

# Convert quantile values to integers
quantile_values = {k: float(v) for k, v in quantile_values.items()}

# Convert quantile values to a list of floats
sorted_quantiles = sorted(quantile_values.items(), key=lambda x: x[1], reverse=False)
quantile_list = [float(val) for key, val in sorted_quantiles]


# # Create a Bucketizer instance
splits = quantile_list +[float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="runtimeMinutes", outputCol="runtimeBucket")

# Apply Bucketizer to the data
train_data = bucketizer.transform(train_data)


train_data = train_data.withColumn("logVotes", log1p(train_data["numVotes"]))
train_data = train_data.withColumn("logVotes", round(train_data["logVotes"], 2))
train_data.dropna()
train_data.show()




+---------+--------------------+--------------+--------+-----+-----------+------------+-------------+--------+
|   tconst|        primaryTitle|runtimeMinutes|numVotes|label|ReleaseYear|isNewRelease|runtimeBucket|logVotes|
+---------+--------------------+--------------+--------+-----+-----------+------------+-------------+--------+
|tt0010600|            The Doll|          66.0|  1898.0| true|       1919|           0|          0.0|    7.55|
|tt0011841|       Way Down East|         145.0|  5376.0| true|       1920|           0|          3.0|    8.59|
|tt0012494|             Déstiny|          97.0|  5842.0| true|       1921|           0|          1.0|    8.67|
|tt0015163|       The Navigator|          59.0|  9652.0| true|       1924|           0|          0.0|    9.18|
|tt0016220|The Phantom of th...|          93.0| 17887.0| true|       1925|           0|          1.0|    9.79|
|tt0016630|     Báttling Bútlér|          77.0|  3285.0| true|       1926|           0|          0.0|     8.1|
|

# Encoding

In [8]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, VectorAssembler
from pyspark.sql.functions import col, concat_ws

# Create tokenizers
tokenizer = Tokenizer(inputCol="primaryTitle", outputCol="primaryTitle_words")


# Tokenize movie titles
data_tokenized = tokenizer.transform(train_data)

# Use HashingTF to convert words to feature vectors
hashingTF = HashingTF(inputCol="primaryTitle_words", outputCol="primaryTitle_features", numFeatures=1000)


data_hashed = hashingTF.transform(data_tokenized)


# Calculate TF-IDF values
idf = IDF(inputCol="primaryTitle_features", outputCol="primaryTitle_tfidf")

idfModel = idf.fit(data_hashed)
data_tfidf = idfModel.transform(data_hashed)

# Cast ReleaseYear column to integer
data_tfidf = data_tfidf.withColumn("ReleaseYear", col("ReleaseYear").cast("int"))
# Merge other features
feature_cols = ["primaryTitle_tfidf","runtimeMinutes", "numVotes", "ReleaseYear", "isNewRelease", "runtimeBucket", "logVotes"]
assembler = VectorAssembler(inputCols= feature_cols, outputCol="features")

data_final = assembler.transform(data_tfidf)

# data_final = data_final.drop("primaryTitle_words")

# View the results
data_final.show()
# data_final.write.csv("final.csv", header=True)


+---------+--------------------+--------------+--------+-----+-----------+------------+-------------+--------+--------------------+---------------------+--------------------+--------------------+
|   tconst|        primaryTitle|runtimeMinutes|numVotes|label|ReleaseYear|isNewRelease|runtimeBucket|logVotes|  primaryTitle_words|primaryTitle_features|  primaryTitle_tfidf|            features|
+---------+--------------------+--------------+--------+-----+-----------+------------+-------------+--------+--------------------+---------------------+--------------------+--------------------+
|tt0010600|            The Doll|          66.0|  1898.0| true|       1919|           0|          0.0|    7.55|         [the, doll]| (1000,[17,685],[1...|(1000,[17,685],[1...|(1006,[17,685,100...|
|tt0011841|       Way Down East|         145.0|  5376.0| true|       1920|           0|          3.0|    8.59|   [way, down, east]| (1000,[217,391,57...|(1000,[217,391,57...|(1006,[217,391,57...|
|tt0012494|         

In [9]:
from pyspark.sql.functions import to_json
for col_name in ["primaryTitle_tfidf", "runtimeMinutes", "numVotes", "ReleaseYear", "isNewRelease", "runtimeBucket", "logVotes"]:
    print(f"Column '{col_name}' data type: {data_final.select(col_name).dtypes[0][1]}")
data_final = data_final.drop("primaryTitle_features", "primaryTitle_tfidf", "primaryTitle_words")
# data_final.write.csv("final.csv")

Column 'primaryTitle_tfidf' data type: vector
Column 'runtimeMinutes' data type: float
Column 'numVotes' data type: double
Column 'ReleaseYear' data type: int
Column 'isNewRelease' data type: int
Column 'runtimeBucket' data type: double
Column 'logVotes' data type: double


# Random Forests

In [152]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

assembler = VectorAssembler(inputCols= feature_cols, outputCol="features")

data_final = assembler.transform(data_tfidf)





# Convert boolean labels to numerical values, e.g., True -> 1, False -> 0
data_final = data_final.withColumn("label", when(col("label") == True, 1).otherwise(0))

# Split the dataset into training and test sets
training_data, test_data = data_final.randomSplit([0.8, 0.2], seed=123)

# Define Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# Train the model
model = rf.fit(training_data)

# Evaluate model performance using test data
predictions = model.transform(test_data)

# Evaluate model performance
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

# Print model performance
print("Area Under ROC (AUC) on test data: {}".format(auc))


Py4JJavaError: An error occurred while calling o7853.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 901.0 failed 1 times, most recent failure: Lost task 0.0 in stage 901.0 (TID 643) (host.docker.internal executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda/0x000001cec224bb88`: (struct<primaryTitle_tfidf:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,primaryTitle_tfidf:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,runtimeMinutes_double_VectorAssembler_406dc38afd14:double,numVotes:double,ReleaseYear_double_VectorAssembler_406dc38afd14:double,isNewRelease_double_VectorAssembler_406dc38afd14:double,runtimeBucket:double,logVotes:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda/0x000001cec224bb88`: (struct<primaryTitle_tfidf:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,primaryTitle_tfidf:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,runtimeMinutes_double_VectorAssembler_406dc38afd14:double,numVotes:double,ReleaseYear_double_VectorAssembler_406dc38afd14:double,isNewRelease_double_VectorAssembler_406dc38afd14:double,runtimeBucket:double,logVotes:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 20 more


# Benchmark

In [16]:
#Benchmark
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report

# Do one-hot on non-value type data
train_data_encoded = pd.get_dummies(train_data, columns=['tconst', 'primaryTitle', 'originalTitle', 'startYear', 'director','movie_writing', 'writer'])


train_data_encoded = train_data_encoded.drop(['movie_train'], axis=1)

# Encoding
train_data_encoded['label'] = train_data_encoded['label'].astype(int)

# Handling missing values
train_data_encoded.replace('\\N', np.nan, inplace=True)

missing_values = train_data_encoded.isnull().sum()


for column in missing_values[missing_values > 0].index:
    if train_data_encoded[column].dtype == 'float64':
        
        train_data_encoded[column].fillna(train_data_encoded[column].mean(), inplace=True)
    else:
       
        train_data_encoded[column].fillna(train_data_encoded[column].mode()[0], inplace=True)



# Feature extraction

# X_train = train_data_encoded.drop(['label', 'tconst_encoded'], axis=1)
X_train = train_data_encoded.drop(['label'], axis=1)
y_train = train_data_encoded['label']


X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Initialize the logistic regression model
model = LogisticRegression(random_state=42)

# Train the model
model.fit(X_train, y_train)

# Test prediction
y_pred = model.predict(X_val)

# Evaluation
accuracy = accuracy_score(y_val, y_pred)
print(f'Accuracy: {accuracy}')

# Print report
print('Classification Report:')
print(classification_report(y_val, y_pred))


# Outcome

In [None]:
import pandas as pd
import numpy as np

# Load the test data
test_data = pd.read_csv('test_hidden.csv')

# Drop unnecessary columns
test_data = test_data.drop(['endYear'], axis=1)

# Merge directing and writing data
test_data = test_data.merge(directing_data, left_on='tconst', right_on='movie', how='left', suffixes=('_train', '_directing'))
test_data = test_data.merge(writing_data, left_on='tconst', right_on='movie', how='left', suffixes=('_train', '_writing'))

# Check the column names after the merge operation
print(test_data.columns)

# Feature extraction for the test set
X_test = pd.get_dummies(test_data, columns=['tconst', 'primaryTitle', 'originalTitle', 'startYear', 'director', 'movie_writing', 'writer'])

# Drop unnecessary columns
X_test = X_test.drop(['movie_train'], axis=1)

# Replace '\\N' with NaN in the test set
X_test.replace('\\N', np.nan, inplace=True)

# Handle missing values in the test set
for column in X_test.columns[X_test.isnull().any()]:
    if X_test[column].dtype == 'float64':
        X_test[column].fillna(X_test[column].mean(), inplace=True)
    else:
        X_test[column].fillna(X_test[column].mode()[0], inplace=True)

# Make predictions on the test set
y_pred_test = model.predict(X_test)

# Save the predictions to a CSV file
submission = pd.DataFrame({'tconst': test_data['tconst'], 'label': y_pred_test})
submission.to_csv('submission.csv', index=False)