# Import necessary libraries

In [4]:
import json
import ast
import glob
import numpy as np
import pandas as pd
from itertools import groupby

from py_files.writer_director_to_one_hot import writer_director_to_one_hot
from py_files.add_merge_begin_end_year import merge_start_end_year
from py_files.load_box_office_data import load_and_aggregate_box_office
from py_files.add_remake_feature import create_remake_column
from py_files.add_langoriginaltitle_feature import add_language_of_original_title
from py_files.add_ENvsNonEN_feature import add_english_title_or_not
from py_files.add_movie_genre_feature import add_movie_genre
from py_files.df_processor_enrichment import df_processor_enrichment

from py_files.df_model_prep import df_model_prep
from py_files.d2v_embed import d2v_embed
from sklearn.model_selection import train_test_split
import lightgbm as lgb
from sklearn.metrics import accuracy_score
import math

# Loading the data

In [60]:
from py_files.load_original_data import load_original_data

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

spark = (
    SparkSession.builder
    .master("local")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

# Sorted so the printed list (and the read order) is deterministic across runs.
all_files = sorted(glob.glob("train*.csv"))
if not all_files:
    raise FileNotFoundError("No training files matching 'train*.csv' were found")

print(f"Found files: {', '.join(all_files)}")

# Header: , tconst, primaryTitle, originalTitle, startYear, endYear, runtimeMinutes, numVotes, label
# The first header cell is empty, so that column is named "_c0" here.
schema = (
    StructType()
    .add("_c0", IntegerType(), True)
    .add("tconst", StringType(), True)
    .add("primaryTitle", StringType(), True)
    .add("originalTitle", StringType(), True)
    .add("startYear", IntegerType(), True)
    .add("endYear", IntegerType(), True)
    .add("runtimeMinutes", IntegerType(), True)
    .add("numVotes", IntegerType(), True)
    .add("label", BooleanType(), True)
)

# header=True makes Spark skip the header line of *each* file. The previous
# textFile(...).zipWithIndex().filter(index >= 1) approach dropped only the first
# line of the combined RDD, so the header lines of the other shards were parsed
# as data rows (e.g. tconst == "tconst" with null numeric fields).
# Because an explicit schema is given, the header names themselves are ignored.
training_data = spark.read.csv(all_files, schema=schema, header=True)

Found files: train-8.csv, train-2.csv, train-7.csv, train-5.csv, train-3.csv, train-4.csv, train-1.csv, train-6.csv


                                                                                

# Preprocessing of original columns

In [61]:
from pyspark.sql.functions import udf,col, lit, coalesce
import unicodedata
def format_titles(title):
    """Normalise a title into the join key shared by all data sources.

    Mirrors the pandas chain applied to the Oscar and Box Office Mojo titles:
    lowercase, NFKD-decompose and drop non-ASCII characters (strips accents),
    replace spaces with "_", then remove every remaining non-word character.

    Parameters
    ----------
    title : str or None
        Raw title; Spark passes ``None`` for null cells.

    Returns
    -------
    str or None
        The formatted key, or ``None`` when ``title`` is ``None``.
    """
    import re  # local import keeps the UDF self-contained when shipped to Spark executors

    if title is None:
        return None
    ascii_title = (
        unicodedata.normalize("NFKD", title.lower())
        .encode("ascii", errors="ignore")
        .decode("utf-8")
    )
    # The old `.replace("\W", "")` was a *literal* str.replace (it only removed the
    # two characters backslash + W), so punctuation survived and the keys no longer
    # matched the regex-based pandas formatting. re.sub applies the intended class.
    return re.sub(r"\W", "", ascii_title.replace(" ", "_"))

# def merge_start_end_year(startyear, endyear):
#     df_['Year'] = df_['startYear'].fillna(df_['endYear'])
#     return df_

# Register the title formatter as a Spark UDF; it returns a string column.
udf_format_titles = udf(format_titles, StringType())

training_data.show()
training_data = (
    training_data
    # udf(...)(col) already yields a Column, so no lit() wrapper is needed.
    .withColumn("primaryTitleFormatted", udf_format_titles("primaryTitle"))
    # Year = startYear, or endYear when startYear is null.
    .withColumn("Year", coalesce("startYear", "endYear"))
)
training_data.show()

# The exogenous-feature cells below operate on a pandas DataFrame named
# `df_preprocessed`, which was never defined in this notebook (they only worked
# with leftover kernel state). Bridge the Spark result into pandas here.
# `_c0` is dropped because the pandas pipeline (df_processor_enrichment) reads
# that column as the index rather than as a feature.
# NOTE(review): confirm the dtypes match what the downstream py_files helpers expect.
df_preprocessed = training_data.drop("_c0").toPandas()

                                                                                

+---+---------+--------------------+--------------------+---------+-------+--------------+--------+-----+
|_c0|   tconst|        primaryTitle|       originalTitle|startYear|endYear|runtimeMinutes|numVotes|label|
+---+---------+--------------------+--------------------+---------+-------+--------------+--------+-----+
|  4|tt0010600|            The Doll|           Die Puppe|     1919|   null|            66|    null| true|
|  7|tt0011841|       Way Down East|       Way Down East|     1920|   null|           145|    null| true|
|  9|tt0012494|             Déstiny|        Der müde Tod|     1921|   null|            97|    null| true|
| 25|tt0015163|       The Navigator|       The Navigator|     1924|   null|            59|    null| true|
| 38|tt0016220|The Phantom of th...|The Phantom of th...|     1925|   null|            93|    null| true|
| 42|tt0016630|     Báttling Bútlér|     Battling Butler|     1926|   null|            77|    null| true|
| 81|tt0021015|Juno and the Paycock|          

                                                                                

+---+---------+--------------------+--------------------+---------+-------+--------------+--------+-----+---------------------+----+
|_c0|   tconst|        primaryTitle|       originalTitle|startYear|endYear|runtimeMinutes|numVotes|label|primaryTitleFormatted|Year|
+---+---------+--------------------+--------------------+---------+-------+--------------+--------+-----+---------------------+----+
|  4|tt0010600|            The Doll|           Die Puppe|     1919|   null|            66|    null| true|             the_doll|1919|
|  7|tt0011841|       Way Down East|       Way Down East|     1920|   null|           145|    null| true|        way_down_east|1920|
|  9|tt0012494|             Déstiny|        Der müde Tod|     1921|   null|            97|    null| true|              destiny|1921|
| 25|tt0015163|       The Navigator|       The Navigator|     1924|   null|            59|    null| true|        the_navigator|1924|
| 38|tt0016220|The Phantom of th...|The Phantom of th...|     1925|  

## Preprocessing of exogenous data

### Oscar data

In [None]:
oscars = pd.read_csv("additional_data/oscars.csv")

# Normalise film titles like primaryTitleFormatted so the two can be joined:
# lowercase, strip accents (NFKD + drop non-ASCII), spaces -> "_", drop other non-word chars.
oscars["film"] = (
    oscars["film"].str.lower()
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf-8")
    .str.replace(" ", "_", regex=False)
    .str.replace(r"\W", "", regex=True)  # raw string: "\W" is an invalid escape in a plain literal
)

# One merge feeds both counts (it was previously computed twice).
# NOTE(review): the join uses the formatted title only, so different films that share a
# title (e.g. remakes) all receive each other's nominations; consider also matching on year.
oscar_matches = pd.merge(df_preprocessed, oscars, left_on="primaryTitleFormatted", right_on="film")
oscar_noms = oscar_matches.groupby("tconst")["winner"].count()  # non-null `winner` rows = nominations
oscar_wins = oscar_matches.groupby("tconst")["winner"].sum()    # presumably `winner` is boolean, so sum = wins

### Writer and Director data

In [None]:
# Find writers and directors per movie and combine the two
# Combine the per-movie writer and director indicator tables, then select one row
# per movie in df_preprocessed (in df_preprocessed order).
# reindex(..., fill_value=0) replaces .loc[...]: a movie that is in neither table now
# gets an all-zero row instead of raising KeyError.
# NOTE(review): `+` yields NaN (-> 0 after fillna) for any cell present in only one of the
# two tables (e.g. a person who only ever wrote), if their index/columns differ.
# `.add(other, fill_value=0)` would keep those cells, but then
# py_files.df_processor_enrichment must change the same way so train/inference features agree.
written_and_directed = (
    (writer_director_to_one_hot("writers") + writer_director_to_one_hot("directors"))
    .fillna(0)
    .astype(int)
    .reindex(df_preprocessed["tconst"], fill_value=0)
)

### TMDB data

In [None]:
# Only the TMDB columns used further down are kept (in this order).
TMDB_COLUMNS = [
    "budget", "genres", "imdb_id",
    "original_language", "overview",
    "popularity", "production_companies",
    "tagline", "Keywords", "revenue",
]
df_TMDB = pd.read_csv("additional_data/TMDB.csv")[TMDB_COLUMNS]

In [None]:
def dict_to_list(dictionary):
    """Extract the ``"name"`` field from a stringified list of dicts.

    The TMDB list-valued columns are expected to hold Python-literal strings
    such as ``"[{'id': 18, 'name': 'Drama'}]"``.

    Parameters
    ----------
    dictionary : str or float
        The raw cell value; missing CSV cells arrive as ``NaN`` (a float).

    Returns
    -------
    list of str
        The ``name`` of every entry, or ``[]`` when the cell is missing, empty,
        or not a parseable literal.
    """
    try:
        entries = ast.literal_eval(dictionary)
    except (ValueError, SyntaxError):
        # ValueError: non-string input such as NaN.
        # SyntaxError: "" or truncated/malformed text (previously uncaught).
        return []

    return [entry["name"] for entry in entries]

In [None]:
# Turn each stringified list-of-dicts column into a plain list of names, then
# key the TMDB rows by imdb_id so they can be joined on the movie id later.
for list_column in ("genres", "Keywords", "production_companies"):
    df_TMDB[list_column] = df_TMDB[list_column].apply(dict_to_list)
df_TMDB = df_TMDB.set_index("imdb_id")

### Metacritic data

In [None]:
# Metacritic overviews keyed by "movie".
# NOTE(review): presumably the same ids as df_TMDB's imdb_id index (they are aligned below); confirm.
df_meta = (
    pd.read_csv("additional_data/Metacritic.csv")
    .drop(columns="Unnamed: 0")
    .set_index("movie")
)


def _first_overview(raw):
    """Return the first element of a stringified list, or "" when it is empty or missing."""
    if not isinstance(raw, str):
        return ""  # NaN cell (eval() used to raise TypeError here)
    # literal_eval only parses literals and never executes code, unlike eval() on file data.
    parsed = ast.literal_eval(raw)
    # Empty list -> "" (the old `str(x)` produced the literal text "[]").
    return parsed[0] if parsed else ""


df_meta["overview"] = df_meta["overview"].apply(_first_overview)

In [None]:
# Combine for faster merge
# Append the Metacritic overview to the TMDB overview of the same id, so a single
# column is merged later.
# - join="left": the result is assigned back to df_TMDB, so ids that exist only in
#   df_meta would be discarded anyway; a left join also keeps df_TMDB's index as-is.
# - sep=" " stops the last word of one text fusing with the first word of the other.
# - strip() removes the dangling separator when one side is missing (na_rep="").
df_TMDB["overview"] = (
    df_TMDB["overview"]
    .str.cat(df_meta["overview"], sep=" ", join="left", na_rep="")
    .str.strip()
)

### Box Office data

In [None]:
df_box_office_mojo = load_and_aggregate_box_office()

# Normalise the 'Release Group' (the movie title) like primaryTitleFormatted so the two
# can be joined: lowercase, strip accents, spaces -> "_", drop remaining non-word chars.
df_box_office_mojo["Release Group"] = (
    df_box_office_mojo["Release Group"].str.lower()
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf-8")
    .str.replace(" ", "_", regex=False)
    .str.replace(r"\W", "", regex=True)  # raw string: "\W" is an invalid escape in a plain literal
)
# The two percentage columns ('%', '%.1') are not carried over into the features.
df_box_office_mojo = df_box_office_mojo.drop(columns=["%", "%.1"])

# Adding exogenous columns

In [None]:
# Start from a copy keyed by movie id ("tconst" renamed to "id"), so the per-movie
# Series built above (indexed by tconst) align when assigned as columns.
df_incl_exog = (
    df_preprocessed
    .copy(deep=True)
    .rename(columns={"tconst": "id"})
    .set_index("id")
)
df_incl_exog.info()

## add oscar data

In [None]:
# Attach the Oscar counts; assignment aligns on the id index, so movies with no
# matching Oscar entry get NaN in both columns.
df_incl_exog = df_incl_exog.assign(oscar_noms=oscar_noms, oscar_wins=oscar_wins)

## add Box Office Mojo revenue

In [None]:
# Attach Box Office Mojo figures by (formatted title, year).
# NOTE(review): if df_box_office_mojo holds several rows for the same (Release Group, year),
# this left merge duplicates the movie's row (and its id); confirm the loader aggregates them.
df_incl_exog = (
    df_incl_exog.reset_index()
    .merge(df_box_office_mojo,
           left_on=["primaryTitleFormatted", "Year"],
           right_on=["Release Group", "year"],
           how="left")
    .set_index("id")
    .drop(columns=["Release Group", "year"])
)


def _parse_dollars(amount):
    """Convert an amount string such as "$1,234,567" to float; "-" and missing values -> NaN."""
    if pd.isna(amount) or amount == "-":
        return np.nan
    return float(amount.replace("$", "").replace(",", ""))


# One loop instead of three copy-pasted blocks per column. map() also yields a float64
# column, whereas the old .loc assignment left an object column holding floats.
for revenue_column in ["Worldwide", "Domestic", "Foreign"]:
    df_incl_exog[revenue_column] = df_incl_exog[revenue_column].map(_parse_dollars)

## add remake column

In [None]:
# Add the remake indicator (computed by py_files.add_remake_feature).
df_incl_exog = df_incl_exog.pipe(create_remake_column)

## add title language

In [None]:
# Detecting the language of the original title takes about 15 minutes, so it is not
# recomputed on every run; the precomputed result is loaded from disk instead.
# To recompute: df_incl_exog = add_language_of_original_title(df_incl_exog)

df_added_lang = (
    pd.read_csv('additional_data/df_added_lang.csv', index_col=0)
    .rename(columns={"tconst": "id"})
    .set_index("id")
)
df_incl_exog = df_incl_exog.join(df_added_lang['title_language'], how='left')

## add whether title is English or not

In [None]:
# Flag whether the title is English (py_files.add_ENvsNonEN_feature).
df_incl_exog = df_incl_exog.pipe(add_english_title_or_not)

## add movie genres

In [None]:
# Add genre information (py_files.add_movie_genre_feature).
df_incl_exog = df_incl_exog.pipe(add_movie_genre)

## add writers and directors

In [None]:
# Column-wise concat, aligned (outer join) on the movie id index.
# Equivalent to the previous concat of transposes followed by .T, but it avoids two full
# transposes of a very wide frame, which also coerced every column to object dtype.
df_incl_exog = pd.concat([df_incl_exog, written_and_directed], axis=1)

## add TMDB features (budget, genres, popularity, revenue, …) and the combined TMDB & Metacritic overviews

In [None]:
# Left-join the TMDB columns on the movie id (both frames are indexed by it).
df_incl_exog = df_incl_exog.merge(
    df_TMDB,
    how="left",
    left_index=True,
    right_index=True,
)

In [None]:
# Sanity check: distribution of overview lengths (movies without an overview omitted).
overview_lengths = df_incl_exog["overview"].str.len()
overview_lengths.sort_values().dropna()

## save dataframe with features

In [None]:
# Persist the enriched training frame; the model-prep section reloads it from disk.
FEATURES_CSV = 'df_with_features.csv'
df_incl_exog.to_csv(FEATURES_CSV)

# Preparing data for classifier

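Convert non-numeric columns to numeric.
In the reference copy of `df_model_prep` below, the text columns (titles and genres) are each embedded with Doc2Vec into an n-by-128 array (one 128-dimensional vector per movie). The title language is integer-encoded, and any remaining string columns are dropped.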

In [None]:
# Reload the saved features; column 0 holds the movie id and becomes the index.
train_df = pd.read_csv(
    'df_with_features.csv',
    index_col=0,
)

In [None]:
TRAIN_PREP_PREFIX = 'train'  # second argument of df_model_prep (its cache-file prefix in the reference copy below)
train_df_prepped = df_model_prep(train_df, TRAIN_PREP_PREFIX)
train_df_prepped.head()

In [None]:
# df_model_prep function for demonstration purposes
#
# from py_files.d2v_embed import d2v_embed
# import pandas as pd
# import math

# def df_model_prep(df, filename):
    
#     try:
#         print("Looking for pre made file...")
#         return pd.read_csv(f"{filename}_df_with_features_fully_processed_read_for_model.csv", index_col = 0)
#     except:
#         print("No file found, creating a new one")
    
#     prim_title_df = d2v_embed(df['primaryTitle'])
#     orig_title_df = d2v_embed(df['originalTitle'])
#     prim_title_formatted_df = d2v_embed(df['primaryTitleFormatted'])
#     title_formatted_df = d2v_embed(df['titleFormatted'])
#     genres_df = d2v_embed(df['genres'])

#     # just encode languages into ints for this column
#     df['title_language'] = pd.factorize(df['title_language'])[0]

#     df.drop(columns = df.select_dtypes(include='object').columns, inplace=True)

#     # dealing with (some) nan values
#     for index, row in df.iterrows():
#         # For missing startYear or endYear entries, insert the other, if it exists.
#         if math.isnan(row['startYear']):
#             if not math.isnan(row['endYear']):
#                 df.at[index,'startYear']=df.at[index,'endYear']
#         if math.isnan(row['endYear']):
#             if not math.isnan(row['startYear']):
#                 df.at[index,'endYear']=df.at[index,'startYear']

#         # For missing oscar_noms and oscar_wins, insert 0
#         if math.isnan(row['oscar_noms']):
#             df.at[index,'oscar_noms'] = 0
#         if math.isnan(row['oscar_wins']):
#             df.at[index,'oscar_wins'] = 0

#     df['numVotes'] = df['numVotes'].fillna(df['numVotes'].mean(skipna=True))
#     df['runtimeMinutes'] = df['runtimeMinutes'].fillna(df['runtimeMinutes'].mean(skipna=True))
    
#     df['title_language'] = pd.factorize(df['title_language'])[0]
    
#     df = df.join(prim_title_df)
#     df = df.join(orig_title_df)
#     df = df.join(prim_title_formatted_df)
#     df = df.join(title_formatted_df)
#     df = df.join(genres_df)
    
#     df.to_csv(f"{filename}_df_with_features_fully_processed_read_for_model.csv")
    
#     return df

In [None]:
# d2v_embed function for demonstration purposes
# 
# from gensim.models.doc2vec import Doc2Vec, TaggedDocument
# from nltk.tokenize import word_tokenize
# import multiprocessing as mp
# from tqdm import tqdm
# import pandas as pd
# import math

# def d2v_embed(df_col, max_epochs = 100, vec_size = 128, alpha = 0.025):
    
#     df_col = df_col.fillna(" ")
#     df_col = df_col.str.lower()\
#                    .str.normalize('NFKD')\
#                    .str.encode('ascii', errors='ignore')\
#                    .str.decode('utf-8')\
#                    .str.replace("\W", " ", regex=True)
    
#     tagged_data = [TaggedDocument(words=word_tokenize(_d.lower()), tags=[str(i)]) for i, _d in enumerate(df_col)]

#     model = Doc2Vec(vector_size=vec_size,
#                     alpha=alpha, 
#                     min_alpha=0.00025,
#                     min_count=1,
#                     dm =1,
#                     workers = mp.cpu_count())
  
#     model.build_vocab(tagged_data)

#     for epoch in tqdm(range(max_epochs)):
#     #     print('iteration {0}'.format(epoch))
#         model.train(tagged_data,
#                     total_examples=model.corpus_count,
#                     epochs=model.epochs)
#         # decrease the learning rate
#         model.alpha -= 0.0002
#         # fix the learning rate, no decay
#         model.min_alpha = model.alpha
    
#     # save model
#     model.save(f"doc2vec_model_{df_col.name}.model")
    
#     #return df with doc embeddings
#     return pd.DataFrame([model.docvecs[i] for i in range(len(df_col))], 
#                         index = df_col.index,
#                         columns = [f"{df_col.name}_{i}" for i in range(vec_size)])

In [None]:
# df_processor_enrichment function for demonstration purposes
# 
# import json
# import numpy as np
# import pandas as pd
# from itertools import groupby

# from py_files.writer_director_to_one_hot import writer_director_to_one_hot
# from py_files.add_merge_begin_end_year import merge_start_end_year
# from py_files.load_box_office_data import load_and_aggregate_box_office
# from py_files.add_remake_feature import create_remake_column
# from py_files.add_langoriginaltitle_feature import add_language_of_original_title
# from py_files.add_ENvsNonEN_feature import add_english_title_or_not
# from py_files.add_movie_genre_feature import add_movie_genre

# from py_files.d2v_embed import d2v_embed
# from sklearn.model_selection import train_test_split
# import lightgbm as lgb
# from sklearn.metrics import accuracy_score
# import math

# def df_processor_enrichment(filename):
    
#     try:
#         print("Looking for pre made file...")
#         return pd.read_csv(f"{filename}_df_with_features.csv", index_col = 0)
#     except:
#         print("File not found, creating a new one..")
              
#     df_original = pd.read_csv(filename, index_col=0)
#     # df_original.head()

#     # start the preprocessing
#     df_preprocessed = df_original.replace("\\N", np.nan)
#     df_preprocessed["primaryTitleFormatted"] = df_preprocessed["primaryTitle"].str.lower()\
#                                                                               .str.normalize('NFKD')\
#                                                                               .str.encode('ascii', errors='ignore')\
#                                                                               .str.decode('utf-8')\
#                                                                               .str.replace(" ", "_", regex=True)\
#                                                                               .str.replace("\W", "", regex=True)

#     # merge endYear into beginYear when beginYear is not available --> rename Year
#     df_preprocessed = merge_start_end_year(df_preprocessed)

#     # set the datatypes of the dataframe correctly
#     df_preprocessed['Year'] = df_preprocessed['Year'].astype(int)
#     df_preprocessed['runtimeMinutes'] = df_preprocessed['runtimeMinutes'].astype(float)

#     # df_preprocessed.info()


#     oscars = pd.read_csv("additional_data/oscars.csv")

#     oscars["film"] = oscars["film"].str.lower()\
#                                    .str.normalize('NFKD')\
#                                    .str.encode('ascii', errors='ignore')\
#                                    .str.decode('utf-8')\
#                                    .str.replace(" ", "_", regex=True)\
#                                    .str.replace("\W", "", regex=True)

#     # Counting oscar nominations and wins per movie
#     oscar_noms = pd.merge(df_preprocessed, oscars, left_on = "primaryTitleFormatted", right_on = "film").groupby("tconst")["winner"].count()
#     oscar_wins = pd.merge(df_preprocessed, oscars, left_on = "primaryTitleFormatted", right_on = "film").groupby("tconst")["winner"].sum()


#     # Find writers and directors per movie and combine the two
#     written_and_directed = (writer_director_to_one_hot("writers") + writer_director_to_one_hot("directors")).fillna(0).astype(int).loc[df_preprocessed['tconst']]


#     df_box_office_mojo = load_and_aggregate_box_office()

#     # process the 'release group' (read movie title) in the same way as the formatted title
#     df_box_office_mojo["Release Group"] = df_box_office_mojo["Release Group"].str.lower()\
#                                            .str.normalize('NFKD')\
#                                            .str.encode('ascii', errors='ignore')\
#                                            .str.decode('utf-8')\
#                                            .str.replace(" ", "_", regex=True)\
#                                            .str.replace("\W", "", regex=True)
#     df_box_office_mojo.drop(['%', '%.1'], axis=1, inplace=True)


#     df_incl_exog = df_preprocessed.copy(deep=True)
#     df_incl_exog = df_incl_exog.rename({"tconst" : "id"}, axis = 1).set_index("id")
#     # df_incl_exog.info()


#     df_incl_exog["oscar_noms"] = oscar_noms
#     df_incl_exog["oscar_wins"] = oscar_wins

#     df_incl_exog = df_incl_exog.reset_index().merge(df_box_office_mojo, left_on=['primaryTitleFormatted', 'Year'], right_on=['Release Group', 'year'], how="left").set_index('id')
#     df_incl_exog.drop(['Release Group', 'year'], axis=1, inplace=True)

#     df_incl_exog.loc[df_incl_exog['Worldwide'] == '-', 'Worldwide'] = np.nan
#     df_incl_exog.loc[df_incl_exog['Domestic'] == '-', 'Domestic'] = np.nan
#     df_incl_exog.loc[df_incl_exog['Foreign'] == '-', 'Foreign'] = np.nan
#     df_incl_exog.loc[df_incl_exog['Worldwide'].notnull(), 'Worldwide'] = df_incl_exog.loc[df_incl_exog['Worldwide'].notnull(), 'Worldwide'].apply(lambda x: float(x.replace('$', '').replace(',', '')))
#     df_incl_exog.loc[df_incl_exog['Domestic'].notnull(), 'Domestic'] = df_incl_exog.loc[df_incl_exog['Domestic'].notnull(), 'Domestic'].apply(lambda x: float(x.replace('$', '').replace(',', '')))
#     df_incl_exog.loc[df_incl_exog['Foreign'].notnull(), 'Foreign'] = df_incl_exog.loc[df_incl_exog['Foreign'].notnull(), 'Foreign'].apply(lambda x: float(x.replace('$', '').replace(',', '')))


#     df_incl_exog = create_remake_column(df_incl_exog)

#     # # add the language of the original title, currently commented for training data usage and not wait 15 min every time
#     # df_incl_exog = add_language_of_original_title(df_incl_exog)

#     df_added_lang = pd.read_csv('additional_data/df_added_lang.csv', index_col=0)
#     df_added_lang = df_added_lang.rename({"tconst" : "id"}, axis = 1).set_index("id")
#     df_incl_exog = df_incl_exog.join(df_added_lang['title_language'], how='left')

#     df_incl_exog = add_english_title_or_not(df_incl_exog)
#     df_incl_exog = add_movie_genre(df_incl_exog)
#     df_incl_exog = pd.concat([df_incl_exog.T, written_and_directed.T]).T
#     df_incl_exog.to_csv(f"{filename}_df_with_features.csv")
    
#     return df_incl_exog

# Evaluating classifier

In [None]:
# Hyper-parameters shared by the hold-out estimate and the final model.
LGBM_PARAMS = dict(
    objective='binary',
    learning_rate=0.01,
    num_iterations=1000,
    feature_fraction=0.8,
    verbosity=1,
    random_state=17,
)

X_full = train_df_prepped.drop(columns='label')
y_full = train_df_prepped['label']

# This section previously only fit the model: `eval_metric` has no effect without an
# `eval_set`, so nothing was ever evaluated. First estimate accuracy on a stratified
# 20 % hold-out...
X_fit, X_holdout, y_fit, y_holdout = train_test_split(
    X_full, y_full, test_size=0.2, stratify=y_full, random_state=17
)
holdout_model = lgb.LGBMClassifier(**LGBM_PARAMS)
holdout_model.fit(X_fit, y_fit)
holdout_accuracy = accuracy_score(y_holdout, holdout_model.predict(X_holdout))
print(f"Hold-out accuracy: {holdout_accuracy:.4f}")

# ...then refit on every training row for the validation/test predictions below.
model_lgbm = lgb.LGBMClassifier(**LGBM_PARAMS)
model_lgbm.fit(X_full, y_full)

# Predicting

## Add features to and process the validation and test data

In [None]:
# Enrich the hidden validation split via py_files.df_processor_enrichment.
VALID_CSV = 'validation_hidden.csv'
valid_df = df_processor_enrichment(VALID_CSV)
valid_df.head()

In [None]:
VALID_PREP_PREFIX = 'valid'  # second argument of df_model_prep (its cache-file prefix in the reference copy above)
valid_df_prepped = df_model_prep(valid_df, VALID_PREP_PREFIX)
valid_df_prepped.head()

In [None]:
# Enrich the hidden test split via py_files.df_processor_enrichment.
TEST_CSV = 'test_hidden.csv'
test_df = df_processor_enrichment(TEST_CSV)
test_df.head()

In [None]:
TEST_PREP_PREFIX = 'test'  # second argument of df_model_prep (its cache-file prefix in the reference copy above)
test_df_prepped = df_model_prep(test_df, TEST_PREP_PREFIX)
test_df_prepped.head()

In [None]:
# The model was fit on train_df_prepped without 'label'. Present valid/test with exactly
# those columns in the same order: columns missing from a split (e.g. the TMDB columns
# that the training cells merge in, if py_files.df_processor_enrichment matches the
# reference copy above) are filled with NaN, which LightGBM treats as missing; extra
# columns are dropped.
# NOTE(review): if d2v_embed trains a fresh Doc2Vec model per call (as the reference copy
# does), valid/test embedding dimensions are not aligned with the training ones.
feature_columns = train_df_prepped.columns.drop('label')


def _align_to_training_features(prepped, split_name):
    """Reindex `prepped` to the training feature columns, reporting any that are missing."""
    missing = feature_columns.difference(prepped.columns)
    if len(missing):
        preview = list(missing[:10])
        suffix = " ..." if len(missing) > 10 else ""
        print(f"{split_name}: {len(missing)} training feature(s) missing, filled with NaN: {preview}{suffix}")
    return prepped.reindex(columns=feature_columns)


val_preds_lgbm = model_lgbm.predict(_align_to_training_features(valid_df_prepped, 'valid'))
test_preds_lgbm = model_lgbm.predict(_align_to_training_features(test_df_prepped, 'test'))

In [None]:
# One prediction per line, in validation-row order.
with open('val_preds_lgbm.txt', 'w+') as pred_file:
    pred_file.writelines(str(pred) + "\n" for pred in val_preds_lgbm)

In [None]:
# One prediction per line, in test-row order.
with open('test_preds_lgbm.txt', 'w+') as pred_file:
    pred_file.writelines(str(pred) + "\n" for pred in test_preds_lgbm)