In [2]:
from pyspark.sql import SparkSession, Row, functions
from pyspark.sql.functions import avg, sum, collect_list

# Create SparkSession

In [3]:
# Build (or reuse) the SparkSession that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

23/11/29 10:35:30 WARN Utils: Your hostname, anbish resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp1s0)
23/11/29 10:35:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/29 10:35:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read the data using sparkContext.textFile()
- SparkContext.textFile is a method in Apache Spark, a fast and general-purpose cluster computing system, used for distributed data processing.<br>
- The textFile function is specifically designed for reading text files from distributed file systems like HDFS (Hadoop Distributed File System) or local file systems in a parallel and distributed manner.



In [4]:
# Load the raw ratings file as an RDD of text lines and display its repr.
ratings_path = "../data/ml-100k/u.data"
lines = spark.sparkContext.textFile(ratings_path)
lines

../data/ml-100k/u.data MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

- At this point, `lines` is an RDD

# Create the parse function for the RDD
- we parse each line in the 'lines' to Row form:<br> <code>Row(field1=value1, field2=value2,....)</code>

In [5]:
def parse_input(line):
    """Convert one raw ratings line into ``Row(MovieID, Rating)``.

    The line is split on whitespace; the second field is read as the integer
    movie ID and the third field as the float rating.
    """
    tokens = line.split()
    movie_field, rating_field = tokens[1], tokens[2]
    return Row(MovieID=int(movie_field), Rating=float(rating_field))

# Map the lines with 'parse_input' function

In [6]:
# Apply parse_input to every raw line, then fetch the first ten parsed Rows for inspection.
movie_ratings = lines.map(parse_input)
movie_ratings.take(10)

                                                                                

[Row(MovieID=242, Rating=3.0),
 Row(MovieID=302, Rating=3.0),
 Row(MovieID=377, Rating=1.0),
 Row(MovieID=51, Rating=2.0),
 Row(MovieID=346, Rating=1.0),
 Row(MovieID=474, Rating=4.0),
 Row(MovieID=265, Rating=2.0),
 Row(MovieID=465, Rating=5.0),
 Row(MovieID=451, Rating=3.0),
 Row(MovieID=86, Rating=3.0)]

- `movie_ratings` is an RDD containing multiple `Row` objects (`take(10)` above only returned a sample of it as a list), so we need to convert this RDD to a DataFrame

In [7]:
# Build the DataFrame from `lines` rather than from `movie_ratings` itself.
# The original form rebound the name from an RDD to a DataFrame using its own
# value as input, so re-running the cell passed a DataFrame to createDataFrame,
# which raises "data is already a DataFrame". This form is safe to re-run.
movie_ratings = spark.createDataFrame(lines.map(parse_input))

In [8]:
# Preview the first ten rows of the ratings DataFrame.
movie_ratings.show(10)

+-------+------+
|MovieID|Rating|
+-------+------+
|    242|   3.0|
|    302|   3.0|
|    377|   1.0|
|     51|   2.0|
|    346|   1.0|
|    474|   4.0|
|    265|   2.0|
|    465|   5.0|
|    451|   3.0|
|     86|   3.0|
+-------+------+
only showing top 10 rows



# Get Average ratings of movies

In [9]:
# Mean rating per movie; the aggregated column is named "avg(Rating)".
avg_ratings = (
    movie_ratings
    .groupBy("MovieID")
    .agg(avg("Rating"))
)
avg_ratings.show(10)

+-------+------------------+
|MovieID|       avg(Rating)|
+-------+------------------+
|    474| 4.252577319587629|
|     29|2.6666666666666665|
|     26| 3.452054794520548|
|    964|3.3333333333333335|
|     65|3.5391304347826087|
|    191| 4.163043478260869|
|   1224|2.6666666666666665|
|    558|3.6714285714285713|
|   1010|              3.25|
|    418|3.5813953488372094|
+-------+------------------+
only showing top 10 rows



# Count the ratings of each movie

In [10]:
# Number of ratings each movie received, stored in a column named "count".
ratings_by_movie = movie_ratings.groupBy("MovieID")
count_ratings = ratings_by_movie.count()
count_ratings.show(10)

+-------+-----+
|MovieID|count|
+-------+-----+
|    474|  194|
|     29|  114|
|     26|   73|
|    964|    9|
|     65|  115|
|    191|  276|
|   1224|   12|
|    558|   70|
|   1010|   44|
|    418|  129|
+-------+-----+
only showing top 10 rows



# Join avg ratings with count ratings using MovieID

In [11]:
# Inner-join the two per-movie aggregates on their shared MovieID key.
count_and_avg_ratings = count_ratings.join(
    avg_ratings,
    on="MovieID",
    how="inner",
)
count_and_avg_ratings.show(10)

                                                                                

+-------+-----+------------------+
|MovieID|count|       avg(Rating)|
+-------+-----+------------------+
|    474|  194| 4.252577319587629|
|     29|  114|2.6666666666666665|
|     26|   73| 3.452054794520548|
|    964|    9|3.3333333333333335|
|     65|  115|3.5391304347826087|
|    191|  276| 4.163043478260869|
|   1224|   12|2.6666666666666665|
|    558|   70|3.6714285714285713|
|   1010|   44|              3.25|
|    418|  129|3.5813953488372094|
+-------+-----+------------------+
only showing top 10 rows



# Sort the average and count table by avg(Rating), ascending (lowest-rated movies first)

In [12]:
# Ascending sort on the average rating, so despite its name `top_ten`
# holds the ten rows with the LOWEST average rating.
sorted_df = count_and_avg_ratings.sort("avg(Rating)", ascending=True)
top_ten = sorted_df.take(10)

# Read the movies' info from the u.item file and get the movie ID and movie name

In [13]:
def load_movie_names(path="../data/ml-100k/u.item"):
    """Load the MovieID -> movie title lookup from a pipe-delimited item file.

    Args:
        path: Location of the item file. Each non-blank line must look like
            ``<movie id>|<title>|...``. Defaults to the MovieLens 100k
            ``u.item`` file used throughout this notebook.

    Returns:
        dict mapping the integer movie ID (first field) to the title string
        (second field).

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no ``|`` separator.
    """
    movie_names = {}
    # The file is opened as latin1, matching the original notebook cell.
    with open(path, encoding="latin1") as f:
        for line in f:
            # Skip blank lines (e.g. a trailing newline at end of file)
            # instead of crashing on int("").
            if not line.strip():
                continue
            # Drop the line terminator so it can never leak into a title.
            fields = line.rstrip("\r\n").split("|")
            movie_names[int(fields[0])] = fields[1]
    return movie_names

# Build the MovieID -> title lookup once; the bare name on the last line displays the whole dict.
movie_names = load_movie_names()
movie_names

{1: 'Toy Story (1995)',
 2: 'GoldenEye (1995)',
 3: 'Four Rooms (1995)',
 4: 'Get Shorty (1995)',
 5: 'Copycat (1995)',
 6: 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)',
 7: 'Twelve Monkeys (1995)',
 8: 'Babe (1995)',
 9: 'Dead Man Walking (1995)',
 10: 'Richard III (1995)',
 11: 'Seven (Se7en) (1995)',
 12: 'Usual Suspects, The (1995)',
 13: 'Mighty Aphrodite (1995)',
 14: 'Postino, Il (1994)',
 15: "Mr. Holland's Opus (1995)",
 16: 'French Twist (Gazon maudit) (1995)',
 17: 'From Dusk Till Dawn (1996)',
 18: 'White Balloon, The (1995)',
 19: "Antonia's Line (1995)",
 20: 'Angels and Insects (1995)',
 21: 'Muppet Treasure Island (1996)',
 22: 'Braveheart (1995)',
 23: 'Taxi Driver (1976)',
 24: 'Rumble in the Bronx (1995)',
 25: 'Birdcage, The (1996)',
 26: 'Brothers McMullen, The (1995)',
 27: 'Bad Boys (1995)',
 28: 'Apollo 13 (1995)',
 29: 'Batman Forever (1995)',
 30: 'Belle de jour (1967)',
 31: 'Crimson Tide (1995)',
 32: 'Crumb (1994)',
 33: 'Desperado (1995)',
 34: '

In [14]:
# Each Row in top_ten is (MovieID, count, avg(Rating)); unpack it positionally
# and look the title up in the movie_names dictionary.
for movie_id, num_ratings, avg_rating in top_ten:
    title = movie_names[movie_id]
    print(f"{title} || count: {num_ratings} || rating: {avg_rating}")


Touki Bouki (Journey of the Hyena) (1973) || count: 1 || rating: 1.0
Amityville: Dollhouse (1996) || count: 3 || rating: 1.0
Quartier Mozart (1992) || count: 1 || rating: 1.0
Power 98 (1995) || count: 1 || rating: 1.0
Amityville: A New Generation (1993) || count: 5 || rating: 1.0
Lotto Land (1995) || count: 1 || rating: 1.0
Hostile Intentions (1994) || count: 1 || rating: 1.0
Falling in Love Again (1980) || count: 2 || rating: 1.0
The Courtyard (1995) || count: 1 || rating: 1.0
Bloody Child, The (1996) || count: 1 || rating: 1.0


---

# Exercise 2

# Read raw data

In [15]:
# Read the u.data ratings file into a separate RDD for Exercise 2.
lines2 = spark.sparkContext.textFile("../data/ml-100k/u.data")

# Create parse input function


In [16]:
def parse_input2(line):
    """Convert one raw ratings line into ``Row(MovieID, Rating, count)``.

    The second whitespace-separated field becomes the integer movie ID, the
    third becomes the float rating, and ``count`` is always 1 so that summing
    it per movie yields the number of ratings.
    """
    tokens = line.split()
    movie_id = int(tokens[1])
    score = float(tokens[2])

    return Row(MovieID=movie_id, Rating=score, count=1)

# Parse every line of the Exercise 2 RDD.
rdd_ratings2 = lines2.map(parse_input2)


# Convert RDD to dataframe

In [17]:
# Turn the RDD of Rows into a DataFrame and preview its first ten rows.
ratings2 = spark.createDataFrame(data=rdd_ratings2)
ratings2.show(n=10)

+-------+------+-----+
|MovieID|Rating|count|
+-------+------+-----+
|    242|   3.0|    1|
|    302|   3.0|    1|
|    377|   1.0|    1|
|     51|   2.0|    1|
|    346|   1.0|    1|
|    474|   4.0|    1|
|    265|   2.0|    1|
|    465|   5.0|    1|
|    451|   3.0|    1|
|     86|   3.0|    1|
+-------+------+-----+
only showing top 10 rows



# Group by MovieID and get total of rating and count

In [18]:
# Per movie: total of all ratings and total of the per-row count of 1
# (i.e. the number of ratings).
ratingTotalAndCounts = (
    ratings2
    .groupBy("MovieID")
    .agg(functions.sum("Rating"), functions.sum("count"))
)
ratingTotalAndCounts.show(10)

+-------+-----------+----------+
|MovieID|sum(Rating)|sum(count)|
+-------+-----------+----------+
|    474|      825.0|       194|
|     29|      304.0|       114|
|     26|      252.0|        73|
|    964|       30.0|         9|
|     65|      407.0|       115|
|    191|     1149.0|       276|
|   1224|       32.0|        12|
|    558|      257.0|        70|
|   1010|      143.0|        44|
|    418|      462.0|       129|
+-------+-----------+----------+
only showing top 10 rows



# Get avg rating

In [19]:
# Mean rating per movie; the aggregated column is named "avg(Rating)".
avg_ratings2 = (
    ratings2
    .groupBy("MovieID")
    .agg(avg("Rating"))
)
avg_ratings2.show(10)

+-------+------------------+
|MovieID|       avg(Rating)|
+-------+------------------+
|    474| 4.252577319587629|
|     29|2.6666666666666665|
|     26| 3.452054794520548|
|    964|3.3333333333333335|
|     65|3.5391304347826087|
|    191| 4.163043478260869|
|   1224|2.6666666666666665|
|    558|3.6714285714285713|
|   1010|              3.25|
|    418|3.5813953488372094|
+-------+------------------+
only showing top 10 rows



# Sort by avg rating

In [20]:
# Ascending sort: the lowest average ratings come first.
sorted_ratings = avg_ratings2.sort("avg(Rating)", ascending=True)
sorted_ratings.show(10)

+-------+-----------+
|MovieID|avg(Rating)|
+-------+-----------+
|   1571|        1.0|
|   1334|        1.0|
|    439|        1.0|
|   1570|        1.0|
|    830|        1.0|
|   1374|        1.0|
|   1559|        1.0|
|    858|        1.0|
|   1343|        1.0|
|   1548|        1.0|
+-------+-----------+
only showing top 10 rows



---

# Exercise 3

In [21]:
import numpy as np

# Create user matrix

### 100000 ratings made by 943 users; each user has rated at least 20 movies <br>
- The full u data set, 100000 ratings by 943 users on 1682 items. <br>
              Each user has rated at least 20 movies. <br> Users and items are
              numbered consecutively from 1. <br> The data is randomly
              ordered. <br> This is a tab separated list of <br>
	         user id | item id | rating | timestamp. 

In [68]:
# Dense matrix with one row per user and one column per movie, initialised to zeros.
n_users, n_movies = 943, 1682
users = np.zeros(shape=(n_users, n_movies))
users.shape


(943, 1682)

# Create parse input function 3 to get the user id with movie id and rating

In [69]:
def parse_input3(line):
    """Convert one raw ratings line into ``Row(UserID, MovieID, Rating)``.

    The first three whitespace-separated fields are read, in order, as the
    integer user ID, the integer movie ID and the float rating.
    """
    tokens = line.split()
    user_field, movie_field, rating_field = tokens[0], tokens[1], tokens[2]

    return Row(
        UserID=int(user_field),
        MovieID=int(movie_field),
        Rating=float(rating_field),
    )

# Re-parse the RDD loaded in Exercise 1, this time keeping the user ID as well.
rdd3 = lines.map(parse_input3)




# convert RDD to df

In [70]:

# Turn the (UserID, MovieID, Rating) RDD into a DataFrame and preview ten rows.
df = spark.createDataFrame(data=rdd3)
df.show(n=10)



+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|   196|    242|   3.0|
|   186|    302|   3.0|
|    22|    377|   1.0|
|   244|     51|   2.0|
|   166|    346|   1.0|
|   298|    474|   4.0|
|   115|    265|   2.0|
|   253|    465|   5.0|
|   305|    451|   3.0|
|     6|     86|   3.0|
+------+-------+------+
only showing top 10 rows



# Group by UserID and collect the list of rated movies and the rating for each movie

In [71]:
# One row per user: the list of movie IDs they rated and the list of ratings.
rated_movies_col = collect_list("MovieID").alias("RatedMovies")
rates_col = collect_list("Rating").alias("Rates")
user_rates = df.groupBy("UserID").agg(rated_movies_col, rates_col)
user_rates.show(10)


+------+--------------------+--------------------+
|UserID|         RatedMovies|               Rates|
+------+--------------------+--------------------+
|     1|[189, 33, 160, 20...|[3.0, 4.0, 4.0, 4...|
|     2|[292, 251, 50, 31...|[4.0, 5.0, 5.0, 1...|
|     3|[335, 245, 337, 3...|[1.0, 1.0, 1.0, 3...|
|     4|[264, 303, 361, 3...|[3.0, 5.0, 5.0, 4...|
|     5|[17, 439, 225, 11...|[4.0, 1.0, 2.0, 1...|
|     6|[14, 98, 463, 301...|[5.0, 5.0, 4.0, 2...|
|     7|[510, 594, 640, 3...|[5.0, 3.0, 3.0, 3...|
|     8|[550, 22, 50, 182...|[3.0, 5.0, 5.0, 5...|
|     9|[298, 691, 521, 4...|[5.0, 5.0, 4.0, 5...|
|    10|[175, 611, 7, 100...|[3.0, 5.0, 4.0, 5...|
+------+--------------------+--------------------+
only showing top 10 rows



# Convert df to a user × movie rating matrix (one rating vector per user)

In [74]:

# Fill the dense rating matrix. IDs in the data are 1-based, numpy indices are
# 0-based, hence the "- 1" on both the user ID and the movie IDs.
for record in user_rates.collect():
    row_idx = record["UserID"] - 1
    col_idx = np.asarray(record["RatedMovies"], dtype=int) - 1  # list -> index array
    scores = np.asarray(record["Rates"], dtype=float)  # list -> value array

    # One indexed assignment per user writes every rating of that user at once.
    users[row_idx, col_idx] = scores


users





array([[5., 3., 4., ..., 0., 0., 0.],
       [4., 4., 3., ..., 0., 0., 0.],
       [0., 4., 0., ..., 0., 0., 0.],
       ...,
       [5., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 5., 0., ..., 0., 0., 0.]])

# Visualize the matrix

In [77]:
# Column labels movie_1 ... movie_1682, one per column of the `users` matrix.
cols = ["movie_%d" % movie_no for movie_no in range(1, 1683)]
matrix_df = spark.createDataFrame(users, schema=cols)
matrix_df.show(10)

23/11/29 11:09:05 WARN TaskSetManager: Stage 169 contains a task of very large size (1160 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------

# Get the distances

In [82]:
def find_dist(vect1, matrix=None):
    """Cosine similarity between ``vect1`` and every row of ``matrix`` except row 0.

    Despite the name, the returned scores are similarities (larger means more
    alike), computed as ``dot(v1, v2) / (|v1| * |v2|)``.

    The original expression ``dot_prod/(norm_v1) * (norm_v2)`` divided by the
    first norm and then MULTIPLIED by the second because of operator
    precedence; the denominator is now the product of both norms.

    Args:
        vect1: 1-D sequence/array of ratings to compare against.
        matrix: 2-D array with one rating vector per row. When omitted, the
            notebook-level ``users`` matrix is used, as in the original cell.

    Returns:
        list with ``matrix.shape[0] - 1`` similarity scores. Entry ``i``
        belongs to row ``i + 1`` of ``matrix``, because row 0 is skipped (the
        notebook passes row 0 itself as ``vect1``). A pair in which either
        vector has zero norm scores 0.0 instead of producing NaN.
    """
    if matrix is None:
        matrix = users  # fall back to the notebook's global rating matrix

    vect1 = np.asarray(vect1, dtype=float)
    others = np.asarray(matrix, dtype=float)[1:]  # row 0 is skipped, as before

    dot_prods = others @ vect1
    # The norm of vect1 does not depend on the row, so it is computed once.
    norm_prods = np.linalg.norm(vect1) * np.linalg.norm(others, axis=1)

    # Divide only where the denominator is non-zero; other entries stay 0.0.
    sims = np.zeros(dot_prods.shape, dtype=float)
    np.divide(dot_prods, norm_prods, out=sims, where=norm_prods != 0)

    return list(sims)

# Score user 1 (row 0 of `users`) against the remaining rows of the matrix.
dists = find_dist(users[0])

In [83]:
# Number of scores returned by find_dist.
len(dists)

942

# List of watched movies for user 1

In [86]:
# Select user 1 by ID instead of by position: the rows produced by groupBy
# have no guaranteed order, so collect()[0] is not necessarily user 1, and it
# also pulls every user's row to the driver just to read one of them.
user1_row = user_rates.filter(user_rates["UserID"] == 1).first()
# first() returns None when no row matches; fall back to an empty list.
watched_movies_user1 = user1_row["RatedMovies"] if user1_row is not None else []
print(watched_movies_user1)

[189, 33, 160, 20, 202, 171, 265, 155, 117, 47, 222, 253, 113, 227, 17, 90, 64, 92, 228, 266, 121, 114, 132, 74, 134, 98, 186, 221, 84, 31, 70, 60, 177, 27, 260, 145, 174, 159, 82, 56, 272, 80, 229, 140, 225, 235, 120, 125, 215, 6, 104, 49, 206, 76, 72, 185, 96, 213, 233, 258, 81, 78, 212, 143, 151, 51, 175, 107, 218, 209, 259, 108, 262, 12, 14, 97, 44, 53, 163, 210, 184, 157, 201, 150, 183, 248, 208, 128, 242, 148, 112, 193, 264, 219, 232, 236, 252, 200, 180, 250, 85, 91, 10, 254, 129, 241, 130, 255, 103, 118, 54, 267, 24, 86, 196, 39, 164, 230, 36, 23, 224, 73, 67, 65, 190, 100, 226, 243, 154, 214, 161, 62, 188, 102, 69, 170, 38, 9, 246, 22, 21, 179, 187, 135, 68, 146, 176, 166, 138, 247, 89, 2, 30, 63, 249, 269, 32, 141, 211, 40, 270, 133, 239, 194, 256, 220, 93, 8, 205, 234, 105, 147, 99, 1, 197, 173, 75, 268, 34, 144, 271, 119, 26, 158, 37, 181, 136, 257, 237, 131, 109, 182, 71, 223, 46, 169, 41, 162, 110, 66, 77, 199, 57, 50, 192, 178, 5, 87, 238, 156, 106, 167, 115, 61, 11, 245,