In [1]:
from pyspark.sql import SparkSession, Row, functions
import os

In [2]:
def loadMovieNames(path='/Users/ayusman/migrate/hadooop/hadoop-basics/ml-100k/u.item'):
    """Build a lookup table from movie ID to movie title.

    The file is read as ISO-8859-1 text. Each line is split on ``|``; the
    first field is converted to ``int`` and used as the key, the second field
    is stored as the title.

    Args:
        path: Location of the pipe-delimited item file. Defaults to the path
            this notebook originally hard-coded, so a bare
            ``loadMovieNames()`` call behaves as before.

    Returns:
        dict mapping ``int`` movie ID to ``str`` title.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
    """
    movie_names = {}
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Drop the line terminator so a two-field line does not leave a
            # trailing newline glued to the title.
            fields = line.rstrip('\n').split('|')
            # Blank or truncated lines have no title field; skip them instead
            # of crashing on int('') or fields[1].
            if len(fields) < 2:
                continue
            movie_names[int(fields[0])] = fields[1]

    return movie_names

In [3]:
def parseInput(line):
    """Turn one whitespace-separated ratings line into a positional Row.

    The second token is parsed as an int and the third as a float; the Row
    is completed with a constant 1.0. The Row fields are unnamed.
    """
    tokens = line.split()
    second_as_int = int(tokens[1])
    third_as_float = float(tokens[2])
    return Row(second_as_int, third_as_float, 1.0)

In [4]:
# Obtain a SparkSession for this notebook, registered under the app name "spark dataset"
sparkSession = (
    SparkSession.builder
    .appName("spark dataset")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/28 13:16:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/28 13:16:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/01/28 13:16:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Build the movie-ID -> title dictionary (used later to print readable titles)
movieNames = loadMovieNames()

In [6]:
# Display the kernel's current working directory
os.getcwd()

'/Users/ayusman/migrate/hadooop/hadoop-basics/Spark'

In [7]:
# Read the raw ratings file through the SparkContext as an RDD of text lines
# (one string per line) -- this is not yet a DataFrame
lines = sparkSession.sparkContext.textFile('/Users/ayusman/migrate/hadooop/hadoop-basics/ml-100k/u.data')

In [8]:
# Apply parseInput to every raw line, giving an RDD of Row objects
movies = lines.map(parseInput)

In [9]:
# Create a DataFrame from the RDD of Rows; the Rows carry no field names,
# so the columns appear as _1, _2, _3
movieDataset = sparkSession.createDataFrame(movies)

In [10]:
# Preview the first rows of the DataFrame
movieDataset.show()

+----+---+---+
|  _1| _2| _3|
+----+---+---+
| 242|3.0|1.0|
| 302|3.0|1.0|
| 377|1.0|1.0|
|  51|2.0|1.0|
| 346|1.0|1.0|
| 474|4.0|1.0|
| 265|2.0|1.0|
| 465|5.0|1.0|
| 451|3.0|1.0|
|  86|3.0|1.0|
| 257|2.0|1.0|
|1014|5.0|1.0|
| 222|5.0|1.0|
|  40|3.0|1.0|
|  29|3.0|1.0|
| 785|3.0|1.0|
| 387|5.0|1.0|
| 274|2.0|1.0|
|1042|4.0|1.0|
|1184|2.0|1.0|
+----+---+---+
only showing top 20 rows



In [11]:
# Compute the average rating for each movie ID.
# Column key: _1 = movie ID, _2 = rating, _3 = constant 1.0 emitted by parseInput for every rating
averageRating = movieDataset.groupBy("_1").avg("_2")

In [12]:
# Peek at three (movie ID, average rating) rows
averageRating.take(3)

[Row(_1=474, avg(_2)=4.252577319587629),
 Row(_1=29, avg(_2)=2.6666666666666665),
 Row(_1=26, avg(_2)=3.452054794520548)]

In [13]:
# Count the number of ratings recorded for each movie ID
counts = movieDataset.groupBy("_1").count()

In [14]:
# Peek at three (movie ID, rating count) rows
counts.take(3)

[Row(_1=474, count=194), Row(_1=29, count=114), Row(_1=26, count=73)]

In [15]:
# Join the per-movie counts with the per-movie averages on the movie ID column (_1)
averageAndcounts = counts.join(averageRating, "_1")

In [16]:
# Peek at two joined rows: movie ID, rating count, average rating
averageAndcounts.take(2)

[Row(_1=474, count=194, avg(_2)=4.252577319587629),
 Row(_1=29, count=114, avg(_2)=2.6666666666666665)]

In [38]:
# Best movies: order by average rating, highest first, keep only movies
# with more than 20 ratings, and collect the first ten rows
top10 = (
    averageAndcounts
    .orderBy("avg(_2)", ascending=False)
    .filter("count>20")
    .take(10)
)

In [40]:
# Display the collected top-10 rows
top10

[Row(_1=408, count=112, avg(_2)=4.491071428571429),
 Row(_1=318, count=298, avg(_2)=4.466442953020135),
 Row(_1=169, count=118, avg(_2)=4.466101694915254),
 Row(_1=483, count=243, avg(_2)=4.45679012345679),
 Row(_1=114, count=67, avg(_2)=4.447761194029851),
 Row(_1=64, count=283, avg(_2)=4.445229681978798),
 Row(_1=603, count=209, avg(_2)=4.3875598086124405),
 Row(_1=12, count=267, avg(_2)=4.385767790262173),
 Row(_1=50, count=583, avg(_2)=4.3584905660377355),
 Row(_1=178, count=125, avg(_2)=4.344)]

In [41]:
# searching for key i.e movie[0] in movieNames dictionary and its corresponding average rating
# Print each top-10 title (looked up by movie ID) next to its average rating
for row in top10:
    title = movieNames[row[0]]
    print(title, row[2])

Close Shave, A (1995) 4.491071428571429
Schindler's List (1993) 4.466442953020135
Wrong Trousers, The (1993) 4.466101694915254
Casablanca (1942) 4.45679012345679
Wallace & Gromit: The Best of Aardman Animation (1996) 4.447761194029851
Shawshank Redemption, The (1994) 4.445229681978798
Rear Window (1954) 4.3875598086124405
Usual Suspects, The (1995) 4.385767790262173
Star Wars (1977) 4.3584905660377355
12 Angry Men (1957) 4.344


In [42]:
# Worst movies: order by average rating, lowest first, keep only movies
# with more than 20 ratings, and collect the first ten rows
last10 = (
    averageAndcounts
    .orderBy("avg(_2)", ascending=True)
    .filter("count>20")
    .take(10)
)
last10

[Row(_1=758, count=21, avg(_2)=1.7142857142857142),
 Row(_1=457, count=27, avg(_2)=1.7407407407407407),
 Row(_1=688, count=44, avg(_2)=1.8409090909090908),
 Row(_1=368, count=31, avg(_2)=1.903225806451613),
 Row(_1=1215, count=30, avg(_2)=1.9333333333333333),
 Row(_1=743, count=39, avg(_2)=1.9487179487179487),
 Row(_1=890, count=43, avg(_2)=1.9534883720930232),
 Row(_1=375, count=23, avg(_2)=1.9565217391304348),
 Row(_1=1037, count=24, avg(_2)=2.0),
 Row(_1=564, count=27, avg(_2)=2.037037037037037)]

In [43]:
# Print each bottom-10 title with its rating count and average rating
for row in last10:
    title, n_ratings, avg_rating = movieNames[row[0]], row[1], row[2]
    print(title, n_ratings, avg_rating)

Lawnmower Man 2: Beyond Cyberspace (1996) 21 1.7142857142857142
Free Willy 3: The Rescue (1997) 27 1.7407407407407407
Leave It to Beaver (1997) 44 1.8409090909090908
Bio-Dome (1996) 31 1.903225806451613
Barb Wire (1996) 30 1.9333333333333333
Crow: City of Angels, The (1996) 39 1.9487179487179487
Mortal Kombat: Annihilation (1997) 43 1.9534883720930232
Showgirls (1995) 23 1.9565217391304348
Grease 2 (1982) 24 2.0
Tales from the Hood (1995) 27 2.037037037037037


In [89]:
# Shut down the Spark session now that the analysis is finished
sparkSession.stop()