In [None]:
from secrets import OMDB_API_KEY, HADOOP_USER_NAME, SPARK_URI, HADOOP_NAMENODE

In [None]:
import os
# Export the Hadoop user name into this process's environment before the
# HDFS client and the Spark session are created in the cells below.
# Presumably the Hadoop client libraries read it to pick the acting user -- confirm.
os.environ['HADOOP_USER_NAME'] = HADOOP_USER_NAME

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
from hdfs import InsecureClient
import omdb
from omdb import OMDBClient
import pyspark.sql.types as t

In [None]:
# HDFS client that talks to the NameNode over HTTP on port 50070, acting as HADOOP_USER_NAME.
client_hdfs = InsecureClient(f'http://{HADOOP_NAMENODE}:50070', user=HADOOP_USER_NAME)

In [None]:
# get preprocessed opusdata filename
# NOTE: despite its ".csv" suffix, hdfs_path is listed as a directory below
# (presumably a Spark output directory holding part files -- confirm).
hdfs_path = "/processed/opusdata_00.csv"

# Keep the first ".csv" entry of the listing; raises IndexError when there is none.
filename = [f for f in client_hdfs.list(hdfs_path) if f.endswith('.csv')][0]

In [None]:
# Create (or reuse) the session through the builder so this cell can be re-run:
# constructing SparkContext(SPARK_URI) directly raises once a context already
# exists, and a context created up front is built without the options below.
sparkSession = (
    SparkSession.builder.master(SPARK_URI)
    .appName("example-pyspark-read-and-write")
    # Have the HDFS client reach datanodes by hostname.
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
    .getOrCreate()
)
# Keep the `sc` name available for code that needs the underlying context.
sc = sparkSession.sparkContext

In [None]:
# Read from hdfs
# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to infer the column types from the data.
opusdata = sparkSession.read.csv(
    f"hdfs://{HADOOP_NAMENODE}:8020{hdfs_path}/{filename}", header=True, inferSchema=True
)


# sc = spark.sparkContext

In [None]:
# Preview the first rows of the loaded data.
opusdata.show()

In [None]:
# Number of rows loaded.
opusdata.count()

In [None]:
# Register the API key as the omdb module-level default and also build an explicit
# client with the same key; in the code visible here only `client` is used afterwards.
omdb.set_default('apikey', OMDB_API_KEY)
client = OMDBClient(apikey=OMDB_API_KEY)

In [None]:
# Top-level keys of an OMDb response that are copied unchanged into the result.
requested_flat_fields = ['runtime', 'director', 'actors', 'country', 'awards', 'imdb_votes', 'imdb_id']
# Nested response keys, each mapped to the source labels to extract from its list of entries.
requested_nested_fields = {'ratings': ['Internet Movie Database', 'Rotten Tomatoes', 'Metacritic']}

In [None]:
def format_source(source):
    """Convert a source label into a lowercase, underscore-separated token.

    Runs of whitespace (including leading/trailing) collapse into single
    underscores, e.g. 'Rotten Tomatoes' -> 'rotten_tomatoes'.
    """
    words = source.split()
    joined = '_'.join(words)
    return joined.lower()

In [None]:
def construct_schema(requested_flat_fields, requested_nested_fields):
    """Build the struct type describing one lookup result.

    Args:
        requested_flat_fields: iterable of field names, emitted first and in order.
        requested_nested_fields: mapping of a nested field name to its source labels;
            each pair becomes a field named ``<nested>_<format_source(label)>``.

    Returns:
        A StructType whose fields are all nullable strings.
    """
    column_names = list(requested_flat_fields)
    for parent, labels in requested_nested_fields.items():
        column_names.extend(f'{parent}_{format_source(label)}' for label in labels)

    return t.StructType(
        [t.StructField(name, t.StringType(), True) for name in column_names]
    )

In [None]:
# Struct type used as the return type of the OMDb lookup UDF.
schema = construct_schema(requested_flat_fields, requested_nested_fields)

In [None]:
@F.udf(returnType=schema)
def omdb_data(arguments):
    """Look up one movie on OMDb and return the requested fields as a struct.

    Args:
        arguments: two-element sequence ``(movie_name, year)``.

    Returns:
        A Row whose fields follow the order used to build ``schema``: every
        entry of ``requested_flat_fields`` first, then one
        ``<nested_field>_<source>`` entry per requested source. Anything the
        response does not provide is None.
    """
    movie_name, year = arguments
    result = client.get(title=movie_name, year=year, fullplot=True, tomatoes=True)

    # Flat fields are copied straight from the response, in the requested order.
    result_to_keep = {key: result.get(key, None) for key in requested_flat_fields}

    for nested_field, requested_sources in requested_nested_fields.items():
        # Pre-fill every requested source with None, in the requested order.
        # The Row built below is matched to the struct by position, so the key
        # order must not depend on which sources the response happens to
        # contain or on the order in which it lists them.
        for requested_source in requested_sources:
            result_to_keep[f'{nested_field}_{format_source(requested_source)}'] = None

        # Overwrite the placeholders with the values actually present.
        for nested_dict in result.get(nested_field, None) or []:
            source = nested_dict.get('source', None)
            if source and source in requested_sources:
                key = f'{nested_field}_{format_source(source)}'
                result_to_keep[key] = nested_dict.get('value', None)

    return t.Row(*list(result_to_keep.keys()))(*list(result_to_keep.values()))
    

In [None]:
# Call the lookup UDF for each row with (movie_name, production_year) packed into one array column.
# NOTE(review): explode(array(x)) yields exactly one output row per input row; presumably the
# wrapper is there to keep Spark from re-running the UDF for each struct field when the struct
# is expanded later -- confirm.
opusdata_omdb = opusdata.withColumn(
    "omdb_data", F.explode(F.array(omdb_data(F.array("movie_name", "production_year"))))
)

In [None]:
# Column names of the original opusdata frame, kept so they can be selected ahead of the OMDb fields.
opusdata_fields_name = [field.name for field in opusdata.schema.fields]

In [None]:
# Expand the omdb_data struct into top-level columns next to the original ones.
# (The "ombd" spelling differs from "omdb" above; later cells use this spelling.)
opusdata_ombd = opusdata_omdb.select(*opusdata_fields_name, 'omdb_data.*')

In [None]:
# Drop every row that is missing the IMDb id or any of the three ratings.
opusdata_ombd_id_not_null = opusdata_ombd.na.drop(
    subset=[
        "imdb_id",
        "ratings_internet_movie_database",
        "ratings_rotten_tomatoes",
        "ratings_metacritic",
    ]
)

In [None]:
# Keep a single row per IMDb id; which of the duplicates survives is not controlled here.
opusdata_ombd_no_id_duplicated = opusdata_ombd_id_not_null.dropDuplicates(['imdb_id'])

### Processing awards

In [None]:
@F.udf(returnType=t.IntegerType())
def general_awards_by_keyword(awards_str, keyword):
    """Return the integer written immediately before ``keyword`` in an awards summary.

    Args:
        awards_str: free-text awards summary; may be None or empty.
        keyword: case-sensitive marker to look for (e.g. 'win').

    Returns:
        The integer token directly preceding the first occurrence of
        ``keyword``, or 0 when the text is missing, the keyword does not
        occur, nothing precedes it, or the preceding token is not an integer.
    """
    # Without this guard a null column value would raise inside the UDF, and a
    # missing keyword would make the code below parse the tail of the whole text.
    if not awards_str or keyword not in awards_str:
        return 0

    tokens_before = awards_str.split(keyword)[0].split()
    if not tokens_before:
        return 0

    try:
        return int(tokens_before[-1])
    except ValueError:
        return 0

In [None]:
# Markers whose preceding number is extracted from the awards text as an overall count.
keywords_general = ['nomination', 'win']

In [None]:
# Lowercase award names searched for in the lowercased awards text.
awards_name = ['golden globe', 'oscar', 'bafta']

In [None]:
@F.udf(returnType=t.IntegerType())
def won_by_keyword(awards_str, award_name):
    """Return how many awards named ``award_name`` the text reports as won.

    The text is lowercased and the two tokens directly before the first
    occurrence of ``award_name`` are inspected; a count is returned only for
    the shape ``won <count> <award_name>``.

    Args:
        awards_str: free-text awards summary; may be None or empty.
        award_name: lowercase award name to look for.

    Returns:
        The parsed count, or 0 when the text is missing, the award is not
        mentioned, the preceding words are not ``won <count>``, or the count
        is not an integer.
    """
    if not awards_str:
        return 0

    lowered = awards_str.lower()
    # If the award is not mentioned, the split below would return the whole
    # text and the check would look at unrelated trailing words.
    if award_name not in lowered:
        return 0

    tokens_before = lowered.split(award_name)[0].split()
    if len(tokens_before) < 2 or tokens_before[-2] != "won":
        return 0

    try:
        return int(tokens_before[-1])
    except ValueError:
        return 0

In [None]:
@F.udf(returnType=t.IntegerType())
def nominated_by_keyword(awards_str, award_name):
    """Return how many nominations for ``award_name`` the text reports.

    The text is lowercased and the two tokens directly before the first
    occurrence of ``award_name`` are inspected; a count is returned only for
    the shape ``for <count> <award_name>`` (as in "nominated for 2 ...").

    Args:
        awards_str: free-text awards summary; may be None or empty.
        award_name: lowercase award name to look for.

    Returns:
        The parsed count, or 0 when the text is missing, the award is not
        mentioned, the preceding words are not ``for <count>``, or the count
        is not an integer.
    """
    if not awards_str:
        return 0

    lowered = awards_str.lower()
    # If the award is not mentioned, the split below would return the whole
    # text and the check would look at unrelated trailing words.
    if award_name not in lowered:
        return 0

    tokens_before = lowered.split(award_name)[0].split()
    if len(tokens_before) < 2 or tokens_before[-2] != "for":
        return 0

    try:
        return int(tokens_before[-1])
    except ValueError:
        return 0

In [None]:
# Add one integer column per general keyword ("nominations", "wins"), each holding the
# number found directly before that keyword in the awards text.
opusdata_awards_categorized = opusdata_ombd_no_id_duplicated
for general_keyword in keywords_general:
    opusdata_awards_categorized = opusdata_awards_categorized.withColumn(
        f'{general_keyword}s', general_awards_by_keyword("awards", F.lit(general_keyword))
    )

In [None]:
# For each named award add a "won_<award>s" and a "nominated_<award>s" count column;
# spaces in the award name become underscores in the column name.
for award_name in awards_name:
    award_name_formatted = '_'.join(award_name.split())
    opusdata_awards_categorized = opusdata_awards_categorized.withColumn(
        f'won_{award_name_formatted}s', won_by_keyword("awards", F.lit(award_name))
    )
    opusdata_awards_categorized = opusdata_awards_categorized.withColumn(
        f'nominated_{award_name_formatted}s', nominated_by_keyword("awards", F.lit(award_name))
    )

In [None]:
# The raw awards text is no longer needed once the count columns exist.
opusdata_awards_categorized = opusdata_awards_categorized.drop('awards')

### Scale ratings to [0, 1]

In [None]:
# scale imdb ratings
opusdata_scaled_ratings = opusdata_awards_categorized.withColumn(
    "ratings_internet_movie_database", F.split(F.col("ratings_internet_movie_database"), "/").cast("array<float>") \
   
)
opusdata_scaled_ratings = opusdata_scaled_ratings.withColumn(
    "ratings_internet_movie_database", F.col("ratings_internet_movie_database")[0] / 10
)

In [None]:
# scale rotten tomatoes ratings
opusdata_scaled_ratings = opusdata_scaled_ratings.withColumn(
    "ratings_rotten_tomatoes", F.split(F.col("ratings_rotten_tomatoes"), "%").cast("array<int>") \
   
)
opusdata_scaled_ratings = opusdata_scaled_ratings.withColumn(
    "ratings_rotten_tomatoes", F.col("ratings_rotten_tomatoes")[0] / 100
)

In [None]:
# scale metacritic ratings
opusdata_scaled_ratings = opusdata_scaled_ratings.withColumn(
    "ratings_metacritic", F.split(F.col("ratings_metacritic"), "/").cast("array<int>") \
   
)
opusdata_scaled_ratings = opusdata_scaled_ratings.withColumn(
    "ratings_metacritic", F.col("ratings_metacritic")[0] / 100
)

In [None]:
# remove comma from imdb_votes
opusdata_votes = opusdata_scaled_ratings.withColumn(
    "imdb_votes", F.regexp_replace("imdb_votes", ",", "")
)

### Encode actors

In [None]:
unique_actors = set()

# Only the actors column is needed on the driver, so project it before
# collecting instead of pulling every full row.
for row in opusdata_votes.select('actors').collect():
    actors = row['actors']
    if actors is None:
        # Rows without actor information contribute no names.
        continue
    # Same normalisation (strip + lowercase) that the encoding UDF applies to its input.
    unique_actors.update(a.strip().lower() for a in actors.split(','))

In [None]:
# Assign ids in sorted-name order. Iterating the set directly can yield a different
# order on each interpreter run (string hashing is randomised), which would make the
# ids written to the output irreproducible.
actors_id_dict = {actor: i for i, actor in enumerate(sorted(unique_actors))}

In [None]:
# Struct with four nullable integer fields: actor_id_0 .. actor_id_3.
schema_actors = t.StructType(
    [t.StructField(f'actor_id_{slot}', t.IntegerType(), True) for slot in range(4)]
)

In [None]:
@F.udf(returnType=schema_actors)
def encode_authors(actors_str):
    """Encode a comma-separated actor list as up to four integer ids.

    Despite its name, this function encodes actors: each name is stripped,
    lowercased and looked up in ``actors_id_dict``.

    Args:
        actors_str: comma-separated actor names; may be None or empty.

    Returns:
        A Row with fields actor_id_0..actor_id_3 holding the ids in ascending
        order, padded with None when fewer than four names are given.

    Raises:
        KeyError: if a name is not present in ``actors_id_dict``.
    """
    n_slots = 4  # number of actor_id_* fields in the returned struct

    names = [a.strip().lower() for a in actors_str.split(',')] if actors_str else []

    # Keep at most n_slots names (the first ones listed): the Row below has
    # exactly n_slots fields and raises when handed more values than that.
    ids = sorted(actors_id_dict[name] for name in names[:n_slots])
    ids += (n_slots - len(ids)) * [None]

    return t.Row('actor_id_0', 'actor_id_1', 'actor_id_2', 'actor_id_3')(*ids)

In [None]:
# Attach the encoded ids as a struct column; explode(array(x)) yields one output row per input row.
opusdata_actors = opusdata_votes.withColumn(
    "actors_ids", F.explode(F.array(encode_authors("actors")))
)

# Every column except the raw `actors` string and the temporary struct.
# NOTE: this rebinds opusdata_fields_name, which an earlier cell set to the original opusdata columns.
opusdata_fields_name = [
    field.name
    for field in opusdata_actors.schema.fields
    if field.name != "actors_ids" and field.name != "actors"
]
# Expand the struct into the actor_id_0..actor_id_3 columns.
opusdata_actors = opusdata_actors.select(*opusdata_fields_name, "actors_ids.*")

### Runtime - remove "min"

In [None]:
# Keep only the text before the first space of `runtime`.
opusdata_runtime = opusdata_actors.withColumn(
    "runtime",
    F.split(F.col("runtime"), " ").cast("array<string>").getItem(0),
)

### Keep only first country

In [None]:
# Keep only the text before the first comma of `country`.
opusdata_first_country = opusdata_runtime.withColumn(
    "country",
    F.split(F.col("country"), ",").cast("array<string>").getItem(0),
)

### Keep only first director

In [None]:
# Keep only the text before the first comma of `director`.
opusdata_first_director = opusdata_first_country.withColumn(
    "director",
    F.split(F.col("director"), ",").cast("array<string>").getItem(0),
)

### Get "success" [1]
[1] _Rhee, Travis Ginmu, and Farhana Zulkernine. "Predicting movie box office profitability: A neural network approach." 2016 15th IEEE International Conference on Machine Learning and Applications (ICMLA). IEEE, 2016._

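$\text{Profit} = \tfrac{1}{2} \cdot \text{total\_box\_office} - \text{production\_budget}$

A movie is labelled a success (1) when Profit > 0, and 0 otherwise.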

In [None]:
@F.udf(returnType=t.IntegerType())
def success(arguments):
    """Label a movie as profitable (1) or not (0).

    Profit is computed as half of the total box office minus the production
    budget; the label is 1 when that profit is strictly positive.

    Args:
        arguments: two-element sequence ``(total_box_office, production_budget)``.

    Returns:
        1 or 0, or None when either input is missing so that an unknown
        value is not reported as a failure (and does not raise in the UDF).
    """
    total_box_office, production_budget = arguments
    if total_box_office is None or production_budget is None:
        return None

    profit = (0.5 * total_box_office) - production_budget
    return 1 if profit > 0 else 0

In [None]:
# Add the binary `success` label computed from the box office and budget columns.
opusdata_success = opusdata_first_director.withColumn(
    "success", success(F.array("total_box_office", "production_budget")))

In [None]:
#opusdata_success.write.csv('opusdata_omdb2.csv', header=True, sep=',')

In [None]:
# Write the result to HDFS as CSV with a header row, from a single partition,
# replacing whatever already exists at this path.
opusdata_success.repartition(1).write.mode("overwrite").option('header',True).csv(
    f"hdfs://{HADOOP_NAMENODE}:8020/processed/opusdata_omdb_00.csv"
)