In [None]:
# Download the Spark 3.1.1 (Hadoop 3.2) binary distribution.
# -nc (no-clobber) skips the download when the archive already exists, so
# re-running this cell does not leave duplicate "*.tgz.1" copies behind.
!wget -q -nc https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

In [None]:
# Install a headless Java 11 JDK (the JAVA path configured further down points at it);
# apt's standard output is discarded by redirecting it to /dev/null.
!apt-get install openjdk-11-jdk-headless -qq> /dev/null

In [None]:
# Unpack the downloaded Spark archive into the current working directory.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [None]:
# findspark is used further down (findspark.init()) before pyspark is imported.
!pip install -q findspark

In [None]:
# Pin the version that the recorded output shows was installed (1.8.2) so that
# a fresh run resolves the same package instead of whatever is latest.
!pip install koalas==1.8.2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting koalas
  Downloading koalas-1.8.2-py3-none-any.whl (390 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m390.8/390.8 KB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: koalas
Successfully installed koalas-1.8.2


In [None]:
# Pin the version that the recorded output shows was installed (1.1.14);
# --no-deps is kept so pip does not touch the already-installed dependencies.
!pip install --no-deps spark-df-profiling-new==1.1.14

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting spark-df-profiling-new
  Downloading spark-df-profiling-new-1.1.14.tar.gz (86 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 KB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: spark-df-profiling-new
  Building wheel for spark-df-profiling-new (setup.py) ... [?25l[?25hdone
  Created wheel for spark-df-profiling-new: filename=spark_df_profiling_new-1.1.14-py3-none-any.whl size=92076 sha256=339502614585355a8ae227440abe8c3ff65fa7f9ba5e78eaada70f211701a51f
  Stored in directory: /root/.cache/pip/wheels/ff/bf/de/f525f8bee6fc166a4216204bcf64477c8f152a380dfa8b5cd0
Successfully built spark-df-profiling-new
Installing collected packages: spark-df-profiling-new
Successfully installed spark-df-profiling-new-1.1.14


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import os

In [None]:
# Point the process environment at the Java and Spark installations.
# Environment variable names are case-sensitive: the variable must be spelled
# JAVA_HOME (the previous "JAVA_home" key was never read by anything).
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
# Keep this call before the pyspark import below: no visible cell pip-installs
# pyspark, so the import presumably relies on findspark.init() — confirm before reordering.
findspark.init()
from pyspark.sql import SparkSession
# Create (or reuse) a Spark session with master "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
import databricks.koalas as ks
# Turn on the Koalas option 'compute.ops_on_diff_frames'
# (presumably allows operations that combine two different frames — see the Koalas options docs).
ks.set_option('compute.ops_on_diff_frames', True)



### Read dataset from Google Drive

In [None]:
from google.colab import drive
# Mount Google Drive at /content/gdrive; force_remount=True is passed so the
# mount is redone even if a previous mount exists.
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
# Create the local data folder. -p makes the cell safe to re-run (no error if
# the folder already exists); the explicit `!` avoids depending on automagic.
!mkdir -p data

In [None]:
# Copy the dataset archive from the mounted Google Drive into the local data folder.
# Explicit `!` shell escape: a bare `cp` only works through IPython automagic,
# which is not applied once the cell holds more than one line.
!cp /content/gdrive/MyDrive/'Colab Notebooks'/dataBDF2/data.zip /content/data/data.zip

In [None]:
# Extract the CSV files into data/. -o overwrites existing files without
# prompting, so re-running the cell does not stall on an interactive question.
!unzip -o data/data.zip -d data

Archive:  data/data.zip
  inflating: data/credits.csv        
  inflating: data/keywords.csv       
  inflating: data/links_small.csv    
  inflating: data/movies_metadata.csv  
  inflating: data/ratings_small.csv  


In [None]:
from IPython.utils import encoding
from IPython.utils.py3compat import encode
# NOTE(review): the two IPython imports above are not used by any visible cell;
# they are kept in case a later cell depends on them.


def _read_csv_distinct(path):
    """Load one CSV file with the shared reader settings and drop duplicate rows.

    All files use '"' as both quote and escape character, may contain
    multi-line fields, have a header row, and get their schema inferred.
    """
    reader = spark.read.options(quote="\"", escape="\"", multiline=True)
    return reader.csv(path, inferSchema=True, header=True).distinct()


# read files
df_credits = _read_csv_distinct('data/credits.csv')
df_keywords = _read_csv_distinct('data/keywords.csv')
df_links = _read_csv_distinct('data/links_small.csv')
df_movies_metadata = _read_csv_distinct('data/movies_metadata.csv')
df_ratings_small = _read_csv_distinct('data/ratings_small.csv')

In [None]:
# Drop rows whose vote_average is null; per the author these are a few malformed data rows.
df_movies_metadata = df_movies_metadata.na.drop(subset=["vote_average"])

In [None]:
# Koalas view of the cleaned movie metadata (pandas-like API on top of the Spark DataFrame).
movies = df_movies_metadata.to_koalas()

# Data preparation

Some features are stored as the string representation of a list of dictionaries and must be parsed before they can be used in the regression model.

In [None]:
# Example: inspect the structure of a single `keywords` cell.
import ast
# Parse the first row's `keywords` string into a Python list of dictionaries.
lst = ast.literal_eval(df_keywords.select("keywords").first()[0])
# Collect each distinct set of dictionary keys that occurs in the list.
l=[]
for e in lst:
  if e.keys() not in l:
    l.append(e.keys())

print(l)

[dict_keys(['id', 'name'])]


In [None]:
# Display the parsed list of keyword dictionaries from the example above.
lst

[{'id': 476, 'name': 'self-fulfilling prophecy'},
 {'id': 703, 'name': 'detective'},
 {'id': 1470, 'name': 's.w.a.t.'},
 {'id': 2231, 'name': 'drug dealer'},
 {'id': 3597, 'name': 'evisceration'},
 {'id': 3857, 'name': 'lust and impulsiveness'},
 {'id': 3927, 'name': 'rage and hate'},
 {'id': 3932, 'name': 'pride and vanity'},
 {'id': 4138, 'name': 'immoderateness'},
 {'id': 4142, 'name': 'insomnia'},
 {'id': 5340, 'name': 'investigation'},
 {'id': 6125, 'name': 'pension'},
 {'id': 6149, 'name': 'police'},
 {'id': 10714, 'name': 'serial killer'}]

In [None]:
# Keep only the 'name' value of each keyword dictionary, preserving order.
names = [d['name'] for d in lst]
print(names)


['self-fulfilling prophecy', 'detective', 's.w.a.t.', 'drug dealer', 'evisceration', 'lust and impulsiveness', 'rage and hate', 'pride and vanity', 'immoderateness', 'insomnia', 'investigation', 'pension', 'police', 'serial killer']


In [None]:
from pyspark.sql.functions import udf

def extract_names(keywords):
    """Return the ``'name'`` value of every dictionary encoded in ``keywords``.

    Args:
        keywords: String form of a list of dictionaries, e.g.
            ``"[{'id': 1, 'name': 'police'}]"``. May be ``None`` or blank.

    Returns:
        A list with one name per dictionary, in input order. ``None``, blank
        text, or text that does not evaluate to a list yields ``[]`` so that a
        single missing cell does not abort the whole job.

    Raises:
        ValueError / SyntaxError: from ``ast.literal_eval`` when non-blank
            text is not a valid Python literal.
        KeyError: when a dictionary in the list has no ``'name'`` key.
    """
    import ast  # kept function-local, as in the original, so the function needs no notebook globals

    # Null or blank cells carry no names.
    if keywords is None or not str(keywords).strip():
        return []

    parsed = ast.literal_eval(keywords)  # turn the string back into Python objects

    # Anything other than a list (e.g. a stray scalar literal) has no entries to extract.
    if not isinstance(parsed, (list, tuple)):
        return []

    return [entry['name'] for entry in parsed]

# Declare the return type explicitly. Without it udf() defaults to StringType,
# so the returned Python list is stored as a plain string and the resulting
# column cannot be consumed as an array (HashingTF rejects it with
# IllegalArgumentException).
from pyspark.sql.types import ArrayType, StringType

extract_names_udf = udf(extract_names, ArrayType(StringType())) # apply transformation to each row of the dataframe


In [None]:
# Add a `keywords_names` column holding the names extracted from each row's `keywords` string.
df_extract_kw = df_keywords.withColumn('keywords_names', extract_names_udf(df_keywords['keywords']))

In [None]:
# Preview the first 10 rows without truncating long cell values.
df_extract_kw.show(10,truncate=False)

+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# The raw `keywords` string is no longer needed once the names are extracted.
df_extract_kw = df_extract_kw.drop('keywords')

In [None]:
# Extract cast and crew member names, then discard the raw string columns.
df_extract_credits = (
    df_credits
    .withColumn('cast_names', extract_names_udf(df_credits['cast']))
    .withColumn('crew_names', extract_names_udf(df_credits['crew']))
    .drop('cast', 'crew')
)

In [None]:
# Add a `genres_names` column holding the names extracted from each row's `genres` string.
df_extract_metadata = df_movies_metadata.withColumn('genres_names', extract_names_udf(df_movies_metadata['genres']))

In [None]:
# Sanity check: show the raw `genres` string next to the extracted names for 3 rows.
df_extract_metadata.select('genres','genres_names').show(3,truncate=False)

+------------------------------------------------------------------+-------------------+
|genres                                                            |genres_names       |
+------------------------------------------------------------------+-------------------+
|[{'id': 18, 'name': 'Drama'}, {'id': 10749, 'name': 'Romance'}]   |[Drama, Romance]   |
|[{'id': 10751, 'name': 'Family'}, {'id': 12, 'name': 'Adventure'}]|[Family, Adventure]|
|[{'id': 10752, 'name': 'War'}, {'id': 18, 'name': 'Drama'}]       |[War, Drama]       |
+------------------------------------------------------------------+-------------------+
only showing top 3 rows



In [None]:
# Inspect `belongs_to_collection`; the author judges it not useful as a regression feature.
df_movies_metadata.select('belongs_to_collection').show(20,truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|belongs_to_collection                                                                                                                                                           |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|null                                                                                                                                                                            |
|null                                                                                                                                                                            |
|null                                                                                                    

In [None]:
# Extract the names from the remaining list-of-dictionaries columns, writing
# each result to a "<column>_names" column. Each source column is processed
# exactly once (the spoken_languages line was previously duplicated).
for source_col in ('production_countries', 'production_companies', 'spoken_languages'):
    df_extract_metadata = df_extract_metadata.withColumn(
        f'{source_col}_names',
        extract_names_udf(df_extract_metadata[source_col]),
    )

In [None]:
# Remove the raw string columns whose names have already been extracted into *_names columns.
df_extract_metadata = df_extract_metadata.drop(*['production_countries', 'production_companies', 'spoken_languages'])

### TF-IDF vectorization on the lists of names

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# Create the HashingTF transformer that reads `keywords_names` and writes `raw_features`.
hashingTF = HashingTF(inputCol="keywords_names", outputCol="raw_features")

# Transform the data to create the raw features
raw_features = hashingTF.transform(df_extract_kw)


raw_features.show()
# NOTE: HashingTF needs `keywords_names` to be an array<string> column. If the UDF
# that produced it was registered without an explicit returnType, Spark stores the
# list as a plain string and this cell raises IllegalArgumentException
# (as in the recorded output below).

IllegalArgumentException: ignored

In [None]:

# Build the IDF estimator that reads `raw_features` and writes `keywords_vectors`.
idf = IDF(inputCol="raw_features", outputCol="keywords_vectors")

# Fit on the raw features first, then apply the fitted model to the same data.
idf_model = idf.fit(raw_features)
vectors = idf_model.transform(raw_features)