In [8]:
import os
import findspark
import pandas as pd
from unidecode import unidecode
from pyspark.sql import types as T
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, when, udf, count, isnan, desc
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.classification import GBTClassifier
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BooleanType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, QuantileDiscretizer, VectorAssembler, OneHotEncoder, StringIndexer
from typing import Any, List, Tuple
from pyspark.ml import Transformer
ud = udf(lambda row: unidecode(row))
import re
import json

### Data preparation functions

In [9]:
def create_spark_session():
    return (
        SparkSession.builder
        .master("local")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .config("spark.executor.memory", "70g")
        .config("spark.driver.memory", "50g")
        .config("spark.sql.analyzer.failAmbiguousSelfJoin", False)
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "10g")
        .config("spark.default.parallelism", 1)
        .config("spark.sql.shuffle.partitions", 1)
        # .config("spark.driver.maxResultSize", "4g") # Increase to higher value
        # .config("spark.driver.warn.largeBroadcast", "false") # Remove warnings
        .getOrCreate()
    )

In [10]:
def read_training_files_to_dataframe(directory, file_prefix, schema_csv=None):
    all_training_files = [file for file in os.listdir(directory) if file.startswith(file_prefix)]
    if schema_csv:
        return (
            spark.read.options(header="True", inferSchema="False").schema(schema_csv).csv(all_training_files)
        )
    else:
        return (
            spark.read.options(header="True", inferSchema="False").csv(all_training_files)
        )

In [11]:
def load_director_data(json_file_path):
    with open(json_file_path, 'r') as f:
        dirData = json.load(f)
    movies = dirData['movie']
    dirs = dirData['director']
    directors = {
        value: dirs[key] if key in dirs.keys() else 'NULL'
        for key, value in movies.items()
    }
    df_directors = pd.DataFrame({'tconst': directors.keys(), 'directors': directors.values()})
    return spark.createDataFrame(df_directors)

In [12]:
def load_writer_data(json_file_path):
    with open(json_file_path, 'r') as f:
        writerData = json.load(f)
    writers = []
    movies = []
    for i in writerData:
        writers.append(i['writer'])
        movies.append(i['movie'])
    df_writers = pd.DataFrame({'tconst': movies, 'writer': writers})
    return spark.createDataFrame(df_writers)

In [13]:
def add_director_data(df, directors):
    df = df.join(directors, on='tconst')
    directorCount = directors.groupBy('directors').count()
    avgVotesDirector = df.groupBy('directors').avg('numVotes')
    df = df.join(directorCount, on='directors')
    df = df.join(avgVotesDirector, on='directors')
    df = df.withColumnRenamed('count', 'moviesByDirector')
    df = df.withColumnRenamed('avg(numVotes)', 'avgVotesDirector')
    return df

In [14]:
def add_writer_data(df, writers):
    numWriters = writers.groupBy('tconst').count()
    df = df.join(numWriters, on='tconst')
    df = df.withColumnRenamed('count', 'numWriters')
    written = writers.groupBy('writer').count()
    counts = writers.join(written, on='writer')
    counts = counts.groupBy('tconst').sum('count')
    df = df.join(counts, on='tconst')
    df = df.withColumnRenamed('sum(count)', 'totalMoviesWritten')
    return df

In [15]:
def clean_dataframe_columns(input_df_to_clean, primary_title_col, original_title_col, start_year_col, end_year_col, runtime_minutes_col, num_votes_col, label_col=None):
    cleaned_df = (
        input_df_to_clean
        .drop("")
        .withColumn('numVotesIsNull', col("numVotes").isNull().cast(T.IntegerType()))
        .withColumn("runtimeMinutesIsNull", when(col(runtime_minutes_col).isNull() | (col(runtime_minutes_col) == "\\N"), 1).otherwise(0))
        .withColumn(primary_title_col, when(col(primary_title_col).isNull(), col(original_title_col)).otherwise(col(primary_title_col)).cast(T.StringType()))
        .withColumn(original_title_col, when(col(original_title_col).isNull(), col(primary_title_col)).otherwise(col(original_title_col)).cast(T.StringType()))
        .withColumn(primary_title_col, ud(col(primary_title_col)))
        .withColumn(original_title_col, ud(col(original_title_col)))
        .withColumn(end_year_col, when(col(end_year_col).isNull() | (col(end_year_col) == "\\N"), col(start_year_col)).otherwise(col(end_year_col)).cast(T.IntegerType()))
        .withColumn(start_year_col, when(col(start_year_col).isNull() | (col(start_year_col) == "\\N"), col(end_year_col)).otherwise(col(start_year_col)).cast(T.IntegerType()))
        .withColumn(runtime_minutes_col, when(col(runtime_minutes_col).isNull(), None).otherwise(col(runtime_minutes_col)).cast(T.IntegerType()))
        .withColumn(runtime_minutes_col, when(col(runtime_minutes_col).isNull(), 0).otherwise(col(runtime_minutes_col)).cast(T.IntegerType()))
        .withColumn(num_votes_col, when(col(num_votes_col).isNull(), 0).otherwise(col(num_votes_col)).cast(T.IntegerType()))
        .withColumn('popularityScore', (col(num_votes_col) / (2022 - col(start_year_col))).cast(T.IntegerType()))
        .withColumn('hasSequel', count("*").over(Window.partitionBy(primary_title_col)) > 1)
    )
    
    # Convert label column to integer type if specified
    if label_col is not None:
        cleaned_df = cleaned_df.withColumn(label_col, col(label_col).cast(T.IntegerType()))
    return cleaned_df


In [85]:
def feature_transformers_pipeline(primary_title_col,start_year_col,runtime_minutes_col, num_votes_col,directors_col, movies_by_director_col, avg_votes_director_col, num_writers_col, total_movies_written_col, num_votes_is_null_col, runtime_minutes_is_null_col, popularity_score_col, has_sequel_col):
    
    PrTitleTok = Tokenizer(inputCol=primary_title_col, outputCol="TokenizedPrimaryTitle")
    PrTitleTokStop = StopWordsRemover(inputCol="TokenizedPrimaryTitle", outputCol="TokenizedStoppedPrimaryTitle")
    PrTitleTokStopCount = CountVectorizer(inputCol="TokenizedStoppedPrimaryTitle", outputCol="TokenizedStoppedCountedPrimaryTitle")
    SYDiscret = QuantileDiscretizer(numBuckets=20, inputCol=start_year_col, outputCol="DiscretStartYear")
    RTMDiscret = QuantileDiscretizer(numBuckets=3, inputCol=runtime_minutes_col, outputCol="DiscretRTM")
    NVDiscret = QuantileDiscretizer(numBuckets=4, inputCol=num_votes_col, outputCol="DiscretNV")
    directorsIndexer = StringIndexer(inputCol=directors_col, outputCol="IndexedDirectors", handleInvalid="keep")
    directorsEncoder = OneHotEncoder(inputCol="IndexedDirectors", outputCol="EncodedDirectors")
    
    # Additional transformers
    MBD_Discret = QuantileDiscretizer(numBuckets=4, inputCol=movies_by_director_col, outputCol="DiscretMBD")
    AVD_Discret = QuantileDiscretizer(numBuckets=4, inputCol=avg_votes_director_col, outputCol="DiscretAVD")
    NW_Discret = QuantileDiscretizer(numBuckets=4, inputCol=num_writers_col, outputCol="DiscretNW")
    TMW_Discret = QuantileDiscretizer(numBuckets=4, inputCol=total_movies_written_col, outputCol="DiscretTMW")
    
    all_assembled = VectorAssembler(
        inputCols=[
            "TokenizedStoppedCountedPrimaryTitle", 
            "DiscretRTM", 
            "DiscretNV", 
            "DiscretStartYear", 
            runtime_minutes_col, 
            num_votes_col, 
            start_year_col,
            popularity_score_col,
            "EncodedDirectors", 
            MBD_Discret.getOutputCol(), 
            AVD_Discret.getOutputCol(), 
            NW_Discret.getOutputCol(), 
            TMW_Discret.getOutputCol(),
            movies_by_director_col, 
            avg_votes_director_col, 
            num_writers_col, 
            total_movies_written_col,
            num_votes_is_null_col,
            runtime_minutes_is_null_col,
            has_sequel_col
        ], 
        outputCol="features"
    )

    return [
        PrTitleTok,
        PrTitleTokStop,
        PrTitleTokStopCount,
        SYDiscret,
        RTMDiscret,
        NVDiscret,
        directorsIndexer,
        directorsEncoder,
        MBD_Discret,
        AVD_Discret,
        NW_Discret,
        TMW_Discret,
        all_assembled,
    ]

### Model training functions

In [86]:
def train_model(input_df_to_train, label_col, feature_col, preprocessing_stages, split_ratio, seed):
    classifier = GBTClassifier(labelCol=label_col, featuresCol=feature_col, seed=seed)
    pipeline = Pipeline(stages=preprocessing_stages + [classifier])
    df_train, df_test = input_df_to_train.randomSplit([split_ratio, 1.0 - split_ratio], seed=seed)
    model_trained = pipeline.fit(df_train)

    return model_trained, df_train, df_test

In [87]:
def evaluate_model(model_trained, test_data, label_col=None):
    predictions = model_trained.transform(test_data)
    evaluator_multi = MulticlassClassificationEvaluator(labelCol=label_col)
    evaluator_binary = BinaryClassificationEvaluator(labelCol=label_col)
    accuracy_multi = round(evaluator_multi.evaluate(predictions, {evaluator_multi.metricName: "accuracy"}), 3)
    accuracy_binary = round(evaluator_binary.evaluate(predictions, {evaluator_binary.metricName: "areaUnderROC"}), 3)
    print(f"Accuracy multiclass: {accuracy_multi}")
    print(f"Accuracy binary: {accuracy_binary}")

In [88]:
def export_final_model(model_trained, input_path, file_prefix, schema_csv, primary_title_col, original_title_col, start_year_col, end_year_col, runtime_minutes_col, num_votes_col, output_file_name, director_data_file, writer_data_file):

    # Read hidden file to DataFrame
    final_df = read_training_files_to_dataframe(directory=input_path, file_prefix=file_prefix, schema_csv=schema_csv)

    # Clean DataFrame columns
    final_df_cleaned = clean_dataframe_columns(final_df, primary_title_col, original_title_col, start_year_col, end_year_col, runtime_minutes_col, num_votes_col)

    # Load director and writer data
    df_directors = load_director_data(director_data_file)
    df_writers = load_writer_data(writer_data_file)

    # Add director and writer data
    df_cleaned_with_director_data = add_director_data(final_df_cleaned, df_directors)
    df_cleaned_with_director_and_writer_data = add_writer_data(df_cleaned_with_director_data, df_writers)

    # Transform final model
    final_model_transformed = model_trained.transform(df_cleaned_with_director_and_writer_data)

    # Convert transformed DataFrame to pandas DataFrame
    final_model_transformed_pandas = final_model_transformed.select('prediction').toPandas().astype(bool)

    # Save pandas DataFrame to CSV file
    final_model_transformed_pandas.to_csv(output_file_name, index=False, header=False)

In [89]:
def get_feature_importances(model_trained):
    # Extract the feature importances and feature names
    pipeline_stages = model_trained.stages
    vector_assembler_stage = [stage for stage in pipeline_stages if isinstance(stage, VectorAssembler)][0]
    feature_importances = model_trained.stages[-1].featureImportances.toArray()
    feature_names = vector_assembler_stage.getInputCols()
    # Create a list of (feature_name, feature_importance) tuples
    feature_tuples = list(zip(feature_names, [float(val) for val in feature_importances]))
    # Create a DataFrame with the feature importances
    schema = StructType([
        StructField("feature_name", StringType(), True),
        StructField("feature_importance", DoubleType(), True)])
    feature_df = spark.createDataFrame(feature_tuples, schema)
    return feature_df.orderBy(desc("feature_importance")).show()   

### Data preparation parameters and execute functions

In [90]:
# Directory and file_pre_fix
directory = "."
file_prefix = "train"

# Schema parameters (train data sets include 'label' column)
schema_csv = StructType([
  StructField("", IntegerType(), nullable = True),
  StructField("tconst", StringType(), nullable = True),
  StructField("primaryTitle", StringType(), nullable = True),
  StructField("originalTitle", StringType(), nullable = True),
  StructField("startYear", IntegerType(), nullable = True),
  StructField("endYear", IntegerType(), nullable = True),
  StructField("runtimeMinutes", IntegerType(), nullable = True),
  StructField("numVotes", DoubleType(), nullable = True),
  StructField("label", BooleanType(), nullable = True)
])

In [91]:
# Column names for multiple functions

primary_title_col = "primaryTitle"
original_title_col = 'originalTitle'
start_year_col = "startYear"
end_year_col = "endYear"
runtime_minutes_col = "runtimeMinutes"
num_votes_col = "numVotes"
directors_col = "directors"
label = 'label'
movies_by_director_col = "moviesByDirector"
avg_votes_director_col = "avgVotesDirector"
num_writers_col = "numWriters"
total_movies_written_col = "totalMoviesWritten"
num_votes_is_null_col = "numVotesIsNull" 
runtime_minutes_is_null_col = "runtimeMinutesIsNull"
popularity_score_col = "popularityScore"
has_sequel_col = "hasSequel"

In [92]:
# Initialize spark session and read data
findspark.init()
spark = create_spark_session()
df = read_training_files_to_dataframe(directory=directory, file_prefix=file_prefix, schema_csv=schema_csv)

# Clean data
df_cleaned = clean_dataframe_columns(df, primary_title_col, original_title_col, start_year_col, end_year_col, runtime_minutes_col, num_votes_col, label)

# Get global transformers
preprocessing_stages = feature_transformers_pipeline(primary_title_col, start_year_col, runtime_minutes_col, num_votes_col, directors_col, movies_by_director_col, avg_votes_director_col, num_writers_col, total_movies_written_col, num_votes_is_null_col, runtime_minutes_is_null_col, popularity_score_col, has_sequel_col)

In [93]:
# Add director and writer data

director_data_file = '/Users/xandersnelder/repos/big-data-project/directing.json'
writer_data_file = '/Users/xandersnelder/repos/big-data-project/writing.json'

directors = load_director_data(director_data_file)
writers = load_writer_data(writer_data_file)

df_cleaned_with_director_data = add_director_data(df_cleaned, directors)
df_cleaned_with_director_and_writer_data = add_writer_data(df_cleaned_with_director_data, writers)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


### Model training parameters and execute functions

In [94]:
# Modelling parameters
input_df = df_cleaned_with_director_and_writer_data
feature_col = 'features'
label_col = 'label'
split_ratio = 0.7
seed = 70

In [95]:
# Train model
model_trained, df_train, df_test = train_model(input_df_to_train=input_df, label_col=label_col, feature_col=feature_col, preprocessing_stages=preprocessing_stages, split_ratio=split_ratio, seed=seed)

23/03/20 22:19:51 WARN DAGScheduler: Broadcasting large task binary with size 1234.7 KiB
23/03/20 22:19:51 WARN DAGScheduler: Broadcasting large task binary with size 1234.8 KiB
23/03/20 22:19:51 WARN DAGScheduler: Broadcasting large task binary with size 1301.6 KiB


                                                                                

23/03/20 22:19:53 WARN DAGScheduler: Broadcasting large task binary with size 1514.8 KiB


                                                                                

23/03/20 22:19:54 WARN DAGScheduler: Broadcasting large task binary with size 1515.6 KiB
23/03/20 22:19:54 WARN DAGScheduler: Broadcasting large task binary with size 1516.2 KiB
23/03/20 22:19:54 WARN DAGScheduler: Broadcasting large task binary with size 1517.8 KiB
23/03/20 22:19:55 WARN DAGScheduler: Broadcasting large task binary with size 1520.5 KiB
23/03/20 22:19:55 WARN DAGScheduler: Broadcasting large task binary with size 1526.7 KiB
23/03/20 22:19:56 WARN DAGScheduler: Broadcasting large task binary with size 1527.4 KiB
23/03/20 22:19:56 WARN DAGScheduler: Broadcasting large task binary with size 1527.9 KiB
23/03/20 22:19:56 WARN DAGScheduler: Broadcasting large task binary with size 1529.1 KiB
23/03/20 22:19:56 WARN DAGScheduler: Broadcasting large task binary with size 1531.4 KiB
23/03/20 22:19:57 WARN DAGScheduler: Broadcasting large task binary with size 1534.1 KiB
23/03/20 22:19:57 WARN DAGScheduler: Broadcasting large task binary with size 1534.8 KiB
23/03/20 22:19:57 WAR

In [103]:
### After
# Evaluate model
evaluate_model(model_trained, df_test, label_col=label_col)

23/03/20 22:22:00 WARN DAGScheduler: Broadcasting large task binary with size 1382.1 KiB
23/03/20 22:22:01 WARN DAGScheduler: Broadcasting large task binary with size 1370.5 KiB
Accuracy multiclass: 0.74
Accuracy binary: 0.811


In [None]:
### Before
# Evaluate model
evaluate_model(model_trained, df_test, label_col=label_col)

23/03/20 18:15:19 WARN DAGScheduler: Broadcasting large task binary with size 1350.0 KiB
23/03/20 18:15:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/20 18:15:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/03/20 18:15:19 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/03/20 18:15:20 WARN DAGScheduler: Broadcasting large task binary with size 1338.8 KiB
Accuracy multiclass: 0.737
Accuracy binary: 0.814


In [97]:
# Inspect feature importances
feature_importances = get_feature_importances(model_trained)

+--------------------+--------------------+
|        feature_name|  feature_importance|
+--------------------+--------------------+
|            numVotes| 0.02393080160316427|
|    DiscretStartYear|0.015831260553492342|
|     popularityScore|0.008974741312843468|
|    EncodedDirectors|0.003380304803443...|
|          DiscretMBD|0.002060074607553...|
|           DiscretNV|0.002049926864719...|
|           hasSequel|0.002007664378818677|
|      numVotesIsNull|0.001904880495982...|
|           DiscretNW|0.001633585676360...|
|           startYear| 4.14911793739783E-5|
|TokenizedStoppedC...|4.117712910756262E-6|
|  totalMoviesWritten| 8.37076221972239E-8|
|          DiscretRTM|                 0.0|
|          DiscretTMW|                 0.0|
|      runtimeMinutes|                 0.0|
|    moviesByDirector|                 0.0|
|runtimeMinutesIsNull|                 0.0|
|          DiscretAVD|                 0.0|
|    avgVotesDirector|                 0.0|
|          numWriters|          

### Final model parameters and export results

In [98]:
# Schema parameters (hidden files do not have 'label' column)
final_schema_csv = StructType([
  StructField("", IntegerType(), nullable = True),
  StructField("tconst", StringType(), nullable = True),
  StructField("primaryTitle", StringType(), nullable = True),
  StructField("originalTitle", StringType(), nullable = True),
  StructField("startYear", IntegerType(), nullable = True),
  StructField("endYear", IntegerType(), nullable = True),
  StructField("runtimeMinutes", IntegerType(), nullable = True),
  StructField("numVotes", DoubleType(), nullable = True)
])

In [99]:
# Specify input directory
input_path = '/Users/xandersnelder/repos/big-data-project/'

# Validation hidden parameters
validation_hidden = 'validation_hidden'
hidden_output_file_name = 'output_validation_hidden.txt'

# Test hidden parameters
test_hidden = 'test_hidden'
output_test_hidden = 'output_test_hidden.txt'

In [100]:
directors = load_director_data(director_data_file)
writers = load_writer_data(writer_data_file)

df_cleaned_with_director_data = add_director_data(df_cleaned, directors)
df_cleaned_with_director_and_writer_data = add_writer_data(df_cleaned_with_director_data, writers)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [101]:
# Train and export final model for validation_hidden
export_final_model(
    model_trained=model_trained,
    input_path=input_path, 
    file_prefix=validation_hidden, 
    schema_csv=final_schema_csv, 
    primary_title_col=primary_title_col, 
    original_title_col=original_title_col, 
    start_year_col=start_year_col, 
    end_year_col=end_year_col, 
    runtime_minutes_col=runtime_minutes_col, 
    num_votes_col=num_votes_col, 
    output_file_name = hidden_output_file_name,
    director_data_file = director_data_file, 
    writer_data_file = writer_data_file
    )

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


23/03/20 22:20:25 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [102]:
# Train and export final model for test_hidden
export_final_model(
    model_trained=model_trained,
    input_path=input_path, 
    file_prefix=test_hidden, 
    schema_csv=final_schema_csv, 
    primary_title_col=primary_title_col, 
    original_title_col=original_title_col, 
    start_year_col=start_year_col, 
    end_year_col=end_year_col, 
    runtime_minutes_col=runtime_minutes_col, 
    num_votes_col=num_votes_col, 
    output_file_name = output_test_hidden,
    director_data_file = director_data_file, 
    writer_data_file = writer_data_file
    )

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


23/03/20 22:20:28 WARN DAGScheduler: Broadcasting large task binary with size 1357.4 KiB
