In [0]:
ratings_f = "dbfs:/mnt/rawdata/movie/ratings.csv"
movies_f = "dbfs:/mnt/rawdata/movie/movies.csv"

In [0]:
%fs

ls dbfs:/mnt/rawdata/movie

path,name,size,modificationTime
dbfs:/mnt/rawdata/movie/genome-tags.csv,genome-tags.csv,18103,1679442810000
dbfs:/mnt/rawdata/movie/movies.csv,movies.csv,3038099,1679442811000
dbfs:/mnt/rawdata/movie/ratings.csv,ratings.csv,678260987,1679442886000


In [0]:
from pyspark.sql.types  import *
movies_w_genres_df_schema = StructType(
    [
        StructField('ID', IntegerType()),
        StructField('title', StringType()),
        StructField('genres', StringType())
    ]
)

movies_df_schema = StructType(
    [
        StructField('ID', IntegerType()),
        StructField('title', StringType())
    ]
)

In [0]:
movies_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,
inferSchema=False).schema(movies_df_schema).load(movies_f)

movies_w_genres_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,
inferSchema=False).schema(movies_w_genres_df_schema).load(movies_f)
    

In [0]:
movies_df.show(4, truncate = False)
movies_w_genres_df.show(4, truncate = False)

+---+------------------------+
|ID |title                   |
+---+------------------------+
|1  |Toy Story (1995)        |
|2  |Jumanji (1995)          |
|3  |Grumpier Old Men (1995) |
|4  |Waiting to Exhale (1995)|
+---+------------------------+
only showing top 4 rows

+---+------------------------+-------------------------------------------+
|ID |title                   |genres                                     |
+---+------------------------+-------------------------------------------+
|1  |Toy Story (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2  |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3  |Grumpier Old Men (1995) |Comedy|Romance                             |
|4  |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+---+------------------------+-------------------------------------------+
only showing top 4 rows



In [0]:
from pyspark.sql.functions import split, regexp_extract

movies_with_year_df = movies_df.select('ID','title', regexp_extract('title',r'\((\d+)\)',1).alias('year') )

In [0]:
movies_with_year_df.show(4, truncate = False)

+---+------------------------+----+
|ID |title                   |year|
+---+------------------------+----+
|1  |Toy Story (1995)        |1995|
|2  |Jumanji (1995)          |1995|
|3  |Grumpier Old Men (1995) |1995|
|4  |Waiting to Exhale (1995)|1995|
+---+------------------------+----+
only showing top 4 rows



In [0]:
display(movies_with_year_df.groupBy('year').count().orderBy('count',ascending = False))

year,count
2015.0,2513
2016.0,2488
2014.0,2406
2017.0,2374
2013.0,2173
2018.0,2034
2012.0,1978
2011.0,1838
2009.0,1723
2010.0,1691


In [0]:
ratings_df_schema = StructType(
    [
        StructField('userID', IntegerType()),
        StructField('movieID', IntegerType()),
        StructField('rating', DoubleType())
    ]
)


In [0]:
ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,
inferSchema=False).schema(ratings_df_schema).load(ratings_f)
ratings_df.show(4)

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
|     1|    665|   5.0|
+------+-------+------+
only showing top 4 rows



In [0]:
ratings_df.cache()
movies_df.cache()

Out[18]: DataFrame[ID: int, title: string]

In [0]:
from pyspark.sql import functions as F

movie_ids_w_avg_ratings_df = ratings_df.groupBy('movieID').agg(F.count(ratings_df.rating).alias('count'),
                                                              F.avg(ratings_df.rating).alias('average'))
print('movie_ids_w_avg_ratings_df:')
movie_ids_w_avg_ratings_df.show(4, truncate=False)

movie_ids_w_avg_ratings_df:
+-------+-----+------------------+
|movieID|count|average           |
+-------+-----+------------------+
|1088   |11935|3.25002094679514  |
|1580   |40308|3.5817083457378187|
|3175   |14659|3.6077836141619484|
|44022  |4833 |3.2593627146699773|
+-------+-----+------------------+
only showing top 4 rows



In [0]:
movie_ids_w_avg_ratings_df = movie_ids_w_avg_ratings_df.join(movies_df,F.col('movieID') == F.col('ID')).drop('ID')
movie_ids_w_avg_ratings_df.show(4, truncate=False)

+-------+-----+------------------+--------------------------------+
|movieID|count|average           |title                           |
+-------+-----+------------------+--------------------------------+
|1088   |11935|3.25002094679514  |Dirty Dancing (1987)            |
|1580   |40308|3.5817083457378187|Men in Black (a.k.a. MIB) (1997)|
|3175   |14659|3.6077836141619484|Galaxy Quest (1999)             |
|44022  |4833 |3.2593627146699773|Ice Age 2: The Meltdown (2006)  |
+-------+-----+------------------+--------------------------------+
only showing top 4 rows



In [0]:
movies_w_500_ratings_or_more = movie_ids_w_avg_ratings_df.filter(movie_ids_w_avg_ratings_df['count'] >= 500).orderBy('average'
                                                                                                                    ,ascending=False)
movies_w_500_ratings_or_more.show(4, truncate=False)

+-------+-----+-----------------+--------------------------------+
|movieID|count|average          |title                           |
+-------+-----+-----------------+--------------------------------+
|171011 |1124 |4.483096085409253|Planet Earth II (2016)          |
|159817 |1747 |4.464796794504865|Planet Earth (2006)             |
|318    |81482|4.413576004516335|Shawshank Redemption, The (1994)|
|170705 |1356 |4.398598820058997|Band of Brothers (2001)         |
+-------+-----+-----------------+--------------------------------+
only showing top 4 rows



In [0]:
seed=4
(split_60_df, split_a_20_df, split_b_20_df ) = ratings_df.randomSplit([0.6,0.2,0.2],seed)

training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, Validation: {1}, test: {2}\n'.format(
    training_df.count(), validation_df.count(), test_df.count()
))

training_df.show(4, truncate=False)
validation_df.show(4, truncate=False)
test_df.show(4, truncate=False)

Training: 14999112, Validation: 4999908, test: 5001075

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|1     |306    |3.5   |
|1     |307    |5.0   |
|1     |665    |5.0   |
|1     |899    |3.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|1     |1250   |4.0   |
|1     |2011   |2.5   |
|1     |2161   |3.5   |
|1     |2351   |4.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|1     |296    |5.0   |
|1     |1217   |3.5   |
|1     |2068   |2.5   |
|1     |2843   |4.5   |
+------+-------+------+
only showing top 4 rows



In [0]:
from pyspark.ml.recommendation import ALS
als = ALS()

als.setPredictionCol("prediction")\
    .setMaxIter(5)\
    .setSeed(seed)\
    .setRegParam(0.1)\
    .setUserCol('userID')\
    .setItemCol('movieID')\
    .setRatingCol('rating')\
    .setRank(8)

my_ratings_model =als.fit(training_df)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol = "rating", metricName= "rmse")
predict_df = my_ratings_model.transform(test_df)

predicted_test_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

test_RMSE_my_ratings =reg_eval.evaluate(predicted_test_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

dbutils.widgets.text("inout","5","")
ins= dbutils.widgets.get("input")
uid=int(ins)
ll=predicted_test_ratings_df.filter(col("userID")==uid)


The model had a RMSE on the test set of 0.813771546438166


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-355361367363890>[0m in [0;36m<cell line: 13>[0;34m()[0m
[1;32m     11[0m [0;34m[0m[0m
[1;32m     12[0m [0mdbutils[0m[0;34m.[0m[0mwidgets[0m[0;34m.[0m[0mtext[0m[0;34m([0m[0;34m"inout"[0m[0;34m,[0m [0;34m"5"[0m[0;34m,[0m[0;34m""[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 13[0;31m [0mins[0m[0;34m=[0m [0mdbutils[0m[0;34m.[0m[0mwidgets[0m[0;34m.[0m[0mget[0m[0;34m([0m[0;34m"input"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     14[0m [0muid[0m[0;34m=[0m[0mint[0m[0;34m([0m[0mins[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     15[0m [0mll[0m[0;34m=[0m[0mpredicted_test_ratings_df[0m[0;34m.[0m[0mfilter[0m[0;34m([0m[0mcol[0m[0;34m([0m[0;34m"userID"[0m[0;34m)[0m[0;34m==[0m[0muid[0m[0;34m)[0m[0;34m

In [0]:
MovieRec = ll.join(movies_df,F.col('movieID') == f.col('ID').drop('ID').select('title').take(10) 

l=dbutils.notebook.exit(MovieRec)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-1506129087242971>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mMovieRec[0m [0;34m=[0m [0mll[0m[0;34m.[0m[0mjoin[0m[0;34m([0m[0mmovies_df[0m[0;34m,[0m[0mF[0m[0;34m.[0m[0mcol[0m[0;34m([0m[0;34m'movieID'[0m[0;34m)[0m [0;34m==[0m [0mf[0m[0;34m.[0m[0mcol[0m[0;34m([0m[0;34m'ID'[0m[0;34m)[0m[0;34m.[0m[0mdrop[0m[0;34m([0m[0;34m'ID'[0m[0;34m)[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0;34m'title'[0m[0;34m)[0m[0;34m.[0m[0mtake[0m[0;34m([0m[0;36m10[0m[0;34m)[0m [0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0;34m[0m[0m
[1;32m      3[0m [0ml[0m[0;34m=[0m[0mdbutils[0m[0;34m.[0m[0mnotebook[0m[0;34m.[0m[0mexit[0m[0;34m([0m[0mMovieRec[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mNameErro