In [47]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
from datetime import datetime

# Reuse the active SparkSession if one exists, otherwise start one named "practice".
spark = (
    SparkSession.builder
    .appName("practice")
    .getOrCreate()
)

In [60]:
# IPython shell escape: list the files in the ml-1m data directory
!ls ml-1m

movies.dat  ratings.dat  README  users.dat


### Load Ratings and Movie Names

In [151]:
def parse_ratings(line):
    """Parse one "::"-delimited line of ratings.dat into a Row.

    Only the first three fields are read (user id, movie id, rating);
    anything after them on the line is ignored.

    Returns:
        Row(user=int, movie_id=int, rating=float)
    """
    parts = line.split("::")
    user_id = int(parts[0])
    movie = int(parts[1])
    score = float(parts[2])
    return Row(user=user_id, movie_id=movie, rating=score)

def parse_movie_names(line):
    """Parse one "::"-delimited line of movies.dat into a Row.

    Only the first two fields are read (movie id, title); any further
    fields are ignored.

    Returns:
        Row(movie_id=int, name=str)
    """
    columns = line.split("::")
    movie_id = int(columns[0])
    title = str(columns[1])
    return Row(movie_id=movie_id, name=title)

# Read the raw "::"-delimited text files into RDDs and parse every line into a Row.
rating_rdd = spark.sparkContext.textFile('ml-1m/ratings.dat')
rating_rdd = rating_rdd.map(parse_ratings)

# NOTE(review): textFile decodes lines as UTF-8; if movies.dat is not UTF-8
# (it is often read as latin-1), non-ASCII titles may be mangled -- verify.
movies_rdd = spark.sparkContext.textFile('ml-1m/movies.dat')
movie_names_rdd = movies_rdd.map(parse_movie_names)
print(rating_rdd.take(1))

# Create DataFrames from the RDDs of Rows (the schema is inferred from the Row fields).
ratings = spark.createDataFrame(rating_rdd)
# Must be the parsed RDD defined above; a bare `movie_names` is not defined
# anywhere and fails with NameError on a fresh kernel.
movies = spark.createDataFrame(movie_names_rdd)
print(ratings.head(1))

# Use take() with rdd and head() with dataframe to see what's happening
assert rating_rdd.take(1) == ratings.head(1)
assert ratings.take(1) == ratings.head(1)
assert ratings.take(1) == ratings.head(1)

[Row(movie_id=1193, rating=5.0, user=1)]
[Row(movie_id=1193, rating=5.0, user=1)]


In [59]:
# print the schema Spark inferred for the ratings DataFrame
ratings.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- user: long (nullable = true)



In [152]:
# print the schema Spark inferred for the movies DataFrame
movies.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- name: string (nullable = true)



In [143]:
# Column names, then (column, Spark SQL type) pairs, then the first 20 rows.
for summary_info in (ratings.columns, ratings.dtypes):
    print(summary_info)
ratings.show(20)

['movie_id', 'rating', 'user']
[('movie_id', 'bigint'), ('rating', 'double'), ('user', 'bigint')]
+--------+------+----+
|movie_id|rating|user|
+--------+------+----+
|    1193|   5.0|   1|
|     661|   3.0|   1|
|     914|   3.0|   1|
|    3408|   4.0|   1|
|    2355|   5.0|   1|
|    1197|   3.0|   1|
|    1287|   5.0|   1|
|    2804|   5.0|   1|
|     594|   4.0|   1|
|     919|   4.0|   1|
|     595|   5.0|   1|
|     938|   4.0|   1|
|    2398|   4.0|   1|
|    2918|   4.0|   1|
|    1035|   5.0|   1|
|    2791|   4.0|   1|
|    2687|   3.0|   1|
|    2018|   4.0|   1|
|    3105|   5.0|   1|
|    2797|   4.0|   1|
+--------+------+----+
only showing top 20 rows



In [146]:
# withColumn returns a NEW DataFrame with the column added (or replaced when the
# name already exists); withColumnRenamed returns a NEW DataFrame with a column
# renamed. DataFrames are immutable, so `ratings` itself is left unchanged.
pct_score = 100 * ratings["rating"] / 5.0
(
    ratings
    .withColumn("score", pct_score)
    .withColumnRenamed("score", "%_score")
    .show(5)
)

+--------+------+----+-------+
|movie_id|rating|user|%_score|
+--------+------+----+-------+
|    1193|   5.0|   1|  100.0|
|     661|   3.0|   1|   60.0|
|     914|   3.0|   1|   60.0|
|    3408|   4.0|   1|   80.0|
|    2355|   5.0|   1|  100.0|
+--------+------+----+-------+
only showing top 5 rows



### Top 10 movies with at least 100 reviews using the DataFrame API

In [117]:
# Per-movie statistics in a single aggregation pass (one shuffle instead of two
# groupBys plus a join). The aliases reproduce the column names that
# groupby().avg("rating") and groupby().count() generate, so downstream code
# that reads "avg(rating)" / "count" keeps working.
avg_and_count = ratings.groupBy("movie_id").agg(
    functions.avg("rating").alias("avg(rating)"),
    functions.count(functions.lit(1)).alias("count"),
)

# attach movie names (inner join: movies with no ratings drop out)
summary = movies.join(avg_and_count, "movie_id")
# "at least 100 reviews" is an inclusive bound
summary = summary.filter(summary["count"] >= 100)

# sort / can also use sort notation --> df.sort(df.columnName.desc())
top_10_api = summary.orderBy("avg(rating)", ascending=False).take(10)

# print results (could also just use a show() here)
for row in top_10_api:
    print((row.name, row['avg(rating)'], row['count']))

('Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 4.560509554140127, 628)
('Shawshank Redemption, The (1994)', 4.554557700942973, 2227)
('Godfather, The (1972)', 4.524966261808367, 2223)
('Close Shave, A (1995)', 4.52054794520548, 657)
('Usual Suspects, The (1995)', 4.517106001121705, 1783)
("Schindler's List (1993)", 4.510416666666667, 2304)
('Wrong Trousers, The (1993)', 4.507936507936508, 882)
('Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)', 4.491489361702127, 470)
('Raiders of the Lost Ark (1981)', 4.477724741447892, 2514)
('Rear Window (1954)', 4.476190476190476, 1050)


### Top 10 movies with at least 100 reviews using the SQL API

In [147]:
# register the DataFrames as temporary views so SQL can refer to them by name
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")

# Group by movie_id as well as name so two distinct movies sharing a title are
# never merged into one group. HAVING and ORDER BY combine normally in Spark SQL.
# The backtick aliases keep the column names Spark auto-generates for
# AVG(rating) / COUNT(rating), so `result` has the same columns as before.
result = spark.sql(
    '''
    SELECT movies.name,
           AVG(ratings.rating)   AS `avg(rating)`,
           COUNT(ratings.rating) AS `count(rating)`
    FROM ratings
    INNER JOIN movies ON ratings.movie_id = movies.movie_id
    GROUP BY movies.movie_id, movies.name
    HAVING COUNT(ratings.rating) >= 100
    ORDER BY `avg(rating)` DESC
    '''
)

# take() brings only the first 10 rows of the sorted result to the driver
# (could also just use a show() here)
top_10_sql = result.take(10)
for row in top_10_sql:
    print((row.name, row['avg(rating)'], row['count(rating)']))

('Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 4.560509554140127, 628)
('Shawshank Redemption, The (1994)', 4.554557700942973, 2227)
('Godfather, The (1972)', 4.524966261808367, 2223)
('Close Shave, A (1995)', 4.52054794520548, 657)
('Usual Suspects, The (1995)', 4.517106001121705, 1783)
("Schindler's List (1993)", 4.510416666666667, 2304)
('Wrong Trousers, The (1993)', 4.507936507936508, 882)
('Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)', 4.491489361702127, 470)
('Raiders of the Lost Ark (1981)', 4.477724741447892, 2514)
('Rear Window (1954)', 4.476190476190476, 1050)
