In [3]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import time
from pyspark.sql.types import *
import  pyspark.sql.functions as F 

In [4]:
from pyspark.sql import SparkSession


spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [5]:
movies = spark.read.load("./data/ml-latest-small/ml-latest-small/movies.csv", format='csv', header = True)
ratings = spark.read.load("./data/ml-latest-small/ml-latest-small/ratings.csv", format='csv', header = True)
links = spark.read.load("./data/ml-latest-small/ml-latest-small/links.csv", format='csv', header = True)
tags = spark.read.load("./data/ml-latest-small/ml-latest-small/tags.csv", format='csv', header = True)

AnalysisException: Path does not exist: file:/opt/workspace/movie_recommendation/data/ml-latest-small/ml-latest-small/movies.csv;

In [266]:
movies.dtypes

[('movieId', 'string'), ('title', 'string'), ('genres', 'string')]

In [263]:
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [151]:
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [152]:
links.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [153]:
tags.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [270]:
# %time
# Count specific genres type: comedy
print('Num of comedy:')
print( movies.withColumn('split', F.split('genres','\|')).withColumn('cnt', F.expr('size(filter(split, x -> x in ("Comedy")))')).select('cnt').groupBy().sum().collect()[0][0])
print('Num of comedy and adventure:')
print( movies.withColumn('split', F.split('genres','\|')).withColumn('cnt', F.expr('size(filter(split, x -> x in ("Comedy","Adventure")))')).select('cnt').groupBy().sum().collect()[0][0])

Num of comedy:
3756
Num of comedy and adventure:
5019


In [284]:
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [308]:
%time 
# slow
ratings.select('rating').distinct().sort('rating')

CPU times: user 3 µs, sys: 3 µs, total: 6 µs
Wall time: 10.5 µs


DataFrame[rating: int]

In [300]:
%time 
# fast
sorted(ratings.select('rating').distinct().rdd.map(lambda r: r[0]).collect())

CPU times: user 3 µs, sys: 2 µs, total: 5 µs
Wall time: 8.34 µs


[1, 3, 5, 4, 2, 0]

In [325]:
# For the users that rated movies and the movies that were rated:
# Minimum number of ratings per user is 20
print( ratings.groupBy('userId').count().select('count').rdd.min()[0])
# Minimum number of ratings per movie is 1
print( ratings.groupBy('movieId').count().select('count').rdd.min()[0])

20
1


In [355]:
# 3446 out of 9724 movies are rated by only one user
print(ratings.groupBy("movieId").count().withColumnRenamed("count", "rating count").groupBy("rating count").count().orderBy('rating count').first()[1])
print(ratings.groupBy('movieId').count().count())
# Number of all movies (rated & tagged): 9724
print( ratings.select('movieId').union(tags.select('movieId')).distinct().count() )

3446
9724
9742


In [356]:
# Number of users: 671
print( ratings.select('userId').distinct().count())
# equal to ratings.groupBy('userId').count().count()

# no new userid in tags dataframe
print( ratings.select('userId').union(tags.select('userId')).distinct().count() )

610
610
