# Loading

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import findspark
findspark.init
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType,StringType,IntegerType,MapType,ArrayType,StructField
from pyspark.sql.functions import *
import os

# NLP
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline


In [3]:
# Initialize Spark
spark = SparkSession.builder.master("local[*]").getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/19 23:30:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
links=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("dataset/links.csv")

keywords=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("dataset/keywords.csv")

movies_metadata=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("dataset/movies_metadata.csv")

credits=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("dataset/credits.csv")

ratings=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("dataset/ratings.csv")



ratings_sample=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("dataset/ratings_small.csv")

                                                                                

# 1.Analysis of the datasets

In [5]:
links.printSchema()
ratings.printSchema()
credits.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: integer (nullable = true)



In [6]:
movies_metadata.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: integer (nu

In [8]:
movies_metadata=movies_metadata.where(~col('id').isin('82663','162372','215848'))# suppression de lignes corrompues

#valeurs nulles sur les colonnes JSON  "production_companies" et "production_countries"
movies_metadata=movies_metadata.na.drop(subset=["production_companies","production_countries","genres"])

In [9]:
#titrage des colonnes ajoutés
movies_metadata=movies_metadata.withColumn('genres',when(col('genres')=='[]',"[{'id': 0, 'name': 'Unknown'}]").otherwise(col('genres')))\
               .withColumn('production_companies',when(col('production_companies')=='[]',"[{'name': 'Unknown', 'id': 0}]").otherwise(col('production_companies')))\
               .withColumn('production_countries',when(col('production_countries')=='[]',"[{'iso_3166_1': 'Unknown', 'name': 'Unknown'}]").otherwise(col('production_countries')))


In [10]:
datadict={'Ratings.csv':ratings,'Movies_metadata.csv':movies_metadata,'Credits.csv':credits,'Keywords.csv':keywords,'links.csv':links}
# Check if null/NaN values
for key,value in datadict.items() :
  dfStats= value.select([count(when(col(c).isNull()|isnan(c),'True')).alias(c) for c,c_type in value.dtypes if c_type not in ('timestamp','boolean')])
  print("Column stats for data file :" +key+"\n")
  dfStats.show()

Column stats for data file :Ratings.csv



                                                                                

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+

Column stats for data file :Movies_metadata.csv



                                                                                

+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-------+----------------+------+-------+-----+------------+----------+
|adult|belongs_to_collection|budget|genres|homepage| id|imdb_id|original_language|original_title|overview|popularity|poster_path|production_companies|production_countries|release_date|revenue|runtime|spoken_languages|status|tagline|title|vote_average|vote_count|
+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-------+----------------+------+-------+-----+------------+----------+
|    0|                40968|     0|     0|   37680|  0|     17|               11|             0|     954|         2|        383|                   0|                   0|          84|      3|    260|           

                                                                                

+----+----+---+
|cast|crew| id|
+----+----+---+
|   0|   0|  0|
+----+----+---+

Column stats for data file :Keywords.csv



                                                                                

+---+--------+
| id|keywords|
+---+--------+
|  0|       0|
+---+--------+

Column stats for data file :links.csv

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      0|     0|   219|
+-------+------+------+



In [11]:
#Change Column data type
movies_metadata=movies_metadata.withColumn('budget',col('budget').cast('integer'))\
                               .withColumn('popularity',col('popularity').cast('float'))\
                               .withColumn('revenue',col('revenue').cast('integer'))

In [12]:
# identify duplicates based on IMDB ID
df_dup=movies_metadata.groupby('imdb_id','title','release_date', 'overview').count().filter("count > 1").show()

#Total number of duplicate rows
movies_metadata.groupby('imdb_id','title','release_date', 'overview').count().where(f.col('count')>1).select(f.sum('count')).show()

# DROP Duplicates - using drop_duplicates - RETURNS NEW DF
movies_metadata=movies_metadata.drop_duplicates(['imdb_id','title','release_date', 'overview'])

                                                                                

+---------+--------------------+------------+--------------------+-----+
|  imdb_id|               title|release_date|            overview|count|
+---------+--------------------+------------+--------------------+-----+
|tt0454792|              Bubble|  2005-09-03|Set against the b...|    2|
|tt0287635|Pokémon 4Ever: Ce...|  2001-07-06|All your favorite...|    2|
|tt1821641|        The Congress|  2013-05-16|More than two dec...|    2|
|tt0270288|Confessions of a ...|  2002-12-30|Television made h...|    2|
|tt1180333|            Blackout|  2008-12-26|Recovering from a...|    3|
|tt0022537|          The Viking|  1931-06-21|Originally called...|    2|
|tt1701210|          Black Gold|  2011-12-21|On the Arabian Pe...|    2|
|tt0499456|    Days of Darkness|  2007-01-01|When a comet stri...|    2|
|tt0084387|Nana, the True Ke...|  1983-06-13|In Zola's Paris, ...|    2|
|tt0157472|       Clockstoppers|  2002-03-17|Until now, Zak Gi...|    2|
|tt2121382|       Force Majeure|  2014-08-15|While 

[Stage 49:>                                                         (0 + 1) / 1]

+----------+
|sum(count)|
+----------+
|        59|
+----------+



                                                                                

In [13]:
movies_metadata=movies_metadata.withColumnRenamed("id","id_ori")\
                                .withColumnRenamed("poster_path","poster_path_ori")

#Attach schema to Json object column
df=movies_metadata.withColumn("belongs_to_collection_value",from_json(movies_metadata.belongs_to_collection,MapType(StringType(),StringType())))
                  

# Get the distinct Keys of Json object
key_df=df.select(explode(map_keys(col('belongs_to_collection_value')))).distinct()

# Convert Key collection object to a List
keylst=list(map(lambda row:row[0],key_df.collect()))

#Retrieve values based on Keys into a seperate column
key_cols=map(lambda f:df['belongs_to_collection_value'].getItem(f).alias(str(f)),keylst)
#df2=df2.select(col('belongs_to_collection'),*key_cols)
df=df.select(*movies_metadata.columns,*key_cols)
df.printSchema()

[Stage 57:>                                                         (0 + 4) / 4]

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id_ori: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- poster_path_ori: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: 

                                                                                

### Add JSON arrays to dataset columns

In [14]:
# Columns - production_companies , production_countries , genres have Json array values


#Step 1 - Define  schema of Json array type
schema = ArrayType(StructType([
        StructField('id', IntegerType(), nullable=False), 
        StructField('name', StringType(), nullable=False)]))

#Step 2 - UDF function to convert list to column seperated values. As Data is in Json array, extracting values based on Json keys will  produce list
convertUDF = udf(lambda s: ','.join(map(str, s)),StringType())

#Step 3 - Json parsing
df=df.withColumn("production_companies_values",when(col('production_companies')=='[]','').otherwise(convertUDF(from_json(movies_metadata.production_companies,schema).getField("name"))))\
     .withColumn("production_countries_values",convertUDF(from_json(movies_metadata.production_countries,schema).getField("name")))\
        .withColumn("genres_value",convertUDF(from_json(movies_metadata.genres,schema).getField("name")))

df.select('id_ori',"genres_value","production_companies_values","production_countries_values").show(10,False)

[Stage 63:>                                                         (0 + 1) / 1]

+------+-----------------------------+----------------------------+---------------------------+
|id_ori|genres_value                 |production_companies_values |production_countries_values|
+------+-----------------------------+----------------------------+---------------------------+
|45514 |Unknown                      |Unknown                     |France                     |
|118013|Unknown                      |Unknown                     |Unknown                    |
|65256 |Fantasy,Adventure            |ZDF Enterprises             |Australia                  |
|75015 |Foreign,Action,History,Comedy|Unknown                     |Poland                     |
|293412|Adventure,Drama              |Unknown                     |Unknown                    |
|16624 |Drama                        |Edison Manufacturing Company|United States of America   |
|82120 |Comedy                       |Lumière                     |France                     |
|129865|Documentary                  |Lu

                                                                                

### Find number movies per collection

In [15]:
df_pd_collection=df.where(col('name').isNotNull()).select(col('name')).groupBy(col('name')).count().orderBy(col('count').desc()).toPandas()
df_pd_collection.head(20)

                                                                                

Unnamed: 0,name,count
0,The Bowery Boys,29
1,Totò Collection,27
2,James Bond Collection,26
3,Zatôichi: The Blind Swordsman,26
4,The Carry On Collection,25
5,Pokémon Collection,20
6,Godzilla (Showa) Collection,16
7,Charlie Chan (Warner Oland) Collection,15
8,Dragon Ball Z (Movie) Collection,15
9,Monster High Collection,14


### Count genre movies


In [16]:
genres_schema = ArrayType(StructType([
        StructField('id', StringType(), nullable=False), 
        StructField('name', StringType(), nullable=False)]))

df_wc=df.select(explode(from_json(col('genres'),genres_schema).getField("name").alias('genres_val')))
df_wc.groupby(col('col').alias('gener_values')).count().orderBy(col('count').desc()).show()

                                                                                

+---------------+-----+
|   gener_values|count|
+---------------+-----+
|          Drama|20243|
|         Comedy|13175|
|       Thriller| 7618|
|        Romance| 6730|
|         Action| 6590|
|         Horror| 4670|
|          Crime| 4304|
|    Documentary| 3930|
|      Adventure| 3490|
|Science Fiction| 3042|
|         Family| 2767|
|        Mystery| 2464|
|        Unknown| 2441|
|        Fantasy| 2309|
|      Animation| 1930|
|        Foreign| 1619|
|          Music| 1597|
|        History| 1398|
|            War| 1322|
|        Western| 1042|
+---------------+-----+
only showing top 20 rows



# 2.Model Prediction for Revenue and Vote Average

In [17]:
movies_metadata.write.csv("utput.csv", header=True)


23/01/19 12:59:04 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

## Preprocessing

In [18]:
import six

movies = movies_metadata.select('budget','popularity','revenue','runtime','vote_average','vote_count') # The only integer available in the dataset (converting strings to number has multiple problem in pySpark)

# Correlation checking
for i in movies.columns:
    if not( isinstance(movies.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to revenue for ", i, movies.stat.corr('revenue',i))

                                                                                

Correlation to revenue for  budget 0.7602053998132995


                                                                                

Correlation to revenue for  popularity 0.46845078337846213


                                                                                

Correlation to revenue for  revenue 1.0


                                                                                

Correlation to revenue for  runtime 0.10352080949502311


                                                                                

Correlation to revenue for  vote_average 0.08389723014384019


[Stage 119:>                                                        (0 + 1) / 1]

Correlation to revenue for  vote_count 0.787544593488117


                                                                                

In [19]:
movies = movies.select('budget','popularity','revenue','vote_count')
# The features with the best correlation are kept while the other are removed

In [20]:
from pyspark.sql.functions import col,isnan, when, count
movies.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in movies.columns]
   ).show()

[Stage 122:>                                                        (0 + 1) / 1]

+------+----------+-------+----------+
|budget|popularity|revenue|vote_count|
+------+----------+-------+----------+
|     3|         3|      3|         3|
+------+----------+-------+----------+



                                                                                

In [21]:
movies= movies.filter("budget is not NULL") # Retirer les 3 lignes avec des "nulls"


In [22]:
movies.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
budget,45428,4225013.90417804,1.742889227897661E7,0,380000000
popularity,45428,2.9212739482750387,6.006825607105138,0.0,547.4883
revenue,45428,1.111883328993132E7,6.34221897251613E7,-1507002209,2068223624
vote_count,45428,109.94003698159726,491.47676043797094,0,14075


In [23]:
from  pyspark.ml.feature import VectorAssembler
from  pyspark.ml.linalg import Vectors

#creating features column
assembler = VectorAssembler(
    inputCols=('budget','popularity','vote_count'),
    outputCol='features')

In [24]:
# Create the vector and the label
output = assembler.transform(movies)
output = output.select(['features', 'revenue'])
output.show(3)

[Stage 134:>                                                        (0 + 1) / 1]

+--------------------+-------+
|            features|revenue|
+--------------------+-------+
|[2000000.0,11.528...|1275000|
|[0.0,0.0016629999...|      0|
|[0.0,0.0033869999...|      0|
+--------------------+-------+
only showing top 3 rows



                                                                                

In [25]:
#Split the dataset with a train and test set
splits = output.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

## First model : Linear Regression

In [26]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='revenue', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

[Stage 140:>                                                        (0 + 4) / 4]

23/01/19 12:59:50 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/01/19 12:59:50 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


[Stage 143:>                                                        (0 + 4) / 4]

Coefficients: [1.6117734946681739,-636298.8081020241,64036.101562541204]
Intercept: -948423.1681283715


                                                                                

In [27]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
rmseLR = trainingSummary.rootMeanSquaredError

RMSE: 34776584.676075
r2: 0.703603


## Second model : Decision Tree Regression

In [28]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'revenue')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="revenue", predictionCol="prediction", metricName="rmse")
rmseDT = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmseDT)

[Stage 167:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data = 3.60987e+07


                                                                                

In [29]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

[Stage 170:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data = 3.21096e+07


                                                                                

## Third model : Gradient Boosting Regression

In [30]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'revenue', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'revenue', 'features').show(5)

[Stage 332:>                                                        (0 + 1) / 1]

+-----------------+-------+---------+
|       prediction|revenue| features|
+-----------------+-------+---------+
|84203.46051617594|      0|(3,[],[])|
|84203.46051617594|      0|(3,[],[])|
|84203.46051617594|      0|(3,[],[])|
|84203.46051617594|      0|(3,[],[])|
|84203.46051617594|      0|(3,[],[])|
+-----------------+-------+---------+
only showing top 5 rows



                                                                                

In [31]:
gbt_evaluator = RegressionEvaluator(
    labelCol="revenue", predictionCol="prediction", metricName="rmse")
rmseGBT = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmseGBT)

[Stage 335:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data = 3.82867e+07


                                                                                

## Model prediction of Vote average

In [32]:
#Find correlation with the vote average 
import six

test = movies_metadata.select('budget','popularity','revenue','runtime','vote_average','vote_count')

for i in test.columns:
    if not( isinstance(test.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to vote_average for ", i, test.stat.corr('vote_average',i))

                                                                                

Correlation to vote_average for  budget 0.07351520061424012


                                                                                

Correlation to vote_average for  popularity 0.15439188401134443


                                                                                

Correlation to vote_average for  revenue 0.0838972301438402


                                                                                

Correlation to vote_average for  runtime 0.16809860474456345


                                                                                

Correlation to vote_average for  vote_average 1.0


[Stage 371:>                                                        (0 + 1) / 1]

Correlation to vote_average for  vote_count 0.12361742176783214


                                                                                

In [33]:
X = movies_metadata.select('budget','popularity','revenue','runtime','vote_average','vote_count')


In [34]:
from pyspark.sql.functions import col,isnan, when, count
X.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in X.columns]
   ).show()

[Stage 374:>                                                        (0 + 1) / 1]

+------+----------+-------+-------+------------+----------+
|budget|popularity|revenue|runtime|vote_average|vote_count|
+------+----------+-------+-------+------------+----------+
|     3|         3|      3|    260|           3|         3|
+------+----------+-------+-------+------------+----------+



                                                                                

In [35]:
X = X.filter("budget is not NULL") # Retirer les 3 lignes avec des "nulls"
X = X.filter("runtime is not NULL")

In [36]:
X.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
budget,45171,4248596.416816099,1.747554809865757E7,0,380000000
popularity,45171,2.9366332450528625,6.020362723986118,0.0,547.4883
revenue,45171,1.1182012611409975E7,6.359680953568571E7,-1507002209,2068223624
runtime,45171,94.12623143167076,38.41382404786643,0.0,1256.0
vote_average,45171,5.62961191915172,1.9086772197460182,0.0,10.0
vote_count,45171,110.54988820260787,492.80595818941293,0,14075


In [42]:
from  pyspark.ml.feature import VectorAssembler
from  pyspark.ml.linalg import Vectors

#creating features column
assembler = VectorAssembler(
    inputCols=('budget','popularity','vote_count','popularity','revenue', 'runtime','vote_count'),
    outputCol='features')

# Create the vector and the label
output = assembler.transform(X)
output = output.select(['features', 'vote_average'])
output.show(3)

[Stage 386:>                                                        (0 + 1) / 1]

+--------------------+------------+
|            features|vote_average|
+--------------------+------------+
|[2000000.0,11.528...|         6.8|
|(7,[1,3,5],[0.001...|         0.0|
|(7,[1,3,5],[0.003...|         0.0|
+--------------------+------------+
only showing top 3 rows



                                                                                

In [43]:
splits = output.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [44]:
from pyspark.ml.regression import LinearRegression
lr2 = LinearRegression(featuresCol = 'features', labelCol='vote_average', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr2_model = lr2.fit(train_df)
print("Coefficients: " + str(lr2_model.coefficients))
print("Intercept: " + str(lr2_model.intercept))

[Stage 393:>                                                        (0 + 1) / 1]

Coefficients: [0.0,0.0027468586001501995,0.0,0.0027468586001501995,0.0,0.0015289624717588933,0.0]
Intercept: 5.469648925216722


                                                                                

In [45]:
trainingSummary2 = lr2_model.summary
print("RMSE: %f" % trainingSummary2.rootMeanSquaredError)
print("r2: %f" % trainingSummary2.r2)

# Very low quality data because of the lack of interesting feature

RMSE: 1.900711
r2: 0.013672


# 3.Model Recommendation

## 3.1.Top N movies closer with a movie title

### Preprocessing

In [168]:
corrupted_id = [49788, 47596, 101838, 46770, 43524, 47110]
df = df.filter(df.id.isin(corrupted_id) == False)

In [169]:
df.select('overview').show(3)

[Stage 795:>                                                        (0 + 1) / 1]

+--------------------+
|            overview|
+--------------------+
|How I Unleashed W...|
|Henry Frankenstei...|
|James Parker and ...|
+--------------------+
only showing top 3 rows



                                                                                

In [170]:
# remove null rows
movies = df.select('title','overview')
movies.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in movies.columns]
   ).show()



+-----+--------+
|title|overview|
+-----+--------+
|    0|      29|
+-----+--------+



                                                                                

In [171]:
movies = movies.filter('title is not NULL').filter('overview is not NULL')

In [172]:
movies.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in movies.columns]
   ).show()

[Stage 804:>                                                        (0 + 1) / 1]

+-----+--------+
|title|overview|
+-----+--------+
|    0|       0|
+-----+--------+



                                                                                

### NLP Process : Tf-Idf

In [52]:
#Extracting the features
def extract_features(df):
    indexer = StringIndexer(inputCol="title", outputCol="movie_title_index")
    tokenizer = Tokenizer(inputCol="overview", outputCol="words")
    hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
    idf = IDF(inputCol="rawFeatures", outputCol="features")
    pipeline = Pipeline(stages=[indexer, tokenizer, hashing_tf, idf])
    return pipeline.fit(df).transform(df)

In [173]:
#NLP Process to each movie
movie_df = extract_features(movies)

movie_df.show(3)

[Stage 819:>                                                        (0 + 1) / 1]

+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|               title|            overview|movie_title_index|               words|         rawFeatures|            features|
+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|How I Unleashed W...|How I Unleashed W...|           1217.0|[how, i, unleashe...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|        Frankenstein|Henry Frankenstei...|            936.0|[henry, frankenst...|(20,[0,3,4,5,6,7,...|(20,[0,3,4,5,6,7,...|
|  Tarzan the Ape Man|James Parker and ...|           2342.0|[james, parker, a...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
only showing top 3 rows



                                                                                

In [179]:
name_movie = input("insert a movie name : ")
#Blue in the Face

In [182]:
# We select the feature of the movie selected earlier in the dataset
movie_researched = movie_df.filter("title ==  '"f'{name_movie}'"'").select('features').collect()[0][0]

# We create the cosine similarity in a User Defined Function
cosine_similarity = udf(lambda x: float(x.dot(movie_researched)/(x.norm(2)*movie_researched.norm(2))), DoubleType())

# We compare the feature of the selected movie with each movie of the dataset
movie_df_sortie = movie_df.withColumn("cosine_similarity", cosine_similarity("features"))

                                                                                

In [183]:
# Top N : replace the limit with the N value chosen.
movie_df_sortie_topN = movie_df_sortie.sort("cosine_similarity",ascending=False).limit(5)

movie_df_sortie_topN.select("title","cosine_similarity").show()

[Stage 843:>                                                        (0 + 4) / 4]

+--------------------+------------------+
|               title| cosine_similarity|
+--------------------+------------------+
|    Blue in the Face|1.0000000000000002|
|Kevin Smith: Too ...|0.9535951056498836|
|    A Tale of Winter|0.9407873230248185|
|    Gamera the Brave|0.9394527940304581|
|     Tenchi Forever!|0.9365659002154718|
+--------------------+------------------+



                                                                                

## 3.2 Predict user rating

In [82]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

ratings_sample.select('userId','movieId','rating','timestamp').show(2)

(training, test) = ratings_sample.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
+------+-------+------+----------+
only showing top 2 rows



                                                                                

23/01/19 13:24:15 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

Root-mean-square error = 1.1319439945018415


In [83]:
# We use the user 253 and MovieId 3 because this user have never rated it
data = [(253, 3)]
#Transformation into dataframe 
test2 = spark.createDataFrame(data, ["userId", "movieId"])

pred = model.transform(test2)
pred.show()

                                                                                

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|   253|      3| 4.2175503|
+------+-------+----------+



In [None]:
userId = input('Choose the userID')
movieId = input('Choose the movieId')
data = [(userId, movieId)]
#Transformation into dataframe 
test2 = spark.createDataFrame(data, ["userId", "movieId"])

pred = model.transform(test2)
pred.show()