In [1]:
!pip install pyspark[sql] tqdm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark[sql]
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 62.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=ffd72ba2fe171997bebe392b6c7a199081d78228ad3cfd38ac1e0e7325955667
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
import pandas as pd
import numpy as np

from tqdm import tqdm

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("mocommender") \
    .getOrCreate()

In [4]:
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [5]:
DRIVE_PATH='/content/drive/MyDrive/data'

metadata = spark.read.option("header",True).option("mode",'DROPMALFORMED').csv(DRIVE_PATH+'/movies_metadata.csv')
links = spark.read.option("header",True).option("mode",'DROPMALFORMED').csv(DRIVE_PATH+'/links.csv')
keywords = spark.read.option("header",True).option("mode",'DROPMALFORMED').csv(DRIVE_PATH+'/keywords.csv')
credits = spark.read.option("header",True).option("mode",'DROPMALFORMED').csv(DRIVE_PATH+'/credits.csv')

In [6]:
# we won't use these columns.
metadata = metadata.drop('adult', 'belongs_to_collection', 'budget', 'homepage', 'poster_path', 'revenue', 'status', 'tagline', 'video')

In [7]:
metadata.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------+-----+---------+-----------------+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------

In [None]:
metadata.count()

45572

In [None]:
# columns count and dataframe count doesn't fit, so we should drop NaN values..
metadata.describe().show()

+-------+--------------------+--------------------+--------------------+------------------+-----------------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+------------------+--------------------+
|summary|              genres|                  id|             imdb_id| original_language|                     original_title|            overview|          popularity|production_companies|production_countries|        release_date|             runtime|    spoken_languages|          title|      vote_average|          vote_count|
+-------+--------------------+--------------------+--------------------+------------------+-----------------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+------------------+--------------------+
|  coun

In [8]:
metadata = metadata.dropna()
metadata.count()

43149

In [None]:
# now, columns count and dataframe count fit each other.
metadata.describe().show(truncate=False)

+-------+------------------------------------------------------+-------------------------------------------------------------+-------------------------------------+-----------------+-----------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------------+----------------------------------------+----------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+--------------------------------------+
|summary|genres                                                |

In [9]:
# some values are corrupted. If we look at min id value, it must be integer, not string.
# therefore we should drop these values, because there are corrupted.

# drop non-int id 
metadata = metadata.withColumn('id',
                               F.col('id').cast('integer')) \
                   .filter(F.col('id').isNotNull()) 


# drop non-int vote-count
metadata = metadata.withColumn('vote_count',
                               F.col('vote_count').cast('integer')) \
                   .filter(F.col('vote_count').isNotNull())                


# drop non-double vote-average
metadata = metadata.withColumn('vote_average',
                               F.col('vote_average').cast('double')) \
                   .filter(F.col('vote_average').isNotNull())                

                   
# drop non-double runtime
metadata = metadata.withColumn('runtime',
                               F.col('runtime').cast('double')) \
                   .filter(F.col('runtime').isNotNull())                

# drop non-double runtime
metadata = metadata.withColumn('popularity',
                               F.col('popularity').cast('double')) \
                   .filter(F.col('popularity').isNotNull())                

# change release_date format
metadata = metadata.withColumn('release_date',
                    F.date_format(F.col('release_date'), 'MM-dd-yyyy')) \
                    .filter(F.col('release_date').isNotNull())

metadata.count()

40534

In [None]:
# the ids are duplicated. so we must drop these duplicated record.
metadata.groupBy('id').count().orderBy('id').where(F.col('count') > 1).show()

+------+-----+
|    id|count|
+------+-----+
|  4912|    2|
|  5511|    2|
| 11115|    2|
| 12600|    2|
| 13209|    2|
| 14788|    2|
| 15028|    2|
| 18440|    2|
| 22649|    2|
| 23305|    2|
| 25541|    2|
| 42495|    2|
| 69234|    2|
| 77221|    2|
| 84198|    2|
| 97995|    2|
| 99080|    2|
|105045|    2|
|109962|    2|
|110428|    2|
+------+-----+
only showing top 20 rows



In [10]:
metadata = metadata.dropDuplicates(['id'])
metadata.count()

40506

In [None]:
# there is no duplicated value now
metadata.groupBy('id').count().where(F.col('count') > 1).orderBy('id').show()

+---+-----+
| id|count|
+---+-----+
+---+-----+



In [11]:
# the title could contain non-ASCII value.
metadata = metadata.withColumn('original_title',
                    F.decode(F.encode(F.col('original_title'), 'ascii'), 'ascii')) \
                    .withColumn('title',
                    F.decode(F.encode(F.col('title'), 'ascii'), 'ascii'))
metadata.count()

40506

In [None]:
# original title sometimes contains non-ascii value, so we must drop these records.
metadata.select(['original_title', 'title']).where(F.col('original_title') != F.col('title')).show()

+--------------------+--------------------+
|      original_title|               title|
+--------------------+--------------------+
|Varjoja paratiisissa| Shadows in Paradise|
|  LaLehet Al HaMayim|       Walk on Water|
|              ??????|       Magnetic Rose|
|      Hable con ella|         Talk to Her|
|                  ??|                Hero|
|            ????????|Nausica? of the V...|
|   Elementarteilchen|The Elementary Pa...|
|          Las Hurdes|  Land Without Bread|
| Todo sobre mi madre| All About My Mother|
|Elsker dig for evigt|         Open Hearts|
|          Lola rennt|        Run Lola Run|
|Trois couleurs : ...|  Three Colors: Blue|
|Trois couleurs : ...| Three Colors: White|
|Trois couleurs : ...|   Three Colors: Red|
|Italiensk for beg...|Italian for Begin...|
|    ? ?? ?? ?? ??? ?|Spring, Summer, F...|
|           Bez Konca|              No End|
|           Przypadek|        Blind Chance|
|               ?????|   Princess Mononoke|
|            ????????|       Spi

In [12]:
metadata = metadata.drop(F.col('original_title'))
metadata.count()

40506

In [None]:
metadata.describe().show()

+-------+--------------------+------------------+---------+-----------------+--------------------+------------------+------------------------+--------------------+------------+-----------------+--------------------+--------------------+------------------+------------------+
|summary|              genres|                id|  imdb_id|original_language|            overview|        popularity|    production_companies|production_countries|release_date|          runtime|    spoken_languages|               title|      vote_average|        vote_count|
+-------+--------------------+------------------+---------+-----------------+--------------------+------------------+------------------------+--------------------+------------+-----------------+--------------------+--------------------+------------------+------------------+
|  count|               40506|             40506|    40506|            40506|               40506|             40506|                   40506|               40506|       40506

In [None]:
metadata.where(F.col('id') == 2).show()

+--------------------+---+---------+-----------------+--------------------+----------+--------------------+--------------------+------------+-------+--------------------+-----+------------+----------+
|              genres| id|  imdb_id|original_language|            overview|popularity|production_companies|production_countries|release_date|runtime|    spoken_languages|title|vote_average|vote_count|
+--------------------+---+---------+-----------------+--------------------+----------+--------------------+--------------------+------------+-------+--------------------+-----+------------+----------+
|[{'id': 18, 'name...|  2|tt0094675|               fi|Taisto Kasurinen ...|  3.860491|[{'name': 'Villea...|[{'iso_3166_1': '...|  10-21-1988|   69.0|[{'iso_639_1': 'f...|Ariel|         7.1|        44|
+--------------------+---+---------+-----------------+--------------------+----------+--------------------+--------------------+------------+-------+--------------------+-----+------------+-------

In [13]:
# the movies don't have any genres, we must drop these records.
#metadata = metadata.where(F.col('genres') == '[]').drop()
#metadata.where(F.col('genres') == '{}').show()

@F.udf(returnType=T.BooleanType())
def is_valid_json(x):
  import ast

  try:
    return len(ast.literal_eval(x)) > 0
  except:
    return False

  return False

# 2645 record is not valid json format or empty,
# so we should drop these values from dataframe.
metadata.where(~is_valid_json(F.col('genres'))).count()
metadata = metadata.where(is_valid_json(F.col('genres')))

metadata.count()

38582

In [14]:
# drop companies if it's not valid
#metadata.where(~is_valid_json(F.col('production_companies'))).count()
metadata = metadata.where(is_valid_json(F.col('production_companies')))
metadata.count()

29996

In [15]:
# drop countries if it's not valid
#metadata.where(~is_valid_json(F.col('production_countries'))).count()
metadata = metadata.where(is_valid_json(F.col('production_countries')))
metadata.count()

29567

In [16]:
# drop languages if it's not valid
#metadata.where(~is_valid_json(F.col('spoken_languages'))).count()
metadata = metadata.where(is_valid_json(F.col('spoken_languages')))
metadata.count()

29001

In [None]:
metadata.show()

+--------------------+---+---------+-----------------+--------------------+----------+--------------------+--------------------+------------+-------+--------------------+--------------------+------------+----------+
|              genres| id|  imdb_id|original_language|            overview|popularity|production_companies|production_countries|release_date|runtime|    spoken_languages|               title|vote_average|vote_count|
+--------------------+---+---------+-----------------+--------------------+----------+--------------------+--------------------+------------+-------+--------------------+--------------------+------------+----------+
|[{'id': 18, 'name...|  3|tt0092149|               fi|An episode in the...|   2.29211|[{'name': 'Villea...|[{'iso_3166_1': '...|  10-16-1986|   76.0|[{'iso_639_1': 'e...| Shadows in Paradise|         7.1|        35|
|[{'id': 80, 'name...|  5|tt0113101|               en|It's Ted the Bell...|  9.026586|[{'name': 'Mirama...|[{'iso_3166_1': '...|  12-09-

In [17]:
movie_genre_df = metadata.select(['id', 'genres'])
movie_country_df = metadata.select(['id', 'production_countries'])
movie_spoken_languages_df = metadata.select(['id', 'spoken_languages'])
movie_company_df = metadata.select(['id', 'production_companies'])

metadata = metadata \
  .drop(F.col('genres')) \
  .drop(F.col('production_countries')) \
  .drop(F.col('spoken_languages')) \
  .drop(F.col('production_companies'))

In [None]:
metadata.show()

+---+---------+-----------------+--------------------+----------+------------+-------+--------------------+------------+----------+
| id|  imdb_id|original_language|            overview|popularity|release_date|runtime|               title|vote_average|vote_count|
+---+---------+-----------------+--------------------+----------+------------+-------+--------------------+------------+----------+
|  3|tt0092149|               fi|An episode in the...|   2.29211|  10-16-1986|   76.0| Shadows in Paradise|         7.1|        35|
|  5|tt0113101|               en|It's Ted the Bell...|  9.026586|  12-09-1995|   98.0|          Four Rooms|         6.5|       539|
|  6|tt0107286|               en|While racing to a...|  5.538671|  10-15-1993|  110.0|      Judgment Night|         6.4|        79|
| 12|tt0266543|               en|Nemo, an adventur...| 25.497794|  05-30-2003|  100.0|        Finding Nemo|         7.6|      6292|
| 13|tt0109830|               en|A man with a low ...| 48.307194|  07-06-199

In [None]:
metadata.describe().show()

+-------+------------------+---------+-----------------+--------------------+------------------+------------+------------------+--------------------+------------------+------------------+
|summary|                id|  imdb_id|original_language|            overview|        popularity|release_date|           runtime|               title|      vote_average|        vote_count|
+-------+------------------+---------+-----------------+--------------------+------------------+------------+------------------+--------------------+------------------+------------------+
|  count|             29001|    29001|            29001|               29001|             29001|       29001|             29001|               29001|             29001|             29001|
|   mean| 98598.56622185442|     null|             null|                null|3.8357964918106204|        null|  98.4836729767939|            Infinity| 5.845039826212988|153.70549291403745|
| stddev|111532.78892649393|     null|             null|    

In [18]:
metadata_ids = metadata.select('id').rdd.flatMap(lambda x: x).collect()
len(metadata_ids)

29001

---
## RELATION BETWEEN MOVIE-GENRE

In [None]:
# we should convert genres json str to dataframe
movie_genre_df.show()

+---+--------------------+
| id|              genres|
+---+--------------------+
|  3|[{'id': 18, 'name...|
|  5|[{'id': 80, 'name...|
|  6|[{'id': 28, 'name...|
| 12|[{'id': 16, 'name...|
| 13|[{'id': 35, 'name...|
| 15|[{'id': 9648, 'na...|
| 16|[{'id': 18, 'name...|
| 17|[{'id': 27, 'name...|
| 19|[{'id': 18, 'name...|
| 20|[{'id': 18, 'name...|
| 22|[{'id': 12, 'name...|
| 26|[{'id': 18, 'name...|
| 27|[{'id': 18, 'name...|
| 35|[{'id': 16, 'name...|
| 64|[{'id': 18, 'name...|
| 76|[{'id': 18, 'name...|
| 78|[{'id': 878, 'nam...|
| 81|[{'id': 12, 'name...|
| 85|[{'id': 12, 'name...|
| 86|[{'id': 18, 'name...|
+---+--------------------+
only showing top 20 rows



In [None]:
# GENRE SCHEMA
# -----------------
# id    SERIAL
# name  VARCHAR
genre_schema = T.StructType([
    T.StructField('id', T.IntegerType(), False),
    T.StructField('name', T.StringType(), False),
])

movie_genre_rdd = movie_genre_df.select('genres').rdd.map(lambda x: x[0])


# convert json string to dataframe
genre_df = spark.read.json(movie_genre_rdd, genre_schema) \
                 .distinct() \
                 .orderBy('id')
genre_df.show()

+-----+---------------+
|   id|           name|
+-----+---------------+
|   12|      Adventure|
|   14|        Fantasy|
|   16|      Animation|
|   18|          Drama|
|   27|         Horror|
|   28|         Action|
|   35|         Comedy|
|   36|        History|
|   37|        Western|
|   53|       Thriller|
|   80|          Crime|
|   99|    Documentary|
|  878|Science Fiction|
| 9648|        Mystery|
|10402|          Music|
|10749|        Romance|
|10751|         Family|
|10752|            War|
|10769|        Foreign|
|10770|       TV Movie|
+-----+---------------+



In [None]:
schema = T.ArrayType(
    T.StructType(genre_schema)
)

# RELATION BETWEEN MOVIE-GENRE
R_movie_genre_df = movie_genre_df \
                    .select(['id', 'genres']) \
                    .withColumn('genres',
                                F.from_json(F.col('genres'),
                                            schema).getField('id')) \
                    .orderBy(F.col('id'))

R_movie_genre_df.show()

+---+--------------------+
| id|              genres|
+---+--------------------+
|  2|            [18, 80]|
|  3|            [18, 35]|
|  5|            [80, 35]|
|  6|        [28, 53, 80]|
| 11|       [12, 28, 878]|
| 12|         [16, 10751]|
| 13|     [35, 18, 10749]|
| 14|                [18]|
| 15|          [9648, 18]|
| 16|     [18, 80, 10402]|
| 17|      [27, 53, 9648]|
| 18|[12, 14, 28, 53, ...|
| 19|           [18, 878]|
| 20|         [18, 10749]|
| 21|                [99]|
| 22|        [12, 14, 28]|
| 24|            [28, 80]|
| 25|         [18, 10752]|
| 26|                [18]|
| 27|  [18, 10402, 10749]|
+---+--------------------+
only showing top 20 rows



---
## RELATION BETWEEN MOVIE-COMPANY

In [None]:
movie_company_df.show(5, truncate=False)

+---+-------------------------------------------------------------------------------------------------------------------------------------------+
|id |production_companies                                                                                                                       |
+---+-------------------------------------------------------------------------------------------------------------------------------------------+
|3  |[{'name': 'Villealfa Filmproduction Oy', 'id': 2303}]                                                                                      |
|5  |[{'name': 'Miramax Films', 'id': 14}, {'name': 'A Band Apart', 'id': 59}]                                                                  |
|6  |[{'name': 'Universal Pictures', 'id': 33}, {'name': 'Largo Entertainment', 'id': 1644}, {'name': 'JVC Entertainment Networks', 'id': 4248}]|
|12 |[{'name': 'Pixar Animation Studios', 'id': 3}]                                                                         

In [None]:
# COMPANY SCHEMA
# -----------------
# id    SERIAL
# name  VARCHAR
company_schema = T.StructType([
    T.StructField('id', T.IntegerType(), False),
    T.StructField('name', T.StringType(), False),
])

movie_company_rdd = movie_company_df.select('production_companies').rdd.map(lambda x: x[0])


# convert json string to dataframe
company_df = spark.read.json(movie_company_rdd, company_schema) \
                 .distinct() \
                 .orderBy('id') \
                 .dropna()
company_df.show()

+---+--------------------+
| id|                name|
+---+--------------------+
|  1|           Lucasfilm|
|  2|Walt Disney Pictures|
|  3|Pixar Animation S...|
|  4|  Paramount Pictures|
|  5|   Columbia Pictures|
|  6|  RKO Radio Pictures|
|  7|          DreamWorks|
|  8|  Fine Line Features|
|  9|             Gaumont|
| 10|           Highlight|
| 11|       WingNut Films|
| 12|     New Line Cinema|
| 13|   Universal Studios|
| 14|       Miramax Films|
| 15|          Lama Films|
| 16|   United King Films|
| 17|Warner Bros. Ente...|
| 18|        Gracie Films|
| 19|          Film Roman|
| 20| Rough Draft Studios|
+---+--------------------+
only showing top 20 rows



In [None]:
# RELATION BETWEEN MOVIE-COMPANY
schema = T.ArrayType(
    T.StructType(company_schema)
)

R_movie_company_df = movie_company_df \
                    .select(['id', 'production_companies']) \
                    .withColumn('production_companies',
                                F.from_json(F.col('production_companies'),
                                            schema).getField('id')) \
                    .orderBy(F.col('id'))

R_movie_company_df.show()

+---+--------------------+
| id|production_companies|
+---+--------------------+
|  2|        [2303, 2396]|
|  3|              [2303]|
|  5|            [14, 59]|
|  6|    [33, 1644, 4248]|
| 11|            [1, 306]|
| 12|                 [3]|
| 13|                 [4]|
| 14|          [27, 2721]|
| 15|          [6, 11447]|
| 16|[8, 76, 119, 157,...|
| 17|[47, 248, 2268, 2...|
| 18|              [5, 9]|
| 19|          [4, 12372]|
| 20|            [49, 77]|
| 21|             [13723]|
| 22|            [2, 130]|
| 24|     [14, 59, 39121]|
| 25|[33, 1522, 14440,...|
| 26|            [15, 16]|
| 27|               [163]|
+---+--------------------+
only showing top 20 rows



---
## RELATION BETWEEN MOVIE-COUNTRY

In [None]:
movie_country_df.show(5, truncate=False)

+---+-------------------------------------------------------------------------------------------------+
|id |production_countries                                                                             |
+---+-------------------------------------------------------------------------------------------------+
|3  |[{'iso_3166_1': 'FI', 'name': 'Finland'}]                                                        |
|5  |[{'iso_3166_1': 'US', 'name': 'United States of America'}]                                       |
|6  |[{'iso_3166_1': 'JP', 'name': 'Japan'}, {'iso_3166_1': 'US', 'name': 'United States of America'}]|
|12 |[{'iso_3166_1': 'US', 'name': 'United States of America'}]                                       |
|13 |[{'iso_3166_1': 'US', 'name': 'United States of America'}]                                       |
+---+-------------------------------------------------------------------------------------------------+
only showing top 5 rows



In [None]:
# COUNTRY SCHEMA
# -----------------
# id    SERIAL
# name  VARCHAR
country_schema = T.StructType([
    T.StructField('iso_3166_1', T.StringType(), False),
    T.StructField('name', T.StringType(), False),
])

movie_country_rdd = movie_country_df.select('production_countries').rdd.map(lambda x: x[0])


# convert json string to dataframe
country_df = spark.read.json(movie_country_rdd, country_schema) \
                 .distinct() \
                 .withColumnRenamed('iso_3166_1', 'id') \
                 .orderBy('id') \
                 .dropna()
country_df.show()

+---+--------------------+
| id|                name|
+---+--------------------+
| AE|United Arab Emirates|
| AF|         Afghanistan|
| AL|             Albania|
| AM|             Armenia|
| AN|Netherlands Antilles|
| AO|              Angola|
| AQ|          Antarctica|
| AR|           Argentina|
| AT|             Austria|
| AU|           Australia|
| AW|               Aruba|
| AZ|          Azerbaijan|
| BA|Bosnia and Herzeg...|
| BE|             Belgium|
| BF|        Burkina Faso|
| BG|            Bulgaria|
| BM|             Bermuda|
| BN|   Brunei Darussalam|
| BO|             Bolivia|
| BR|              Brazil|
+---+--------------------+
only showing top 20 rows



In [None]:
# RELATION BETWEEN MOVIE-COUNTRY
schema = T.ArrayType(
    T.StructType(country_schema)
)

R_movie_country_df = movie_country_df \
                    .select(['id', 'production_countries']) \
                    .withColumn('production_countries',
                                F.from_json(F.col('production_countries'),
                                            schema).getField('iso_3166_1')) \
                    .orderBy(F.col('id'))

R_movie_country_df.show(truncate=False)

+---+------------------------------------------------+
|id |production_countries                            |
+---+------------------------------------------------+
|2  |[FI]                                            |
|3  |[FI]                                            |
|5  |[US]                                            |
|6  |[JP, US]                                        |
|11 |[US]                                            |
|12 |[US]                                            |
|13 |[US]                                            |
|14 |[US]                                            |
|15 |[US]                                            |
|16 |[AR, DK, FI, FR, DE, IS, IT, NL, NO, SE, GB, US]|
|17 |[DE, GB]                                        |
|18 |[FR]                                            |
|19 |[DE]                                            |
|20 |[CA, ES]                                        |
|21 |[US]                                            |
|22 |[US] 

---
## RELATION BETWEEN MOVIE-LANGUAGE

In [None]:
movie_spoken_languages_df.show(5, truncate=False)

+---+----------------------------------------------------------------------------------------------------------------------+
|id |spoken_languages                                                                                                      |
+---+----------------------------------------------------------------------------------------------------------------------+
|3  |[{'iso_639_1': 'en', 'name': 'English'}, {'iso_639_1': 'fi', 'name': 'suomi'}, {'iso_639_1': 'sv', 'name': 'svenska'}]|
|5  |[{'iso_639_1': 'en', 'name': 'English'}]                                                                              |
|6  |[{'iso_639_1': 'en', 'name': 'English'}]                                                                              |
|12 |[{'iso_639_1': 'en', 'name': 'English'}]                                                                              |
|13 |[{'iso_639_1': 'en', 'name': 'English'}]                                                                              |


In [None]:
# LANGUAGE SCHEMA
# -----------------
# id    SERIAL
# name  VARCHAR
language_schema = T.StructType([
    T.StructField('iso_639_1', T.StringType(), False),
    T.StructField('name', T.StringType(), False),
])

movie_language_rdd = movie_spoken_languages_df.select('spoken_languages').rdd.map(lambda x: x[0])


# convert json string to dataframe
language_df = spark.read.json(movie_language_rdd, language_schema) \
                 .distinct() \
                 .withColumnRenamed('iso_639_1', 'id') \
                 .orderBy('id') \
                 .dropna()
language_df.show()

+---+---------------+
| id|           name|
+---+---------------+
| ab|               |
| af|      Afrikaans|
| am|               |
| ar|        العربية|
| ay|               |
| az|     Azərbaycan|
| be|беларуская мова|
| bg| български език|
| bi|               |
| bm|     Bamanankan|
| bn|          বাংলা|
| bo|               |
| br|               |
| bs|       Bosanski|
| ca|         Català|
| ce|               |
| cn|广州话 / 廣州話|
| cr|               |
| cs|          Český|
| cy|        Cymraeg|
+---+---------------+
only showing top 20 rows



In [None]:
# RELATION BETWEEN MOVIE-LANGUAGE
schema = T.ArrayType(
    T.StructType(language_schema)
)

R_movie_language_df = movie_spoken_languages_df \
                    .select(['id', 'spoken_languages']) \
                    .withColumn('spoken_languages',
                                F.from_json(F.col('spoken_languages'),
                                            schema).getField('iso_639_1')) \
                    .orderBy(F.col('id'))

R_movie_language_df.show(truncate=False)

+---+------------------------+
|id |spoken_languages        |
+---+------------------------+
|2  |[fi, de]                |
|3  |[en, fi, sv]            |
|5  |[en]                    |
|6  |[en]                    |
|11 |[en]                    |
|12 |[en]                    |
|13 |[en]                    |
|14 |[en]                    |
|15 |[en]                    |
|16 |[en]                    |
|17 |[en, cy]                |
|18 |[en, sv, de]            |
|19 |[xx]                    |
|20 |[en]                    |
|21 |[en]                    |
|22 |[en]                    |
|24 |[en, ja, fr]            |
|25 |[en, es, ar, la]        |
|26 |[ar, en, de, he, it, tr]|
|27 |[en]                    |
+---+------------------------+
only showing top 20 rows



---
## RELATION BETWEEN MOVIE-KEYWORD

In [None]:
keywords.show(10, truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id   |keywords                                                                                                            

In [None]:
# KEYWORD SCHEMA
# -----------------
# id    SERIAL
# name  VARCHAR
keyword_schema = T.StructType([
    T.StructField('id', T.IntegerType(), False),
    T.StructField('name', T.StringType(), False),
])

movie_keyword_rdd = keywords.select('keywords').rdd.map(lambda x: x[0])


# convert json string to dataframe
keyword_df = spark.read.json(movie_keyword_rdd, keyword_schema) \
                 .distinct() \
                 .orderBy('id') \
                 .dropna()

keyword_df = keyword_df.where(keyword_df.id.isin(metadata_ids))
keyword_df.show()

+---+----------------+
| id|            name|
+---+----------------+
| 30|      individual|
| 74|         germany|
| 75|      gunslinger|
| 83|saving the world|
| 90|           paris|
|100|            slum|
|107| barcelona spain|
|108|    transvestism|
|110|          venice|
|113|      holy grail|
|128|   love triangle|
|139|     anti terror|
|186|    christianity|
|187|           islam|
|211|     bureaucracy|
|212|  london england|
|213|     upper class|
|220|          berlin|
|222|   schizophrenia|
|233|           japan|
+---+----------------+
only showing top 20 rows



In [None]:
# RELATION BETWEEN MOVIE-KEYWORD
schema = T.ArrayType(
    T.StructType(keyword_schema)
)

R_movie_keyword_df = keywords \
                    .select(['id', 'keywords']) \
                    .withColumn('keywords',
                                F.from_json(F.col('keywords'),
                                            schema).getField('id')) \
                    .orderBy(F.col('id'))
R_movie_keyword_df = R_movie_keyword_df.where(R_movie_keyword_df.id.isin(metadata_ids))
#R_movie_keyword_df.show(truncate=False)
R_movie_keyword_df.count()

29533

---
## RELATION BETWEEN MOVIE-CAST

In [80]:
@F.udf
def change_cast(value):
    if value:
      return value.replace("'profile_path': None", "'profile_path': null")

In [90]:
cast_df = credits.select(['id', 'cast'])

+---+----+
| id|cast|
+---+----+
+---+----+



In [91]:
cast_df.show(truncate=False)

+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
metadata_joined_df = metadata.join(cast_df, ["id"])
metadata_cast_df = metadata_joined_df.select(['id', 'cast'])

In [51]:
# CAST SCHEMA
# -----------------
# id    SERIAL
# name  VARCHAR
cast_schema = T.StructType([
    T.StructField('cast_id', T.IntegerType(), True),
    T.StructField('character', T.StringType(), True),
    T.StructField('credit_id', T.StringType(), False),
    T.StructField('gender', T.IntegerType(), False),
    T.StructField('id', T.IntegerType(), False),
    T.StructField('name', T.StringType(), False),
    T.StructField('order', T.IntegerType(), True),
    T.StructField('profile_path', T.StringType(), True),
])

In [52]:
changed_movie_cast_df = metadata_cast_df.withColumn("cast", change_cast(F.col('cast')))

In [53]:
movie_cast_rdd = changed_movie_cast_df.select('cast').rdd.map(lambda x: x[0])

# convert json string to dataframe
mcast_df = spark.read.json(movie_cast_rdd, cast_schema) \
                 .distinct() \
                 .dropna(subset=['credit_id', 'id', 'gender', 'name'])

In [92]:
mcast_df.show()

+-------+--------------------+--------------------+------+-------+-------------------+-----+--------------------+
|cast_id|           character|           credit_id|gender|     id|               name|order|        profile_path|
+-------+--------------------+--------------------+------+-------+-------------------+-----+--------------------+
|      9|       Emily Crumble|52fe43c5c3a36847f...|     0|  38402|    Katrine Boorman|    5|/cIxijBUAvwg0QQ7n...|
|      2|Deputy Mayor Kevi...|52fe43ee9251416c7...|     2|   3036|        John Cusack|    1|/uKydQYuZ9TnCzvbQ...|
|      8|        John McClane|52fe42ffc3a36847f...|     2|     62|       Bruce Willis|    0|/2B7RySy2WMVJKKEF...|
|     33|    Waitress (voice)|591463b7c3a36842c...|     1|  84323|   Florence Stanley|   11|/vAmeCmnBkihYH2xh...|
|      3|               Rosie|52fe4236c3a36847f...|     1|   4593|               Haji|    1|/oOR9LINqRf2WYKLL...|
|     10|Mrs. Elinor Vigushin|52fe44d2c3a36847f...|     1|  56693|     Paula Laurence|  

In [86]:
schema = T.ArrayType(cast_schema, True)

@F.udf
def t(val):
  if not val:
    return None
  return [i.replace('\\', '\\\\') for i in val]


# RELATION BETWEEN MOVIE-CAST
R_movie_cast_df = changed_movie_cast_df \
                    .withColumn('cast',
                                F.from_json(F.col('cast'),
                                            schema).getField('credit_id')) \
                    .where(F.size(F.col('cast')) != 0)

R_movie_cast_df = R_movie_cast_df.withColumn('cast',
                               t(F.col('cast'))) \
                   .filter(F.col('cast').isNotNull())

R_movie_cast_df.count()

17323

---
## RELATION BETWEEN MOVIE-CREW

In [55]:
crew_df = credits.select(['id', 'crew'])
crew_df.show(truncate=False)

+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [56]:
metadata_crew_joined_df = metadata.join(crew_df, ["id"])
metadata_crew_df = metadata_crew_joined_df.select(['id', 'crew'])

metadata_crew_df.show(truncate=False)

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [81]:
# CREW SCHEMA
crew_schema = T.StructType([
    T.StructField('department', T.StringType(), False),
    T.StructField('job', T.StringType(), False),
    T.StructField('credit_id', T.StringType(), False),
    T.StructField('gender', T.IntegerType(), False),
    T.StructField('id', T.IntegerType(), False),
    T.StructField('name', T.StringType(), False),
    T.StructField('profile_path', T.StringType(), True),
])

In [82]:
changed_movie_crew_df = metadata_crew_df.withColumn("crew", change_cast(F.col('crew')))
changed_movie_crew_df.count()

17676

In [93]:
movie_crew_rdd = changed_movie_crew_df.select('crew').rdd.map(lambda x: x[0])

# convert json string to dataframe
mcrew_df = spark.read.json(movie_crew_rdd, crew_schema) \
                 .distinct() \
                 .dropna(subset=['department', 'job', 'credit_id', 'gender', 'id', 'name'])

mcrew_df.show()

+-----------------+--------------------+--------------------+------+-------+-------------------+--------------------+
|       department|                 job|           credit_id|gender|     id|               name|        profile_path|
+-----------------+--------------------+--------------------+------+-------+-------------------+--------------------+
|            Sound|Supervising Sound...|569cf7cac3a36858e...|     0|1404218|     Myron Nettinga|                null|
|        Directing|   Script Supervisor|569cf8869251415e6...|     1|1464541|      Mary A. Kelly|                null|
|              Art|      Set Decoration|52fe426ec3a36847f...|     2|   8526|       Michael Ford|                null|
|Costume & Make-Up|      Costume Design|586a62829251414e8...|     0| 959634|  Nicoletta Massone|                null|
|            Sound|Original Music Co...|52fe4332c3a36847f...|     0|  21276|   Stephen Endelman|/1ey21aHi5U3FS5iZ...|
|            Sound|Sound Re-Recordin...|569b21d19251410c

In [85]:
schema = T.ArrayType(crew_schema, True)

# RELATION BETWEEN MOVIE-CREW
R_movie_crew_df = changed_movie_crew_df \
                    .withColumn('crew',
                                F.from_json(F.col('crew'),
                                            schema, {"mode" : "PERMISSIVE"}).getField('credit_id')) \
                    .where(F.size(F.col('crew')) != 0) 
                    
R_movie_crew_df.filter(F.col('crew').isNull()).count()

0

In [87]:
exports = [#("metadata", metadata), ("genre", genre_df), ("movie_genre", R_movie_genre_df),
           #("company", company_df), ("movie_company", R_movie_company_df), 
           #("country", country_df), ("movie_country", R_movie_country_df),
           #("language", language_df), ("movie_language", R_movie_language_df),
           #("keyword", keyword_df), ("movie_keyword", R_movie_keyword_df),
           ("cast", mcast_df), ("movie_cast", R_movie_cast_df),
           ("crew", mcrew_df), ("movie_crew", R_movie_crew_df)]

In [88]:
for file_name, df in tqdm(exports):
  print(file_name)
  try:
    df.write.options(header=True).parquet(f'{DRIVE_PATH}/extracted_parquet/{file_name}')
  except Exception as e:
    print(e)
    continue


  0%|          | 0/4 [00:00<?, ?it/s]

cast


 25%|██▌       | 1/4 [00:06<00:20,  6.77s/it]

movie_cast


 50%|█████     | 2/4 [00:20<00:21, 10.68s/it]

crew


 75%|███████▌  | 3/4 [00:25<00:08,  8.09s/it]

movie_crew


100%|██████████| 4/4 [00:35<00:00,  8.85s/it]
