-
Notifications
You must be signed in to change notification settings - Fork 0
/
popular-movies-nice-dataframe.py
54 lines (40 loc) · 1.71 KB
/
popular-movies-nice-dataframe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# -*- coding: utf-8 -*-
"""
Created on Mon Sep 7 15:28:00 2020
@author: Frank
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs
def loadMovieNames(path="ml-100k/u.item"):
    """Build a movieID -> title dictionary from the MovieLens ``u.item`` file.

    Each line is pipe-separated; the first field is the integer movie ID and
    the second field is the movie title.

    Args:
        path: Location of the ``u.item`` file. Defaults to the relative path
            ``ml-100k/u.item`` (lowercase, matching ``ml-100k/u.data``);
            pass another path to point at your copy of the dataset.

    Returns:
        A dict mapping int movie IDs to title strings.

    Raises:
        OSError: if ``path`` cannot be opened.
        ValueError: if a non-blank line's first field is not an integer.
    """
    movieNames = {}
    # Decode as ISO-8859-1; built-in open() replaces the deprecated
    # codecs.open() with identical decoding options.
    with open(path, "r", encoding="ISO-8859-1", errors="ignore") as f:
        for line in f:
            fields = line.split("|")
            # Skip blank or truncated lines (e.g. a trailing empty line)
            # instead of crashing on int("\n") or fields[1].
            if len(fields) < 2:
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames
# Start (or reuse) the Spark session for this job.
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

# Broadcast the movieID -> title dictionary; executors read it through
# nameDict.value.
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# Explicit schema for the tab-separated u.data file. Every column is nullable.
schema = StructType(
    [
        StructField("userID", IntegerType(), True),
        StructField("movieID", IntegerType(), True),
        StructField("rating", IntegerType(), True),
        StructField("timestamp", LongType(), True),
    ]
)

# Load the ratings as a DataFrame (one row per userID/movieID/rating record).
moviesDF = spark.read.csv("ml-100k/u.data", schema=schema, sep="\t")

# Number of ratings per movie; the aggregate column is named "count".
movieCounts = moviesDF.groupBy("movieID").count()
# Create a user-defined function to look up movie names from our broadcasted dictionary
def lookupName(movieID):
    """Return the title for ``movieID`` from the broadcast ``nameDict``.

    Used as a Spark UDF body. An ID that is missing from the dictionary
    (or a NULL ``movieID``) yields ``None``, which Spark stores as NULL.
    Indexing with ``[]`` instead would raise ``KeyError`` on the executor
    and fail the whole job.

    Args:
        movieID: Integer movie ID, or ``None`` for a NULL column value.

    Returns:
        The movie title string, or ``None`` if the ID is unknown.
    """
    return nameDict.value.get(movieID)
# Wrap the Python lookup as a Spark UDF (func.udf's default return type is string).
lookupNameUDF = func.udf(lookupName)

# Add a movieTitle column using the UDF.
moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

# Most-rated movies first. Break ties on movieID so the rows shown are
# reproducible from run to run.
sortedMoviesWithNames = moviesWithNames.orderBy(func.desc("count"), func.asc("movieID"))

# Show the top 20 rows without truncating long titles.
sortedMoviesWithNames.show(20, truncate=False)

# Stop the session
spark.stop()