In [57]:
# Import PySpark
from pyspark.sql import SparkSession

# Create the SparkSession used by every cell below (getOrCreate returns an
# already-running session if there is one).
spark = (
    SparkSession.builder
    .appName('MyApp')
    .getOrCreate()
)

In [58]:
"""
Loading the training csv.
"""

# The first row is the header and column types are inferred; '"' is set as
# the CSV escape character.
training = spark.read.csv(
    '../datasets/gpt_data/final/train_plots_awards_genre_no_duplicates.csv',
    header=True,
    inferSchema=True,
    escape="\"",
)

In [3]:
"""
Loading the validation csv.
"""

# The first row is the header and column types are inferred; '"' is set as
# the CSV escape character.
validation = spark.read.csv(
    '../datasets/gpt_data/final/validation_plots_awards_genre_no_duplicates.csv',
    header=True,
    inferSchema=True,
    escape="\"",
)

In [4]:
"""
Loading the test csv.
"""

# The first row is the header and column types are inferred; '"' is set as
# the CSV escape character.
test = spark.read.csv(
    '../datasets/gpt_data/final/test_plots_awards_genre_no_duplicates.csv',
    header=True,
    inferSchema=True,
    escape="\"",
)

In [115]:
from pyspark.ml.pipeline import Transformer
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import col, when, lower, mean, udf, split, regexp_replace, \
    min, array_contains
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [116]:
ranges = ('1915-1929', '1930-1939', '1940-1949', '1950-1959', '1960-1969', '1970-1979', '1980-1989',
              '1990-1999', '2000-2009', '2010-2019', '2020-2023')

In [117]:
# UDF that unwraps the first element of a vector/array value into a double.
# A null input yields null instead of raising inside the Python worker
# (indexing None raises TypeError); PrepNumVotes applies this UDF to a column
# in which it has deliberately written nulls.
scalar_udf = udf(lambda arr: None if arr is None else float(arr[0]), DoubleType())

In [126]:
class LowerCaseTransformer(Transformer):
    """Lower-case string columns.

    Each column named in ``input_cols`` is lower-cased and written to the
    column at the same position in ``output_cols``; using the same name in
    both lists overwrites the column in place.

    Args:
        input_cols: names of the columns to read.
        output_cols: names of the columns to write, paired by position with
            ``input_cols``.
    """

    def __init__(self, input_cols=None, output_cols=None):
        super(LowerCaseTransformer, self).__init__()
        self.input_cols = input_cols
        self.output_cols = output_cols

    def _transform(self, df):
        """Return ``df`` with every configured column pair lower-cased."""
        # Pair the columns by position rather than indexing [0], [1], [2], so
        # any number of columns is supported. zip() stops at the shorter list
        # and a list left as None is treated as empty.
        for source, target in zip(self.input_cols or (), self.output_cols or ()):
            df = df.withColumn(target, lower(source))

        return df

class CastToInt(Transformer):
    """Cast columns to the Spark ``int`` type.

    Each column named in ``input_cols`` is cast to ``int`` and written to the
    column at the same position in ``output_cols``; using the same name in
    both lists overwrites the column in place.

    Args:
        input_cols: names of the columns to read.
        output_cols: names of the columns to write, paired by position with
            ``input_cols``.
    """

    def __init__(self, input_cols=None, output_cols=None):
        super(CastToInt, self).__init__()
        self.input_cols = input_cols
        self.output_cols = output_cols

    def _transform(self, df):
        """Return ``df`` with every configured column pair cast to int."""
        # Pair the columns by position rather than indexing [0]..[3], so any
        # number of columns is supported. zip() stops at the shorter list and
        # a list left as None is treated as empty.
        for source, target in zip(self.input_cols or (), self.output_cols or ()):
            df = df.withColumn(target, col(source).cast('int'))

        return df

class PreprocessYears(Transformer):
    """Add one 0/1 indicator column per year range.

    For every ``'<first>-<last>'`` string in ``ranges`` a column with that
    exact name is added. It holds 1 when the year column lies in the
    inclusive range and 0 otherwise (including when the year is null).

    Args:
        input_col: name of the year column to bucket. Defaults to
            ``'startYear'`` when left as None.
        output_col: stored but not used; the output columns are named after
            the range strings.
        ranges: iterable of ``'<first>-<last>'`` strings.
    """

    def __init__(self, input_col=None, output_col=None, ranges=None):
        super(PreprocessYears, self).__init__()
        self.input_col = input_col
        self.output_col = output_col
        self.ranges = ranges

    def _transform(self, df):
        """Return ``df`` with one indicator column appended per range."""
        # Read the configured column instead of a hard-coded name. The
        # 'startYear' fallback keeps instances created without input_col
        # behaving as before.
        year = col(self.input_col if self.input_col is not None else 'startYear')

        for label in self.ranges or ():
            first, last = (int(bound) for bound in label.split('-'))
            in_range = (year >= first) & (year <= last)
            df = df.withColumn(label, when(in_range, 1).otherwise(0))

        return df

class PrepRuntimeMin(Transformer):
    """Fill missing values of a numeric column with its (integer) mean.

    The mean is computed from the frame passed to ``transform`` and cast to
    int before use. The result overwrites ``input_col``; ``output_col`` is
    stored but not used.
    """

    def __init__(self, input_col=None, output_col=None):
        super(PrepRuntimeMin, self).__init__()
        self.input_col = input_col
        self.output_col = output_col

    def _transform(self, df):
        """Return ``df`` with nulls in ``input_col`` replaced by the mean."""
        target = self.input_col

        # Single-row, single-column aggregate: pull the scalar out of it.
        stats_row = df.select(mean(target).cast("int")).collect()[0]
        mean_runtime = stats_row[0]

        is_missing = col(target).isNull()
        filled = when(is_missing, mean_runtime).otherwise(col(target))
        return df.withColumn(target, filled)

class PrepNumVotes(Transformer):
    """Fill missing vote counts and standardise the column.

    ``_transform`` replaces nulls in ``input_col`` with the column minimum,
    standardises the column with ``StandardScaler`` (``withMean=True``,
    ``withStd=True``) and overwrites ``input_col`` with the scaled scalar.
    The helper column ``scaled_votes`` (the scaled vector) is left on the
    returned frame. ``output_col`` is stored but not used.

    NOTE(review): the minimum and the scaler statistics are computed from the
    frame passed to ``transform``, so every dataset is scaled with its own
    statistics - confirm this is intended.
    """

    def __init__(self, input_col=None, output_col=None):
        super(PrepNumVotes, self).__init__()
        self.input_col = input_col
        self.output_col = output_col

    def __scaling_method(self, df, column_name, vector_column_name, scaled_column_name, placeholder):
        """Standardise ``column_name`` into ``scaled_column_name``.

        Args:
            df: input DataFrame.
            column_name: numeric column to scale.
            vector_column_name: name of the temporary vector column; dropped
                before returning.
            scaled_column_name: name of the output column holding the scaled
                vector.
            placeholder: value substituted for nulls while scaling. Rows whose
                value equals it are set back to null afterwards, in both
                ``column_name`` and ``scaled_column_name``.

        Returns:
            The DataFrame with ``scaled_column_name`` added.
        """
        # Replace null values with the placeholder so the assembler/scaler
        # receive no nulls.
        df = df.withColumn(column_name, when(col(column_name).isNull(), placeholder).otherwise(col(column_name)))

        # Create a VectorAssembler to convert the scalar column to a vector column
        assembler = VectorAssembler(inputCols=[column_name], outputCol=vector_column_name)
        df = assembler.transform(df)

        # Create the StandardScaler transformer and fit it to the data
        scaler = StandardScaler(inputCol=vector_column_name, outputCol=scaled_column_name, withMean=True, withStd=True)
        scaled_data = scaler.fit(df).transform(df)

        # Turn placeholder rows back into nulls. Compare against the
        # `placeholder` argument (not a literal -1) so any placeholder works.
        # The scaled column is handled first because the test reads
        # `column_name`, which the second statement then nulls out.
        is_placeholder = col(column_name) == placeholder
        scaled_data = scaled_data.withColumn(scaled_column_name, when(is_placeholder, None).
                                             otherwise(col(scaled_column_name)))
        scaled_data = scaled_data.withColumn(column_name, when(is_placeholder, None).
                                             otherwise(col(column_name)))

        return scaled_data.drop(vector_column_name)

    def _transform(self, df):
        """Return ``df`` with ``input_col`` null-filled and standardised."""
        # Nulls are filled with the column minimum before scaling, so the
        # placeholder branch of the helper only fires for rows whose value
        # already equals the placeholder.
        min_votes = df.select(min(self.input_col)).collect()[0][0]
        df = df.withColumn(self.input_col, when(col(self.input_col).isNull(), min_votes).
                           otherwise(col(self.input_col)))
        df = self.__scaling_method(df, self.input_col, "vector_column_votes", "scaled_votes", -1)

        # Unwrap the scaled one-element vector back into a plain double column.
        df = df.withColumn(self.input_col, scalar_udf(df['scaled_votes']))

        return df

class PrepGenre(Transformer):
    """Normalise the free-text ``Genre`` column and one-hot encode its tokens.

    The text is lower-cased, cleaned with an ordered list of regex
    replacements, split on commas into an array (which replaces ``Genre``),
    and one 0/1 column is added per distinct token. A fixed list of junk
    token columns is dropped at the end.

    NOTE(review): the indicator columns are derived from the distinct tokens
    of the frame passed to ``transform``, so different datasets can end up
    with different column sets - confirm downstream code copes with that.
    """

    # Ordered (regex pattern, replacement) pairs applied to the lower-cased
    # text. Order matters: later rules see the output of earlier ones.
    _TEXT_RULES = (
        ('film ', ''),
        # Whole-phrase corrections. These must run before the ' in ' / ' of '
        # rules below, which rewrite the very text these patterns contain;
        # applied after them, the phrases can never match.
        ("drama adapted from wajdi mouawad's play of the same name", 'drama'),
        (r'drama\[not in citation given\]', 'drama'),
        ("historical romance based on colm tóibín's novel of the same name", 'romantic'),
        # Generic word-level clean-up.
        (' in ', ' '),
        (' of ', ' '),
        (r'\.', ''),
        ('drama based on the novel by russell banks;', 'drama'),
        ('world war ii', 'war'),
        ('biopic', 'biography'),
        ('kung fu', 'kungfu'),
        ('3-d', '3d'),
        ('neo-noir', 'noir'),
    )

    # Rules applied after nulls became 'unknown': unify the separators to a
    # bare comma, then split on spaces too while keeping 'sci-fi' and
    # 'martial arts' as single tokens.
    _SEPARATOR_RULES = (
        ('/', ', '),
        (' ,', ','),
        (', ', ','),
        ('-', ' '),
        ('sci fi', 'sci-fi'),
        (' ', ','),
        ('martial,arts', 'martial arts'),
    )

    # Token columns that carry no genre information; dropped if present.
    _JUNK_COLUMNS = ('&', ' ', 'on', 'sf', 'i', 'jidaigeki', '', 'the', 'mouawad\'s', 'tóibín\'s',
                     'drama[not', 'given]', 'wajdi')

    def __init__(self):
        super(PrepGenre, self).__init__()

    @staticmethod
    def _replace_all(expr, rules):
        """Chain ``regexp_replace`` over ``expr`` for each (pattern, replacement)."""
        for pattern, replacement in rules:
            expr = regexp_replace(expr, pattern, replacement)
        return expr

    def __genre_preprocess(self, df):
        """Clean ``Genre``, split it into tokens and add the indicator columns."""
        # Everything to lower case, then the ordered text clean-up
        genre = self._replace_all(lower(col('Genre')), self._TEXT_RULES)

        # Missing genres become the literal token 'unknown'
        genre = when(genre.isNull(), 'unknown').otherwise(genre)
        genre = self._replace_all(genre, self._SEPARATOR_RULES)

        # Split the column into an array of tokens
        df = df.withColumn('Genre', split(genre, ','))

        # Creating the one hot encoding: one int column per distinct token
        token_rows = df.selectExpr("explode(array_distinct(Genre))").distinct().collect()
        for token in (str(row[0]) for row in token_rows):
            df = df.withColumn(token, array_contains('Genre', token).cast('int'))

        return df.drop(*self._JUNK_COLUMNS)

    def _transform(self, df):
        """Return ``df`` with ``Genre`` tokenised and one-hot encoded."""
        return self.__genre_preprocess(df)

In [127]:
"""
Definition of the pipeline
"""

# One instance per preprocessing step. Every step is configured with identical
# input and output column names.
lowercase = LowerCaseTransformer(
    input_cols=["primaryTitle", "originalTitle", "plot"],
    output_cols=["primaryTitle", "originalTitle", "plot"],
)
integer = CastToInt(
    input_cols=["startYear", "endYear", "numVotes", "runtimeMinutes"],
    output_cols=["startYear", "endYear", "numVotes", "runtimeMinutes"],
)
year = PreprocessYears(
    input_col="startYear",
    output_col="startYear",
    ranges=ranges,
)
runtime = PrepRuntimeMin(
    input_col="runtimeMinutes",
    output_col="runtimeMinutes",
)
votes = PrepNumVotes(
    input_col="numVotes",
    output_col="numVotes",
)
genre = PrepGenre()

# Stages run in list order: the integer cast comes before the year, runtime
# and votes steps.
pipeline = Pipeline(
    stages=[lowercase, integer, year, runtime, votes, genre],
)

# NOTE(review): every stage is a plain Transformer, so presumably nothing is
# learned from `training` here - confirm.
model = pipeline.fit(training)

In [128]:
# Run the pipeline on the training split, then drop the four listed columns
# so they are not part of the exported frame.
training_prep = (
    model.transform(training)
    .drop('originalTitle', 'endYear', 'scaled_votes', 'Genre')
)

In [129]:
# Convert the result to a pandas DataFrame and write it to CSV without the
# pandas index.
(
    training_prep
    .toPandas()
    .to_csv('train_transform.csv', index=False)
)

                                                                                

In [130]:
# Run the pipeline on the validation split, then drop the four listed columns
# so they are not part of the exported frame.
validation_prep = (
    model.transform(validation)
    .drop('originalTitle', 'endYear', 'scaled_votes', 'Genre')
)

In [131]:
# Convert the result to a pandas DataFrame and write it to CSV without the
# pandas index.
(
    validation_prep
    .toPandas()
    .to_csv('validation_transform.csv', index=False)
)

In [132]:
# Run the pipeline on the test split, then drop the four listed columns so
# they are not part of the exported frame.
test_prep = (
    model.transform(test)
    .drop('originalTitle', 'endYear', 'scaled_votes', 'Genre')
)

In [133]:
# Convert the result to a pandas DataFrame and write it to CSV without the
# pandas index.
(
    test_prep
    .toPandas()
    .to_csv('test_transform.csv', index=False)
)