# PySpark and MLlib
## I.- Find the lowest-rated movies with PySpark
## II.- Predict a user's movie rating scores using MLlib  

# I.- Finding the lowest-rated movies using PySpark
This is just to spare our friends some time, so they don't waste it watching these movies instead of going out with family or walking the dog.

In [1]:
#! pip install pyspark

In [2]:
import os 
# Switch to the ml-100k dataset folder: later cells open "u.item" and "u.data"
# with relative paths, so they are resolved against this directory.
# NOTE(review): machine-specific absolute Windows path — adjust before running elsewhere.
os.chdir('C:\\Users\\asuhajda\\JupyterRoot\\BigData\\movies_dataset\\ml-100k\\ml-100k')

In [3]:
# LowestRatedMovieDataFrame.py

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames(path="u.item", encoding="ISO-8859-1"):
    """Build a movie ID -> movie title dictionary from a pipe-delimited file.

    Every non-blank line is expected to start with ``<movie id>|<title>``;
    any further ``|``-separated fields are ignored.

    Args:
        path: File to read. Defaults to ``u.item`` in the working directory,
            which is what the original zero-argument call read.
        encoding: Text encoding used to decode the file. ISO-8859-1 maps every
            byte to a character, so titles with accented letters no longer
            depend on the platform's default encoding.
            NOTE(review): ml-100k's u.item is believed to be ISO-8859-1 — confirm.

    Returns:
        dict mapping the integer movie ID to the title string.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # A blank (e.g. trailing) line has no ID field; int() would raise on it.
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    """Turn one whitespace-separated text line into a Row(movieID, rating).

    The second token is read as the integer movie ID and the third token as
    the float rating; the remaining tokens are not used.
    """
    tokens = line.split()
    movie_id = int(tokens[1])
    score = float(tokens[2])
    return Row(movieID=movie_id, rating=score)


In [4]:
# Create (or reuse an already running) SparkSession for part I; the
# application is registered under the name "PopularMovies".
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

In [5]:
# Load the movie ID -> title dictionary built by loadMovieNames()
movieNames = loadMovieNames()
# Uncomment the next line to display the whole dictionary:
#movieNames

In [6]:
# Get raw data as an RDD of text lines read from u.data.
# The former `while True` / `except ValueError` wrapper was removed: it retried
# the exact same call with nothing changed, so any failure would have repeated
# forever while printing the same message.
lines = spark.sparkContext.textFile("u.data")

In [8]:
# Convert it to an RDD of Row objects with (movieID, rating).
# No retry loop: repeating an identical call after a failure cannot succeed.
movies = lines.map(parseInput)

# Two separate statements: the former `print(...), print(...)` built a tuple of
# the two None return values, which the notebook echoed as (None, None).
print("these are the first 10 records")
print(movies.take(10))


these are the first 10 records
[Row(movieID=242, rating=3.0), Row(movieID=302, rating=3.0), Row(movieID=377, rating=1.0), Row(movieID=51, rating=2.0), Row(movieID=346, rating=1.0), Row(movieID=474, rating=4.0), Row(movieID=265, rating=2.0), Row(movieID=465, rating=5.0), Row(movieID=451, rating=3.0), Row(movieID=86, rating=3.0)]


(None, None)

In [9]:
# Convert the RDD of Row objects to a DataFrame
movieDataset = spark.createDataFrame(movies)

# Two separate statements: the former `print(...), print(...)` built a tuple of
# the two None return values, which the notebook echoed as (None, None).
print("these are the first 10 records")
print(movieDataset.take(10))

these are the first 10 records
[Row(movieID=242, rating=3.0), Row(movieID=302, rating=3.0), Row(movieID=377, rating=1.0), Row(movieID=51, rating=2.0), Row(movieID=346, rating=1.0), Row(movieID=474, rating=4.0), Row(movieID=265, rating=2.0), Row(movieID=465, rating=5.0), Row(movieID=451, rating=3.0), Row(movieID=86, rating=3.0)]


(None, None)

In [10]:
# Compute the average rating for each movieID; the resulting DataFrame has
# the columns movieID and avg(rating).
averageRatings = movieDataset.groupBy("movieID").avg("rating")
# Peek at the first 10 rows (displayed as the cell output)
averageRatings.take(10)

[Row(movieID=474, avg(rating)=4.252577319587629),
 Row(movieID=29, avg(rating)=2.6666666666666665),
 Row(movieID=26, avg(rating)=3.452054794520548),
 Row(movieID=964, avg(rating)=3.3333333333333335),
 Row(movieID=1677, avg(rating)=3.0),
 Row(movieID=65, avg(rating)=3.5391304347826087),
 Row(movieID=191, avg(rating)=4.163043478260869),
 Row(movieID=1224, avg(rating)=2.6666666666666665),
 Row(movieID=558, avg(rating)=3.6714285714285713),
 Row(movieID=1010, avg(rating)=3.25)]

In [11]:
# Count how many ratings each movieID received (columns: movieID, count)
counts = movieDataset.groupBy("movieID").count()

In [12]:
# Join the counts with the averages on movieID, giving one row per movie
# with the columns movieID, count and avg(rating)
averagesAndCounts = counts.join(averageRatings, "movieID")

In [13]:
# Inspect a small random sample of the joined dataset: drawn without
# replacement, roughly 1% of the rows, with a fixed seed.
averagesAndCounts.sample(withReplacement=False, fraction=0.01, seed=15).collect()

[Row(movieID=229, count=171, avg(rating)=3.111111111111111),
 Row(movieID=843, count=30, avg(rating)=3.0),
 Row(movieID=161, count=220, avg(rating)=3.481818181818182),
 Row(movieID=861, count=3, avg(rating)=2.3333333333333335),
 Row(movieID=499, count=62, avg(rating)=3.9516129032258065),
 Row(movieID=208, count=200, avg(rating)=3.945),
 Row(movieID=1475, count=8, avg(rating)=2.875),
 Row(movieID=870, count=9, avg(rating)=2.7777777777777777),
 Row(movieID=1023, count=31, avg(rating)=2.5806451612903225),
 Row(movieID=762, count=115, avg(rating)=3.391304347826087),
 Row(movieID=1538, count=3, avg(rating)=3.0),
 Row(movieID=1387, count=3, avg(rating)=2.0),
 Row(movieID=1261, count=4, avg(rating)=3.25),
 Row(movieID=1173, count=7, avg(rating)=2.5714285714285716),
 Row(movieID=173, count=324, avg(rating)=4.172839506172839),
 Row(movieID=569, count=67, avg(rating)=2.701492537313433)]

In [14]:
# Sort ascending by average rating and keep the first 10 rows, i.e. the ten
# LOWEST-rated movies. (The variable is still called topTen because the
# printing cell that follows refers to it by that name.)
topTen = averagesAndCounts.orderBy("avg(rating)").take(10)

In [15]:
# Print title, number of ratings and average rating for each selected movie,
# translating the movie ID to its title through movieNames.
for movie in topTen:
    print(movieNames[movie["movieID"]], movie["count"], movie["avg(rating)"])

Hostile Intentions (1994) 1 1.0
Careful (1992) 1 1.0
Lotto Land (1995) 1 1.0
Low Life, The (1994) 1 1.0
Falling in Love Again (1980) 2 1.0
Power 98 (1995) 1 1.0
Amityville: A New Generation (1993) 5 1.0
Further Gesture, A (1996) 1 1.0
Amityville: Dollhouse (1996) 3 1.0
Touki Bouki (Journey of the Hyena) (1973) 1 1.0


In [16]:
# Stop the part I session; part II builds its own SparkSession further down.
spark.stop()

# II.- Using MLlib for Movie Recommendation 

In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

In [18]:
def loadMovieNames(path="u.item", encoding="ISO-8859-1"):
    """Build a movie ID -> movie title dictionary from a pipe-delimited file.

    Every non-blank line is expected to start with ``<movie id>|<title>``;
    any further ``|``-separated fields are ignored.

    Args:
        path: File to read. Defaults to ``u.item`` in the working directory,
            which is what the original zero-argument call read.
        encoding: Text encoding used to decode the file. ISO-8859-1 maps every
            byte to a character, so titles with accented letters no longer
            depend on the platform's default encoding.
            NOTE(review): ml-100k's u.item is believed to be ISO-8859-1 — confirm.

    Returns:
        dict mapping the integer movie ID to the title string.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # A blank (e.g. trailing) line has no ID field; int() would raise on it.
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [19]:

# Abandoned variant of loadMovieNames (movie ID -> movie name dictionary).
# It is kept only as an inert string literal: nothing between the triple
# quotes below is executed.
# NOTE(review): as written it would call .decode() on the movieNames dict —
# consider deleting this block.
''' 
def loadMovieNames():
    movieNames = {}
    with open("u.item") as f:
        for line in f:
            fields = line.split('|')
            #movieNames[int(fields[0])] = fields[1]#.decode('ascii', 'ignore' , encoding='utf-32')
            movieNames = movieNames.decode('ascii', 'ignore', 'r',  encoding='utf-32')
    return movieNames
'''

# Convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
    """Turn one text Row into a Row(userID, movieID, rating).

    ``line.value`` holds the raw text; its first three whitespace-separated
    tokens are read as the integer user ID, the integer movie ID and the
    float rating. Remaining tokens are not used.
    """
    tokens = line.value.split()
    user_id = int(tokens[0])
    movie_id = int(tokens[1])
    score = float(tokens[2])
    return Row(userID=user_id, movieID=movie_id, rating=score)

In [20]:
# Load the movie ID -> title dictionary used later to print recommendations
movieNames = loadMovieNames()

In [21]:

# Create (or reuse) the SparkSession for part II under the application name
# "MovieRecs". No extra builder configuration is set here.
spark = SparkSession.builder.appName("MovieRecs").getOrCreate()
# Display the session object as the cell output
spark

In [22]:
# Get the raw data: read u.data as text and expose it as an RDD. Each element
# is a Row with a single `value` field holding one raw line.
lines = spark.read.text("u.data").rdd
# Peek at the first 5 raw rows
lines.take(5)

[Row(value='196\t242\t3\t881250949'),
 Row(value='186\t302\t3\t891717742'),
 Row(value='22\t377\t1\t878887116'),
 Row(value='244\t51\t2\t880606923'),
 Row(value='166\t346\t1\t886397596')]

In [23]:

# Load up our movie ID -> name dictionary
# (this repeats the load already done in cell In [20])
movieNames = loadMovieNames()


# Convert it to an RDD of Row objects with (userID, movieID, rating)
ratingsRDD = lines.map(parseInput)

# Convert to a DataFrame and cache it, since it is used by several later cells
ratings = spark.createDataFrame(ratingsRDD).cache()


In [24]:
# Peek at the first 5 parsed rows of the RDD
ratingsRDD.take(5)

[Row(userID=196, movieID=242, rating=3.0),
 Row(userID=186, movieID=302, rating=3.0),
 Row(userID=22, movieID=377, rating=1.0),
 Row(userID=244, movieID=51, rating=2.0),
 Row(userID=166, movieID=346, rating=1.0)]

In [25]:
# Peek at the first 5 rows of the cached DataFrame
ratings.take(5)

[Row(userID=196, movieID=242, rating=3.0),
 Row(userID=186, movieID=302, rating=3.0),
 Row(userID=22, movieID=377, rating=1.0),
 Row(userID=244, movieID=51, rating=2.0),
 Row(userID=166, movieID=346, rating=1.0)]

In [26]:
# Create an ALS collaborative filtering model and fit it on the complete
# ratings DataFrame (no train/test split is made here).
# userID, movieID and rating are the user, item and rating columns.
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
model = als.fit(ratings)

In [27]:
# Disabled cell: the code for printing user 1's ratings is wrapped in a string
# literal, so it does not run — the notebook only echoes the string back.
'''# Print out ratings from user 1:
print("\nRatings for user ID 1:")
userRatings = ratings.filter("userID = 1")
for rating in userRatings.collect():
    print(movieNames[rating['movieID']], rating['rating'])
'''

'# Print out ratings from user 1:\nprint("\nRatings for user ID 1:")\nuserRatings = ratings.filter("userID = 1")\nfor rating in userRatings.collect():\n    print(movieNames[rating[\'movieID\']], rating[\'rating\'])\n'

In [28]:

print("\nTop 20 recommendations:")
# Find movies rated more than 100 times
ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
# Construct a "test" dataframe for user 1 with every movie rated more than 100 times
# (lit(1) fills the userID column with the constant 1 on every row)
popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(1))

# Run our model on that list of popular movies for user ID 1
recommendations = model.transform(popularMovies)

# Get the top 20 movies with the highest predicted rating for this user
topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

# Print each recommended title next to its predicted rating
for recommendation in topRecommendations:
    print (movieNames[recommendation['movieID']], recommendation['prediction'])

# Stop the part II session now that the results are printed
spark.stop()


Top 20 recommendations:
Sling Blade (1996) 5.278684139251709
Pulp Fiction (1994) 5.004818916320801
Swingers (1996) 4.968068599700928
Chasing Amy (1997) 4.951290607452393
Godfather, The (1972) 4.84442663192749
Being There (1979) 4.835037708282471
Hoop Dreams (1994) 4.8089165687561035
Monty Python's Life of Brian (1979) 4.779086589813232
Secrets & Lies (1996) 4.770893573760986
Grosse Pointe Blank (1997) 4.769832134246826
Nikita (La Femme Nikita) (1990) 4.7573323249816895
Welcome to the Dollhouse (1995) 4.749566078186035
Chasing Amy (1997) 4.723423480987549
Clerks (1994) 4.709767818450928
Blade Runner (1982) 4.707928657531738
Donnie Brasco (1997) 4.704612731933594
Wrong Trousers, The (1993) 4.691067695617676
Dead Man Walking (1995) 4.677611351013184
Boot, Das (1981) 4.664533615112305
Usual Suspects, The (1995) 4.663516521453857
