#  Movies datasets analysis


### Import required packages

In [2]:
from pyspark.sql import SparkSession

#VectorAssembler provides transformer to convert Dataframe columns into vectors
#StandarScalar is for useful fetures selection using technics like (ChiSquare)
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Bisecting k-means clustering
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml import Pipeline

# useful functions for String datatype analysis
#StringIndexer provides transformer to convert string labels into numerical values
from pyspark.ml.feature import StringIndexer, VectorIndexer, RegexTokenizer, StopWordsRemover

# multiclass classification Metrics library, knowing that movies are clustered based on multiple criterion
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Processing so;plex structures (i.e genre,)
#Useful methods for date manipulations with pyspark 
from pyspark.sql.types import *
from datetime import datetime

### Spark session

In [3]:
spark = SparkSession.builder.appName("film").getOrCreate()

In [69]:
#load data from the data1 repertory location already uploaded in HDFS
#movies-metadata
meta = spark.read.csv("./data1/movies_metadata.csv",
                       header = True, inferSchema = True)

#ratings
rati = spark.read.csv("./data1/ratings_small.csv",
                       header = True, inferSchema = True)



In [6]:
meta.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

# Bisecting K-means

Ratings dataset represents a historical data source suitable for use and track users to build a CFRS. Thus, merging the latter dataset with a rich movie description within movies_metadata (see previous schema) helps to give more insights on users' preferences.


In [71]:

# pyspark "join" function

#we neeed to agree on 'movie ID' as 'id' on both datasets
rati = rati.selectExpr("userId as userId", "movieId as id", "rating as rating", "timestamp as timestamp")
movi_rati = meta.join(rati, on=['id'], how='inner') 
movi_rati.printSchema()

 


root
 |-- id: string (nullable = true)
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

Preparing the datasets for PySpark ML 


In [22]:
#converting release dates to spark datetime
func = udf (lambda x: datetime.strptime(x, '%m/%%d/%Y'), DateType())
movi_rati = movi_rati.withColumn("release_date", func(col("release_date")))

In [30]:
# visualize some data from the combines dataset
movi_rati.describe().select("id", "budget", "popularity", "revenue", "vote_count", "vote_average", "userId", "timestamp").show(50)

+------------------+--------------------+-----------------+--------------------+------------------+------------------+------------------+--------------------+
|                id|              budget|       popularity|             revenue|        vote_count|      vote_average|            userId|           timestamp|
+------------------+--------------------+-----------------+--------------------+------------------+------------------+------------------+--------------------+
|             44925|               44678|            41364|               40461|             41775|             40643|             44925|               44925|
| 6006.031761264895|1.9859276239357177E7|8.153184489652464| 7.902478949780445E7| 654.7698384201086|114753.16447609093| 345.8632164718976|1.0899410248034725E9|
|15930.725435031307| 3.798504925606181E7|6.880673692190476|1.8142134682936063E8|1215.9350993372047| 5512893.569973916|194.84668982664172|1.8803306535208604E8|
|          1.574392|                 0.0|     

In [75]:
#selecting useful features for our model
movi_rati = movi_rati.select("id", "popularity", "vote_count", "vote_average", "userId", "rating")

In [77]:
movi_rati.printSchema()

root
 |-- id: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [80]:
[trainingData, testData] = movi_rati.randomSplit([0.7, 0.3])

#vector assembler to transform feature columns into vector
assembler = VectorAssembler(inputCols = ["popIndexed", "uiIndexed", "vaIndexed", "vcIndexed", "idIndexed"],
                            outputCol = "features")

#The VectorIndexer will automatically transform categorical attributes to numerical values
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=50)


## It is important to handle string indexer exceptions to fit the pipeliner and transform the raw data #####
ratIndex = StringIndexer(inputCol="rating", outputCol="label")
ratIndex.setHandleInvalid("keep")

popIndex = StringIndexer(inputCol="popularity", outputCol="popIndexed")
popIndex.setHandleInvalid("keep")

idIndex = StringIndexer(inputCol="id", outputCol="idIndexed")
idIndex.setHandleInvalid("keep")

vcIndex = StringIndexer(inputCol="vote_count", outputCol="vcIndexed")
vcIndex.setHandleInvalid("keep")

vaIndex= StringIndexer(inputCol="vote_average", outputCol="vaIndexed")
vaIndex.setHandleInvalid("keep")

uiIndex= StringIndexer(inputCol="userId", outputCol="uiIndexed")
uiIndex.setHandleInvalid("keep")
                       
# estimator for bisecting k-means model
bis-km = BisectingKMeans().setK(6).setSeed(1)

# ML pipeline to assemble transformers and estimators
pipeline = Pipeline(stages=[popIndex, uiIndex, vaIndex, vcIndex, ratIndex, idIndex, assembler, bis-km])                       


                       

In [81]:
#train the model
model = pipeline.fit(trainingData)

#test the classification model using the test data
prediction = model.transform(testData)

prediction.select('id', 'userId', '', '').show(20)

+----+--------------------+--------------------+--------------------+------+------+----------+---------+---------+---------+-----+---------+--------------------+----------+
|  id|          popularity|          vote_count|        vote_average|userId|rating|popIndexed|uiIndexed|vaIndexed|vcIndexed|label|idIndexed|            features|prediction|
+----+--------------------+--------------------+--------------------+------+------+----------+---------+---------+---------+-----+---------+--------------------+----------+
| 101| he becomes the u...|Leon: The Profess...|If you want a job...|    56|   4.0|      48.0|    242.0|     33.0|     50.0|  0.0|     52.0|[48.0,242.0,33.0,...|         4|
| 101| he becomes the u...|Leon: The Profess...|If you want a job...|    83|   3.5|      48.0|    184.0|     33.0|     50.0|  4.0|     52.0|[48.0,184.0,33.0,...|         3|
| 101| he becomes the u...|Leon: The Profess...|If you want a job...|   388|   4.0|      48.0|     20.0|     33.0|     50.0|  0.0|     

In [82]:
# normalize features with standard scaler 
from pyspark.ml.feature import StandardScaler

stdscaler = StandardScaler().setInputCol("features").setOutputCol("normalized_features")
prediction=stdscaler.fit(prediction).transform(prediction)

In [88]:
# chisquareselector for relevant features selection
from pyspark.ml.feature import ChiSqSelector
chisq = ChiSqSelector(featuresCol='normalized_features', outputCol='Selected_features', labelCol='label',
                   fpr=0.005)
#predictions = chi.fit(predictions).transform(train)
test=chisq.fit(prediction).transform(prediction)
test.select("Selected_features", "userId", "label", "prediction").show(10, truncate=False)

+---------------------------------------------------------------------------------------------------+------+-----+----------+
|Selected_features                                                                                  |userId|label|prediction|
+---------------------------------------------------------------------------------------------------+------+-----+----------+
|[1.41475420797037,1.3583889992187068,3.440803240680316,1.5824534467200047,1.5278703946488534]      |56    |0.0  |4         |
|[1.41475420797037,1.032824693620835,3.440803240680316,1.5824534467200047,1.5278703946488534]       |83    |4.0  |3         |
|[1.41475420797037,0.11226355365443857,3.440803240680316,1.5824534467200047,1.5278703946488534]     |388   |0.0  |1         |
|[1.41475420797037,0.8419766524082893,3.440803240680316,1.5824534467200047,1.5278703946488534]      |434   |1.0  |3         |
|[1.41475420797037,0.07297130987538508,3.440803240680316,1.5824534467200047,1.5278703946488534]     |624   |4.0  |1   

In [89]:
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'label', predictionCol = 'prediction', metricName='accuracy')

p = prediction.withColumn("prediction", prediction["prediction"].cast("double"))

accuracy = evaluator.evaluate(p)

print("Classification Error = %g" % (1.0 - accuracy))

Classification Error = 0.829102


### Shutdown the spark session

It is necessary to shutdown the spark session once all tasks are completed.

In [1]:
#invoke the method stop to shutdown the spark session
spark.stop()

NameError: name 'spark' is not defined