#### Povezivanje Drive-a i otvaranje sesije

In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("SparkSQL").getOrCreate() # getOrCreate() creates new or connects to previously defined session if exists

# **1. Najpopularniji filmovi**

#### **1.1. Prikazati 50 najpopularnijih filmova**

In [None]:
movies_path = '/content/drive/My Drive/BigData/ml-100k/u.item'
movies_df = spark.read \
    .option("delimiter", "|") \
    .csv(movies_path, inferSchema=True, header=False) \
    .toDF("movie_id", "title", "release_date", "video_release_date", "IMDb_URL", "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western")

In [None]:
ratings_path = '/content/drive/My Drive/BigData/ml-100k/u.data'
ratings_df = spark.read \
    .option("delimiter", "\t") \
    .csv(ratings_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
from pyspark.sql.functions import count, desc
# Calculate the number of ratings for each movie
ratings_count_df = ratings_df.groupBy("movie_id").agg(count("rating").alias("num_ratings"))

# Calculate the number of views for each movie
views_count_df = ratings_df.groupBy("movie_id").agg(count("user_id").alias("num_views"))

# Join the ratings count and views count with the movies dataframe
popular_movies_df = movies_df.join(ratings_count_df, "movie_id").join(views_count_df, "movie_id")

# Show the top 50 most popular movies based on the number of ratings
print("Top 50 most popular movies based on the number of ratings:")
popular_movies_df.orderBy(desc("num_ratings")).select("movie_id", "title", "num_ratings").show(50, truncate=False)

# Show the top 50 most popular movies based on the number views
print("Top 50 most popular movies based on the number of views:")
popular_movies_df.orderBy(desc("num_views")).select("movie_id", "title", "num_views").show(50, truncate=False)

Top 50 most popular movies based on the number of ratings:
+--------+--------------------------------------------+-----------+
|movie_id|title                                       |num_ratings|
+--------+--------------------------------------------+-----------+
|50      |Star Wars (1977)                            |583        |
|258     |Contact (1997)                              |509        |
|100     |Fargo (1996)                                |508        |
|181     |Return of the Jedi (1983)                   |507        |
|294     |Liar Liar (1997)                            |485        |
|286     |English Patient, The (1996)                 |481        |
|288     |Scream (1996)                               |478        |
|1       |Toy Story (1995)                            |452        |
|300     |Air Force One (1997)                        |431        |
|121     |Independence Day (ID4) (1996)               |429        |
|174     |Raiders of the Lost Ark (1981)              |42

#### **1.2. Za svaki film izračunati broj pregleda i prosečnu ocenu i prikazati nazive 10 filmova sa najviše i 10 filmova sa najmanje pregleda.**

In [None]:
from pyspark.sql.functions import col, avg, count

# Calculate number of views and average rating for each movie
movie_stats = ratings_df.groupBy("movie_id") \
    .agg(count("user_id").alias("num_views"), avg("rating").alias("avg_rating"))

# Join with movies_df to get movie titles
movie_stats = movie_stats.join(movies_df, movie_stats["movie_id"] == movies_df["movie_id"]) \
    .select(movies_df["title"], "num_views", "avg_rating")

# Show 10 movies with the most views
most_views = movie_stats.orderBy(col("num_views").desc()).limit(10)
print("Top 10 movies with the most views:")
most_views.show(truncate=False)

# Show 10 movies with the least views
least_views = movie_stats.orderBy(col("num_views")).limit(10)
print("\nTop 10 movies with the least views:")
least_views.show(truncate=False)

Top 10 movies with the most views:
+-----------------------------+---------+------------------+
|title                        |num_views|avg_rating        |
+-----------------------------+---------+------------------+
|Star Wars (1977)             |583      |4.3584905660377355|
|Contact (1997)               |509      |3.8035363457760316|
|Fargo (1996)                 |508      |4.155511811023622 |
|Return of the Jedi (1983)    |507      |4.007889546351085 |
|Liar Liar (1997)             |485      |3.156701030927835 |
|English Patient, The (1996)  |481      |3.656964656964657 |
|Scream (1996)                |478      |3.4414225941422596|
|Toy Story (1995)             |452      |3.8783185840707963|
|Air Force One (1997)         |431      |3.6310904872389793|
|Independence Day (ID4) (1996)|429      |3.438228438228438 |
+-----------------------------+---------+------------------+


Top 10 movies with the least views:
+---------------------------------------------------------+---------+----

#### **1.3. Prikazati ukupnu prosečnu ocenu svih filmova.**

In [None]:
from pyspark.sql.functions import avg

# Calculate the average rating across all movies
average_rating_all_movies = ratings_df.agg(avg("rating").alias("avg_rating"))

# Show the average rating for all movies
average_rating_all_movies.show()

+----------+
|avg_rating|
+----------+
|   3.52986|
+----------+



#### **1.4. Kreirati atribut kategorički starosna_grupa na osnovu atributa age i prikazati 5 najpopularnijih filmova za svaku grupu.**

In [None]:
# Path to the u.user file
users_path = '/content/drive/My Drive/BigData/ml-100k/u.user'

# Read the u.user file into a DataFrame
users_df = spark.read \
    .option("delimiter", "|") \
    .csv(users_path, inferSchema=True) \
    .toDF("user_id", "age", "gender", "occupation", "zip_code")

# Show the first few rows of the DataFrame
users_df.show(5)

+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|      1| 24|     M|technician|   85711|
|      2| 53|     F|     other|   94043|
|      3| 23|     M|    writer|   32067|
|      4| 24|     M|technician|   43537|
|      5| 33|     F|     other|   15213|
+-------+---+------+----------+--------+
only showing top 5 rows



In [None]:

from pyspark.sql.functions import when, col, count, desc, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Define age ranges and categorize users
users_df = users_df.withColumn("age_category",
    when(col("age").between(0, 25), "Young") \
    .when(col("age").between(26, 50), "Middle Age") \
    .otherwise("Old"))

# Calculate the number of views for each movie
movie_views = ratings_df.groupBy("movie_id").agg(count("user_id").alias("num_views"))


# Join with movie_df to get movie titles
movie_views_with_titles = movie_views.join(movies_df, movie_views["movie_id"] == movies_df["movie_id"]) \
    .select(movie_views["movie_id"], movies_df["title"], "num_views")

# Join with users_df to get the age category of users
top_movies_per_age_category = ratings_df.join(users_df, "user_id") \
    .join(movie_views_with_titles, "movie_id") \
    .groupBy("age_category", "movie_id", "title") \
    .agg(count("user_id").alias("total_views")) \
    .orderBy("age_category", desc("total_views")) \
    .withColumn("age_category", col("age_category").cast(StringType())) \
    .filter(col("age_category").isin(["Young", "Middle Age", "Old"]))

# Use window function to rank movies within each age category
window_spec = Window.partitionBy("age_category").orderBy(desc("total_views"))
top_movies_per_age_category = top_movies_per_age_category.withColumn("rank", row_number().over(window_spec))

# Filter only the top 5 movies for each age category
top_movies_per_age_category = top_movies_per_age_category.filter(col("rank") <= 5).drop("rank")

# Show the top 5 most popular movies for every age category
top_movies_per_age_category.show(truncate=False)

+------------+--------+---------------------------+-----------+
|age_category|movie_id|title                      |total_views|
+------------+--------+---------------------------+-----------+
|Middle Age  |50      |Star Wars (1977)           |362        |
|Middle Age  |100     |Fargo (1996)               |314        |
|Middle Age  |181     |Return of the Jedi (1983)  |307        |
|Middle Age  |258     |Contact (1997)             |303        |
|Middle Age  |286     |English Patient, The (1996)|300        |
|Old         |286     |English Patient, The (1996)|80         |
|Old         |100     |Fargo (1996)               |60         |
|Old         |300     |Air Force One (1997)       |54         |
|Old         |269     |Full Monty, The (1997)     |50         |
|Old         |127     |Godfather, The (1972)      |48         |
|Young       |288     |Scream (1996)              |195        |
|Young       |50      |Star Wars (1977)           |173        |
|Young       |181     |Return of the Jed

#### **1.5. Izračunati “skor” za svaki film na osnovu sledeće formule:(broj_pregleda*prosečna_ocena)/(max(broj_pregleda)*max(prosečna ocena)). Prikazati top 5 filmova na osnovu izračunatog „skora“.**

In [None]:
from pyspark.sql.functions import max, col

# Calculate max number of views and max average rating
max_views = movie_stats.agg(max("num_views")).collect()[0][0]
max_avg_rating = movie_stats.agg(max("avg_rating")).collect()[0][0]

# Calculate score for each movie
movie_stats = movie_stats.withColumn("score",
                                     (col("num_views") * col("avg_rating")) /
                                     (max_views * max_avg_rating))

# Show top 5 movies based on score
top_movies = movie_stats.orderBy(col("score").desc()).limit(5)
print("Top 5 movies based on score:")
top_movies.show(truncate=False)

Top 5 movies based on score:
+------------------------------+---------+------------------+------------------+
|title                         |num_views|avg_rating        |score             |
+------------------------------+---------+------------------+------------------+
|Star Wars (1977)              |583      |4.3584905660377355|0.8716981132075472|
|Fargo (1996)                  |508      |4.155511811023622 |0.7241852487135506|
|Return of the Jedi (1983)     |507      |4.007889546351085 |0.6970840480274443|
|Contact (1997)                |509      |3.8035363457760316|0.6641509433962264|
|Raiders of the Lost Ark (1981)|420      |4.252380952380952 |0.6126929674099485|
+------------------------------+---------+------------------+------------------+



#### **1.6. Prikazati 5 najpopularnijih filmova po polu.**

In [None]:
from pyspark.sql.functions import count, when, col, desc, first

# Join ratings_df with users_df to include gender information
joined_df = ratings_df.join(users_df, on=["user_id"])

# Calculate the number of views for each movie and gender
movie_gender_views = joined_df.groupBy("movie_id", "gender") \
    .agg(count("user_id").alias("num_views"))

# Convert gender to numeric values (0 for female, 1 for male)
movie_gender_views = movie_gender_views.withColumn("gender_numeric", when(movie_gender_views["gender"] == "F", 0).otherwise(1))

# Pivot the dataframe to have separate columns for male and female views
movie_gender_views_pivot = movie_gender_views.groupBy("movie_id") \
    .pivot("gender_numeric") \
    .agg(first("num_views"))

# Rename the columns
movie_gender_views_pivot = movie_gender_views_pivot.withColumnRenamed("0", "female_views").withColumnRenamed("1", "male_views")

# Calculate the overall number of views for each movie
movie_gender_views_pivot = movie_gender_views_pivot.withColumn("total_views", col("female_views") + col("male_views"))

# Join with movies_df to get movie titles
movie_gender_views_pivot = movie_gender_views_pivot.join(movies_df, movie_gender_views_pivot["movie_id"] == movies_df["movie_id"]) \
    .select(movies_df["title"], "total_views", "female_views", "male_views")

# Show top 5 most popular movies based on the number of views, segmented by gender
top_movies_views_female = movie_gender_views_pivot.orderBy(col("female_views").desc()).limit(5)
print("Top 5 most popular movies based on the number of views for females:")
top_movies_views_female.show(truncate=False)

top_movies_views_male = movie_gender_views_pivot.orderBy(col("male_views").desc()).limit(5)
print("\nTop 5 most popular movies based on the number of views for males:")
top_movies_views_male.show(truncate=False)

ModuleNotFoundError: No module named 'pyspark'

#### **1.7. Prikazati 3 najpopularnija filma po polu i zanimanju korisnika.**

In [None]:
# Configure Spark to show all rows without truncation
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 1000)  # Change 1000 to the desired number of rows


In [None]:
from pyspark.sql.functions import count, when, col

# Join ratings_df with users_df to include gender and occupation information
joined_df = ratings_df.join(users_df, on=["user_id"])

# Calculate the number of views for each movie, gender, and occupation
movie_gender_occupation_views = joined_df.groupBy("movie_id", "gender", "occupation") \
    .agg(count("user_id").alias("num_views"))

# Convert gender to numeric values (0 for female, 1 for male)
movie_gender_occupation_views = movie_gender_occupation_views.withColumn("gender_numeric", when(movie_gender_occupation_views["gender"] == "F", 0).otherwise(1))

# Pivot the dataframe to have separate columns for male and female views
movie_gender_occupation_views_pivot = movie_gender_occupation_views.groupBy("occupation", "movie_id") \
    .pivot("gender_numeric") \
    .agg(first("num_views"))

# Rename the columns
movie_gender_occupation_views_pivot = movie_gender_occupation_views_pivot.withColumnRenamed("0", "female_views").withColumnRenamed("1", "male_views")

# Calculate the overall number of views for each movie and occupation
movie_gender_occupation_views_pivot = movie_gender_occupation_views_pivot.withColumn("total_views",
                                                                                     col("female_views") + col("male_views"))

# Join with movies_df to get movie titles
movie_gender_occupation_views_pivot = movie_gender_occupation_views_pivot.join(movies_df, movie_gender_occupation_views_pivot["movie_id"] == movies_df["movie_id"]) \
    .select("occupation", movies_df["title"], "total_views", "female_views", "male_views")

# Show top 5 most popular movies based on the number of views, segmented by gender and profession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

window_female = Window.partitionBy("occupation").orderBy(col("female_views").desc())
window_male = Window.partitionBy("occupation").orderBy(col("male_views").desc())

top_movies_views_female = movie_gender_occupation_views_pivot.withColumn("rank_female", rank().over(window_female)).filter(col("rank_female") <= 5)
print("Top 5 most popular movies based on the number of views for females, for each occupation:")
top_movies_views_female.show(truncate=False)

top_movies_views_male = movie_gender_occupation_views_pivot.withColumn("rank_male", rank().over(window_male)).filter(col("rank_male") <= 5)
print("\nTop 5 most popular movies based on the number of views for males, for each occupation:")
top_movies_views_male.show(truncate=False)


Top 5 most popular movies based on the number of views for females, for each occupation:
+-------------+----------------------------------------+-----------+------------+----------+-----------+
|occupation   |title                                   |total_views|female_views|male_views|rank_female|
+-------------+----------------------------------------+-----------+------------+----------+-----------+
|administrator|English Patient, The (1996)             |47         |21          |26        |1          |
|administrator|Star Wars (1977)                        |44         |21          |23        |1          |
|administrator|Jerry Maguire (1996)                    |38         |19          |19        |3          |
|administrator|Return of the Jedi (1983)               |40         |18          |22        |4          |
|administrator|Toy Story (1995)                        |31         |18          |13        |4          |
|artist       |Contact (1997)                          |15         |10 

#### **1.8. Prikazati 3 najpopularnija filma u svakom žanru.**

In [None]:
from pyspark.sql.functions import col, count, desc, row_number, lit
from pyspark.sql.window import Window

# Join the ratings dataset to get the number of views for each movie
movie_views = ratings_df.groupBy("movie_id").agg(count("user_id").alias("num_views"))

# Initialize an empty DataFrame to store the results
top_movies_per_genre = spark.createDataFrame([], schema="genre STRING, rank INT, movie_id INT, title STRING, total_views INT")

# Iterate over each genre column and calculate the top 3 movies for each genre
for genre in ["Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
              "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
              "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]:

    # Calculate the total number of views for each movie genre
    genre_views = movies_df.filter(col(genre) == 1).join(movie_views, "movie_id") \
        .groupBy("movie_id", "title").agg(count("movie_id").alias("total_views"))

    # Rank the movies within each genre based on the number of views (descending order)
    window = Window.partitionBy().orderBy(desc("total_views"))
    ranked_movies = genre_views.withColumn("rank", row_number().over(window))

    # Select the top 3 movies for each genre
    top_genre_movies = ranked_movies.filter(col("rank") <= 3).select(lit(genre).alias("genre"), col("rank"), col("movie_id"), col("title"), col("total_views"))

    # Append the results to the top_movies_per_genre DataFrame
    top_movies_per_genre = top_movies_per_genre.union(top_genre_movies)

# Show the top 3 most viewed movies for every genre
top_movies_per_genre.orderBy("genre", "rank").show(truncate=False)


+-----------+----+--------+-----------------------------------+-----------+
|genre      |rank|movie_id|title                              |total_views|
+-----------+----+--------+-----------------------------------+-----------+
|Action     |1   |148     |Ghost and the Darkness, The (1996) |1          |
|Action     |2   |833     |Bulletproof (1996)                 |1          |
|Action     |3   |1088    |Double Team (1997)                 |1          |
|Adventure  |1   |463     |Secret of Roan Inish, The (1994)   |1          |
|Adventure  |2   |148     |Ghost and the Darkness, The (1996) |1          |
|Adventure  |3   |897     |Time Tracers (1995)                |1          |
|Animation  |1   |588     |Beauty and the Beast (1991)        |1          |
|Animation  |2   |101     |Heavy Metal (1981)                 |1          |
|Animation  |3   |596     |Hunchback of Notre Dame, The (1996)|1          |
|Children   |1   |623     |Angels in the Outfield (1994)      |1          |
|Children   

In [None]:
from pyspark.sql.functions import col, count, desc, row_number, lit, sum
from pyspark.sql.window import Window

# Join the ratings dataset to get the number of views for each movie
movie_views = ratings_df.groupBy("movie_id").agg(count("user_id").alias("num_views"))

# Initialize an empty DataFrame to store the results
top_movies_per_genre = spark.createDataFrame([], schema="genre STRING, rank INT, movie_id INT, title STRING, total_views INT")

# Iterate over each genre column and calculate the top 3 movies for each genre
for genre in ["Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
              "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
              "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]:

    # Calculate the total number of views for each movie genre
    genre_views = movies_df.filter(col(genre) == 1).join(movie_views, "movie_id") \
        .groupBy("movie_id", "title").agg(sum(col("num_views")).alias("total_views"))

    # Rank the movies within each genre based on the number of views (descending order)
    window = Window.partitionBy().orderBy(desc("total_views"))
    ranked_movies = genre_views.withColumn("rank", row_number().over(window))

    # Select the top 3 movies for each genre
    top_genre_movies = ranked_movies.filter(col("rank") <= 3).select(lit(genre).alias("genre"), col("rank"), col("movie_id"), col("title"), col("total_views"))

    # Append the results to the top_movies_per_genre DataFrame
    top_movies_per_genre = top_movies_per_genre.union(top_genre_movies)

# Show the top 3 most viewed movies for every genre
top_movies_per_genre.orderBy("genre", "rank").show(truncate=False)


+-----------+----+--------+--------------------------------------------+-----------+
|genre      |rank|movie_id|title                                       |total_views|
+-----------+----+--------+--------------------------------------------+-----------+
|Action     |1   |50      |Star Wars (1977)                            |583        |
|Action     |2   |181     |Return of the Jedi (1983)                   |507        |
|Action     |3   |300     |Air Force One (1997)                        |431        |
|Adventure  |1   |50      |Star Wars (1977)                            |583        |
|Adventure  |2   |181     |Return of the Jedi (1983)                   |507        |
|Adventure  |3   |174     |Raiders of the Lost Ark (1981)              |420        |
|Animation  |1   |1       |Toy Story (1995)                            |452        |
|Animation  |2   |71      |Lion King, The (1994)                       |220        |
|Animation  |3   |95      |Aladdin (1992)                        

# **2. Modeli**

## Trening - ucitavanje, release year, NA

In [None]:
u1base_path = '/content/drive/My Drive/BigData/ml-100k/u1.base'
u1base_df = spark.read \
    .option("delimiter", "\t") \
    .csv(u1base_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
movies_path = '/content/drive/My Drive/BigData/ml-100k/u.item'
movies_df = spark.read \
    .option("delimiter", "|") \
    .csv(movies_path, inferSchema=True, header=False) \
    .toDF("movie_id", "title", "release_date", "video_release_date", "IMDb_URL", "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western")

In [None]:
# Path to the u.user file
users_path = '/content/drive/My Drive/BigData/ml-100k/u.user'

# Read the u.user file into a DataFrame
users_df = spark.read \
    .option("delimiter", "|") \
    .csv(users_path, inferSchema=True) \
    .toDF("user_id", "age", "gender", "occupation", "zip_code")


In [None]:
# Left join movies_df to u1base_df on movie_id
joined_df = u1base_df.join(movies_df, on='movie_id', how='left')

# Left join users_df to the result on user_id
final_df = joined_df.join(users_df, on='user_id', how='left')

# Show the first few rows of the final DataFrame
final_df.show(5)


+-------+--------+------+---------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+
|user_id|movie_id|rating|timestamp|            title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|
+-------+--------+------+---------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+
|      1|       1|     5|874965758| Toy Story (1995)| 01-Jan-1995|              NULL|http://us.imdb.co...|      0|     0|        0

In [None]:
from pyspark.sql.functions import col, expr

# Extract the year from the title column
final_df = final_df.withColumn("release_year", expr("substring(title, -5, 4)").cast("integer"))

# Show the first few rows of the DataFrame with the new column
final_df.select("title", "release_year").show(5)


+-----------------+------------+
|            title|release_year|
+-----------------+------------+
| Toy Story (1995)|        1995|
| GoldenEye (1995)|        1995|
|Four Rooms (1995)|        1995|
|Get Shorty (1995)|        1995|
|   Copycat (1995)|        1995|
+-----------------+------------+
only showing top 5 rows



In [None]:
final_df.show(5)

+-------+--------+------+---------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|user_id|movie_id|rating|timestamp|            title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|release_year|
+-------+--------+------+---------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|      1|       1|     5|874965758| Toy Story (1995)| 01-Jan-1995|              NULL|http:/

In [None]:
from pyspark.sql.functions import col, sum as spark_sum

null_counts = final_df.select([spark_sum(col(column).isNull().cast("int")).alias(column) for column in final_df.columns])

null_counts.show()

+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|user_id|movie_id|rating|timestamp|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|release_year|
+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|      0|       0|     0|        0|    0|           3|             80000|       7|      0|     0|        0|        0|       0|     0|    0|          0|    0|      

In [None]:
from pyspark.sql.functions import col, when

median_release_year = final_df.approxQuantile("release_year", [0.5], 0.25)[0]

final_df = final_df.withColumn("release_year",
                               when(col("release_year").isNull(), expr("substring(release_date, -4, 4)").cast("integer"))
                               .otherwise(col("release_year")))

final_df = final_df.withColumn("release_year",
                               when(col("release_year").isNull(), median_release_year)
                               .otherwise(col("release_year")))

In [None]:
train_df = final_df.select("age", "gender", "occupation", "release_year",
                               "unknown", "Action", "Adventure", "Animation",
                               "Children", "Comedy", "Crime", "Documentary",
                               "Drama", "Fantasy", "Film-Noir", "Horror",
                               "Musical", "Mystery", "Romance", "Sci-Fi",
                               "Thriller", "War", "Western","rating")

train_df.show(5)

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
| 24|     M|technician|      1995.0|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     5|
| 24|     M|technician|      1995.0|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|     3|
| 24|     M|technici

In [None]:
null_counts = train_df.select([spark_sum(col(column).isNull().cast("int")).alias(column) for column in train_df.columns])

null_counts.show()

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|  0|     0|         0|           0|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     0|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+



## Test - ucitavanje, release year, NA


In [None]:
u1test_path = '/content/drive/My Drive/BigData/ml-100k/u1.test'
u1test_df = spark.read \
    .option("delimiter", "\t") \
    .csv(u1base_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
movies_path = '/content/drive/My Drive/BigData/ml-100k/u.item'
movies_df = spark.read \
    .option("delimiter", "|") \
    .csv(movies_path, inferSchema=True, header=False) \
    .toDF("movie_id", "title", "release_date", "video_release_date", "IMDb_URL", "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western")

In [None]:
# Path to the u.user file
users_path = '/content/drive/My Drive/BigData/ml-100k/u.user'

# Read the u.user file into a DataFrame
users_df = spark.read \
    .option("delimiter", "|") \
    .csv(users_path, inferSchema=True) \
    .toDF("user_id", "age", "gender", "occupation", "zip_code")


In [None]:
# Left join movies_df to u1base_df on movie_id
joined2_df = u1test_df.join(movies_df, on='movie_id', how='left')

# Left join users_df to the result on user_id
final2_df = joined2_df.join(users_df, on='user_id', how='left')

# Show the first few rows of the final DataFrame
final2_df.show(5)


+-------+--------+------+---------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+
|user_id|movie_id|rating|timestamp|            title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|
+-------+--------+------+---------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+
|      1|       1|     5|874965758| Toy Story (1995)| 01-Jan-1995|              NULL|http://us.imdb.co...|      0|     0|        0

In [None]:
from pyspark.sql.functions import col, expr

# Extract the year from the title column
final2_df = final2_df.withColumn("release_year", expr("substring(title, -5, 4)").cast("integer"))

# Show the first few rows of the DataFrame with the new column
final2_df.select("title", "release_year").show(5)

+-----------------+------------+
|            title|release_year|
+-----------------+------------+
| Toy Story (1995)|        1995|
| GoldenEye (1995)|        1995|
|Four Rooms (1995)|        1995|
|Get Shorty (1995)|        1995|
|   Copycat (1995)|        1995|
+-----------------+------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, sum as spark_sum

null_counts = final2_df.select([spark_sum(col(column).isNull().cast("int")).alias(column) for column in final2_df.columns])

null_counts.show()


+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|user_id|movie_id|rating|timestamp|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|release_year|
+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|      0|       0|     0|        0|    0|           3|             80000|       7|      0|     0|        0|        0|       0|     0|    0|          0|    0|      

In [None]:
from pyspark.sql.functions import col, when

median_release_year = final2_df.approxQuantile("release_year", [0.5], 0.25)[0]

final2_df = final2_df.withColumn("release_year",
                               when(col("release_year").isNull(), expr("substring(release_date, -4, 4)").cast("integer"))
                               .otherwise(col("release_year")))

final2_df = final2_df.withColumn("release_year",
                               when(col("release_year").isNull(), median_release_year)
                               .otherwise(col("release_year")))

In [None]:
test_df = final2_df.select("age", "gender", "occupation", "release_year",
                               "unknown", "Action", "Adventure", "Animation",
                               "Children", "Comedy", "Crime", "Documentary",
                               "Drama", "Fantasy", "Film-Noir", "Horror",
                               "Musical", "Mystery", "Romance", "Sci-Fi",
                               "Thriller", "War", "Western","rating")
test_df.show(5)

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
| 24|     M|technician|      1995.0|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     5|
| 24|     M|technician|      1995.0|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|     3|
| 24|     M|technici

In [None]:
null_counts = test_df.select([spark_sum(col(column).isNull().cast("int")).alias(column) for column in test_df.columns])

null_counts.show()

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|  0|     0|         0|           0|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     0|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+



## Trening - OneHotEncoder, asembler

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# StringIndexer to convert string labels into indices
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# OneHotEncoder to convert indices into one-hot encoded vectors
gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_encoded")

# Pipeline to chain StringIndexer and OneHotEncoder
pipeline = Pipeline(stages=[gender_indexer, gender_encoder])

# Fit and transform the pipeline to the DataFrame
trainfinal_df = pipeline.fit(train_df).transform(train_df)

# Show the first few rows of the DataFrame with one-hot encoded gender
trainfinal_df.select("gender", "gender_index", "gender_encoded").show(5)


+------+------------+--------------+
|gender|gender_index|gender_encoded|
+------+------------+--------------+
|     M|         0.0| (1,[0],[1.0])|
|     M|         0.0| (1,[0],[1.0])|
|     M|         0.0| (1,[0],[1.0])|
|     M|         0.0| (1,[0],[1.0])|
|     M|         0.0| (1,[0],[1.0])|
+------+------------+--------------+
only showing top 5 rows



In [None]:
trainfinal_df=trainfinal_df.drop("gender","gender_index")

In [None]:
# StringIndexer to convert string labels into indices
occupation_indexer = StringIndexer(inputCol="occupation", outputCol="occupation_index")

# OneHotEncoder to convert indices into one-hot encoded vectors
occupation_encoder = OneHotEncoder(inputCol="occupation_index", outputCol="occupation_encoded")

# Pipeline to chain StringIndexer and OneHotEncoder
pipeline = Pipeline(stages=[occupation_indexer, occupation_encoder])

# Fit and transform the pipeline to the DataFrame
trainfinal_df = pipeline.fit(trainfinal_df).transform(trainfinal_df)


In [None]:
trainfinal_df=trainfinal_df.drop("occupation","occupation_index")

In [None]:
trainfinal_df.columns

['age',
 'release_year',
 'unknown',
 'Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western',
 'rating',
 'gender_encoded',
 'occupation_encoded']

In [None]:
assembler = VectorAssembler(
    inputCols=["age", "gender_encoded", "occupation_encoded", "release_year",
                               "unknown", "Action", "Adventure", "Animation",
                               "Children", "Comedy", "Crime", "Documentary",
                               "Drama", "Fantasy", "Film-Noir", "Horror",
                               "Musical", "Mystery", "Romance", "Sci-Fi",
                               "Thriller", "War", "Western"],
    outputCol="features")

In [None]:
output = assembler.transform(trainfinal_df)

In [None]:
output.select("features").show()

+--------------------+
|            features|
+--------------------+
|(42,[0,1,10,22,26...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,39...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,29...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,27...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,29...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,24...|
+--------------------+
only showing top 20 rows



In [None]:
output.show()

+---+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+--------------+------------------+--------------------+
|age|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|gender_encoded|occupation_encoded|            features|
+---+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+--------------+------------------+--------------------+
| 24|      1995.0|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     5| (1,[0],[1.0])|    (20,[8],[1.0])|(42,[0,1,10,22,26...|
| 24|      1995.0|      0|     1|        1|        0|       0|     0

In [None]:
finaltrain_data = output.select("features",'rating')

## Test - OneHotEncoder, asembler

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# StringIndexer to convert string labels into indices
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# OneHotEncoder to convert indices into one-hot encoded vectors
gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_encoded")

# Pipeline to chain StringIndexer and OneHotEncoder
pipeline = Pipeline(stages=[gender_indexer, gender_encoder])

# Fit and transform the pipeline to the DataFrame
testfinal_df = pipeline.fit(test_df).transform(test_df)


In [None]:
testfinal_df=testfinal_df.drop("gender","gender_index")

In [None]:

# StringIndexer to convert string labels into indices
occupation_indexer = StringIndexer(inputCol="occupation", outputCol="occupation_index")

# OneHotEncoder to convert indices into one-hot encoded vectors
occupation_encoder = OneHotEncoder(inputCol="occupation_index", outputCol="occupation_encoded")

# Pipeline to chain StringIndexer and OneHotEncoder
pipeline = Pipeline(stages=[occupation_indexer, occupation_encoder])

# Fit and transform the pipeline to the DataFrame
testfinal_df = pipeline.fit(testfinal_df).transform(testfinal_df)

In [None]:
testfinal_df=testfinal_df.drop("occupation","occupation_index")

In [None]:
testfinal_df.columns

['age',
 'release_year',
 'unknown',
 'Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western',
 'rating',
 'gender_encoded',
 'occupation_encoded']

In [None]:
output2 = assembler.transform(testfinal_df)

In [None]:
output2.select("features").show()

+--------------------+
|            features|
+--------------------+
|(42,[0,1,10,22,26...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,39...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,29...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,27...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,29...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,31...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,28...|
|(42,[0,1,10,22,24...|
|(42,[0,1,10,22,24...|
+--------------------+
only showing top 20 rows



In [None]:
finaltest_data = output2.select("features",'rating')

## Linear Regression

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the Linear Regression model
lr = LinearRegression(maxIter=10,featuresCol="features", labelCol="rating")

# Define a grid of hyperparameters to search over
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol=lr.getLabelCol(), predictionCol=lr.getPredictionCol())

# Initialize CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit CrossValidator on the training data
cvModel = crossval.fit(finaltrain_data)

# Make predictions on test data
predictions = cvModel.transform(finaltest_data)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Best model's hyperparameters
print("Best Model Hyperparameters:")
print("regParam: ", cvModel.bestModel._java_obj.getRegParam())
print("elasticNetParam: ", cvModel.bestModel._java_obj.getElasticNetParam())

Root Mean Squared Error (RMSE) on test data = 1.07585
Best Model Hyperparameters:
regParam:  0.01
elasticNetParam:  0.0


## Random Forest

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Initialize the Random Forest model
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Define a grid of hyperparameters to search over
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 30, 50]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol=rf.getLabelCol(), predictionCol=rf.getPredictionCol())

# Initialize CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit CrossValidator on the training data
cvModel = crossval.fit(finaltrain_data)

# Make predictions on test data
predictions = cvModel.transform(finaltest_data)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Best model's hyperparameters
# Best model's hyperparameters
print("Best Model Hyperparameters:")
print("numTrees:", cvModel.bestModel.getNumTrees)
print("maxDepth:", cvModel.bestModel.getOrDefault("maxDepth"))
print("minInstancesPerNode:", cvModel.bestModel.getOrDefault("minInstancesPerNode"))

Root Mean Squared Error (RMSE) on test data = 0.987116
Best Model Hyperparameters:
numTrees: 50
maxDepth: 15
minInstancesPerNode: 5


Vidimo da najbolji Random Forest model ima bolje parametre od najboljeg modela linerane regresije.

In [None]:
rf = RandomForestRegressor(numTrees=50,maxDepth=15,minInstancesPerNode=5, featuresCol="features", labelCol="rating")

In [None]:
rfModel = rf.fit(finaltrain_data,)

# **3. Procedura za sistem preporuke**

### Kreiranje dataseta top50 filmova

In [None]:
from pyspark.sql.functions import desc

# Calculate the number of ratings for each movie
ratings_count_df = ratings_df.groupBy("movie_id").agg(count("rating").alias("num_ratings"))

# Join the ratings count with the movies DataFrame
popular_movies_df = movies_df.join(ratings_count_df, "movie_id")

# Create a new DataFrame with the top 50 most popular movies based on the number of ratings
popular_movies_df = popular_movies_df.orderBy(desc("num_ratings")).limit(50)

# Show the new DataFrame
popular_movies_df.show(truncate=False)

+--------+--------------------------------+------------+------------------+---------------------------------------------------------------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+
|movie_id|title                           |release_date|video_release_date|IMDb_URL                                                                   |unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|num_ratings|
+--------+--------------------------------+------------+------------------+---------------------------------------------------------------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+
|50      |Star Wars (1977)                |01-Jan-19

In [None]:
popular_movies_df.columns

['movie_id',
 'title',
 'release_date',
 'video_release_date',
 'IMDb_URL',
 'unknown',
 'Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western',
 'num_ratings']

In [None]:
from pyspark.sql.functions import col, expr

# Extract the year from the title column
popular_movies_df = popular_movies_df.withColumn("release_year", expr("substring(title, -5, 4)").cast("integer"))

# Show the first few rows of the DataFrame with the new column
popular_movies_df.select("title", "release_year").show(5)

+--------------------+------------+
|               title|release_year|
+--------------------+------------+
|    Star Wars (1977)|        1977|
|      Contact (1997)|        1997|
|        Fargo (1996)|        1996|
|Return of the Jed...|        1983|
|    Liar Liar (1997)|        1997|
+--------------------+------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Izračun broja null vrijednosti po stupcima
null_counts = popular_movies_df.select([spark_sum(col(column).isNull().cast("int")).alias(column) for column in popular_movies_df.columns])

# Prikaz rezultata
null_counts.show()

+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+------------+
|movie_id|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|num_ratings|release_year|
+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+------------+
|       0|    0|           0|                50|       0|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|          0|           0|
+--------+-----+------------+------------------+--------+---

In [None]:
from pyspark.sql.functions import col, when

median_release_year = popular_movies_df.approxQuantile("release_year", [0.5], 0.25)[0]

popular_movies_df = popular_movies_df.withColumn("release_year",
                               when(col("release_year").isNull(), expr("substring(release_date, -4, 4)").cast("integer"))
                               .otherwise(col("release_year")))

popular_movies_df = popular_movies_df.withColumn("release_year",
                               when(col("release_year").isNull(), median_release_year)
                               .otherwise(col("release_year")))

In [None]:
top50_df = popular_movies_df.select("movie_id", "title", "release_year",
                               "unknown", "Action", "Adventure", "Animation",
                               "Children", "Comedy", "Crime", "Documentary",
                               "Drama", "Fantasy", "Film-Noir", "Horror",
                               "Musical", "Mystery", "Romance", "Sci-Fi",
                               "Thriller", "War", "Western")

top50_df.show(5)

+--------+--------------------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movie_id|               title|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+--------+--------------------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|      50|    Star Wars (1977)|      1977.0|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      1|     1|       0|  1|      0|
|     258|      Contact (1997)|      1997.0|      0|     0|        0|        0|       0|     0|    0|          0|    1|      0|        0|     0|      0|      0|      0|     1|       0|  0|      0|
|     100|     

Bitno je istaći da iako **nećemo u model** slati **movie id** i **title**, ove dve kolone smo zadržali u dataframe-u kako bismo na kraju imali kao izlaz iz procedure same nazive filmove koje treba preporučiti klijentu!

### Sredjivanje baze - prebacivanje gender i occupation uz OneHotEncoder

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
# StringIndexer to convert string labels into indices
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# OneHotEncoder to convert indices into one-hot encoded vectors
gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_encoded")

# Pipeline to chain StringIndexer and OneHotEncoder
pipeline = Pipeline(stages=[gender_indexer, gender_encoder])

# Fit and transform the pipeline to the DataFrame
userstransformed_df = pipeline.fit(users_df).transform(users_df)

# Show the first few rows of the DataFrame with one-hot encoded gender
userstransformed_df.select("gender", "gender_index", "gender_encoded").show(5)


+------+------------+--------------+
|gender|gender_index|gender_encoded|
+------+------------+--------------+
|     M|         0.0| (1,[0],[1.0])|
|     F|         1.0|     (1,[],[])|
|     M|         0.0| (1,[0],[1.0])|
|     M|         0.0| (1,[0],[1.0])|
|     F|         1.0|     (1,[],[])|
+------+------------+--------------+
only showing top 5 rows



In [None]:
# StringIndexer to convert string labels into indices
occupation_indexer = StringIndexer(inputCol="occupation", outputCol="occupation_index")

# OneHotEncoder to convert indices into one-hot encoded vectors
occupation_encoder = OneHotEncoder(inputCol="occupation_index", outputCol="occupation_encoded")

# Pipeline to chain StringIndexer and OneHotEncoder
pipeline = Pipeline(stages=[occupation_indexer, occupation_encoder])

# Fit and transform the pipeline to the DataFrame
userstransformed_df= pipeline.fit(userstransformed_df).transform(userstransformed_df)

### Funkcije namenjene kreiranju finalnog dataframe-a koji ulazi u model

In [None]:
def get_user_info(user_id):
    user_info = userstransformed_df.filter(userstransformed_df.user_id == user_id).select('gender_encoded', 'occupation_encoded', 'age').collect()
    return user_info[0]

Funkcija get_user_info za datog usera, vraća njegove vrednosti iz baze (sređenog dataframe-a uz pomoć OneHotEncodera)

In [None]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import lit

def create_df_for_model(user_id):

  gender_encoded, occupation_encoded, age = get_user_info(user_id)

  gender_encoded_udf = udf(lambda: gender_encoded, VectorUDT())
  occupation_encoded_udf = udf(lambda: occupation_encoded, VectorUDT())

  #Odabir varijabli za filmove koje idu u model
  selected_columns = [col for col in top50_df.columns if col not in ["title", "movie_id"]]

  top50_df_movies = top50_df.select(selected_columns)

  recomendation_df = top50_df_movies.withColumn("gender_encoded", gender_encoded_udf())
  recomendation_df = recomendation_df.withColumn("occupation_encoded", occupation_encoded_udf())
  recomendation_df = recomendation_df.withColumn("age", lit(age))

  return recomendation_df

Funkcija vraća dataframe koji može ući u model.
1. Pozivajući get_user_info funkciju ona dobija sve informacije o korisniku.
2. Prilagođava gender_encoded i occupation_encoded tako da mogu biti upisane vektorskim zapisom u odgovarajuće kolone (age je tipa int pa za nju ovo ne moramo raditi)
3. Uzima sve kolone sem title i movie_id, jer one kao metapodaci neće ići u model
4. Spaja sve podatke i vraća dataframe

Simulacija same funkcije:

In [None]:
create_df_for_model(1).show()

+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+--------------+------------------+---+
|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|gender_encoded|occupation_encoded|age|
+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+--------------+------------------+---+
|      1977.0|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      1|     1|       0|  1|      0| (1,[0],[1.0])|   (20,[11],[1.0])| 24|
|      1997.0|      0|     0|        0|        0|       0|     0|    0|          0|    1|      0|        0|     0|      0|      0|      0|     1|       0|  0|      0| (1,[0],[1.0])

### Funkcije preporuke

In [None]:
def get_user_ratings(user_id):

  dataframe=create_df_for_model(user_id)

  output=assembler.transform(dataframe)

  unlabeled_data = output.select("features")

  predictions_for_user = rfModel.transform(unlabeled_data)

  return predictions_for_user

Funkcija get_user_ratings:
1. Poziva funkciju create_df_for_model uz pomoć koje kreira odgovarajući dataframe
2. Oslanjamo se na to da koristimo asembler prethodno korišćen na trening setu, kao i model koji je već ranije fitovan na trening setu, te njih samo pozivamo.
3. Vraćamo predikcije kako će user oceniti 50 najgledanijih filmova.

Simulacija funkcije:

In [None]:
get_user_ratings(1).show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|(42,[0,1,13,22,24...| 4.543060425541973|
|(42,[0,1,13,22,31...| 3.746221394485979|
|(42,[0,1,13,22,29...|3.7914356903527335|
|(42,[0,1,13,22,24...|3.9819745755846143|
|(42,[0,1,13,22,28...| 3.028546334262611|
|(42,[0,1,13,22,31...|3.5755561693230975|
|(42,[0,1,13,22,34...|3.1881490845758647|
|(42,[0,1,13,22,26...|3.5834923977026945|
|(42,[0,1,13,22,24...| 2.917219620636603|
|(42,[0,1,13,22,24...| 2.817979237468084|
|(42,[0,1,13,22,24...| 3.886261488873048|
|(42,[0,1,13,22,24...| 4.390802902617976|
|(42,[0,1,13,22,29...|3.8283943070511675|
|(42,[0,1,13,22,31...| 3.733703862650661|
|(42,[0,1,13,22,31...| 4.458225150664392|
|(42,[0,1,13,22,31...| 3.580499986302564|
|(42,[0,1,13,22,24...|2.7579803725911285|
|(42,[0,1,13,22,24...| 4.369749319383048|
|(42,[0,1,13,22,24...|3.0869745800007924|
|(42,[0,1,13,22,28...|3.9241199738033186|
+--------------------+------------

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

def get_user_recommendation(user_id):

  predictions_for_user = get_user_ratings(user_id)

  df1_with_index = predictions_for_user.withColumn("row_index", monotonically_increasing_id())
  df2_with_index = top50_df.withColumn("row_index", monotonically_increasing_id())

  merged_df = df1_with_index.join(df2_with_index, "row_index").drop("row_index")

  merged_df = merged_df.orderBy(desc("prediction")).select(lit(user_id).alias("user_id"),"movie_id","prediction")

  return merged_df

Za kraj funkcija get_user_recommendation
1. poziva prethodnu funkciju get_user_ratings, koja vraća sve ocene usera za 50 najpopularnijih filmova
2. spaja te ocene sa top50 dataframe-om kakobismo znali momovie_id i title tih top 50 filmova
3. sortira filmove na osnovu ratinga, vraćajući za datog specifičnog korisnika koji od top 50 filmova mu najpre odnosno kojim redolsedom treba preporučiti

Simulacija funkcije

In [None]:
get_user_recommendation(1).show()

+-------+--------+--------------------+------------------+
|user_id|movie_id|               title|        prediction|
+-------+--------+--------------------+------------------+
|      1|      50|    Star Wars (1977)| 4.543060425541973|
|      1|      98|Silence of the La...| 4.458225150664392|
|      1|     127|Godfather, The (1...| 4.390802902617976|
|      1|     172|Empire Strikes Ba...| 4.369749319383048|
|      1|     173|Princess Bride, T...| 4.227725787559744|
|      1|     183|        Alien (1979)| 4.173106899618158|
|      1|     318|Schindler's List ...| 4.122048214710337|
|      1|     302|L.A. Confidential...| 4.117428070655601|
|      1|     176|       Aliens (1986)| 4.095936650900483|
|      1|     195|Terminator, The (...| 4.075700287772512|
|      1|     168|Monty Python and ...| 3.996811336471928|
|      1|      96|Terminator 2: Jud...|3.9833379861250506|
|      1|     181|Return of the Jed...|3.9819745755846143|
|      1|     204|Back to the Futur...|3.924119973803318

# **4. Sistem preporuke**

## 4.1. Optimizacija hiperparametara - ALS


In [None]:
u1base_path = '/content/drive/My Drive/BigData/ml-100k/u1.base'
u1base_df = spark.read \
    .option("delimiter", "\t") \
    .csv(u1base_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
u1test_path = '/content/drive/My Drive/BigData/ml-100k/u1.test'
u1test_df = spark.read \
    .option("delimiter", "\t") \
    .csv(u1base_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the ALS model
als = ALS(userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop")

# Define a grid of hyperparameters to search over
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(als.maxIter, [5, 10, 20]) \
    .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Initialize CrossValidator
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit CrossValidator on the training data
cvModel = crossval.fit(u1base_df)

# Make predictions on test data
predictions = cvModel.transform(u1test_df)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Best model's hyperparameters
print("Best Model Hyperparameters:")
print("rank: ", cvModel.bestModel.rank)
print("regParam: ", cvModel.bestModel._java_obj.parent().getRegParam())
print("maxIter: ", cvModel.bestModel._java_obj.parent().getMaxIter())

Root Mean Squared Error (RMSE) on test data = 0.640688
Best Model Hyperparameters:
rank:  30
regParam:  0.1
maxIter:  20


In [None]:
# Build the recommendation model using ALS on the training data
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=20, regParam=0.1, rank=30, userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop" )
alsModel = als.fit(u1base_df)

## 4.2. Kreiranje promenljivih users_with_grades i users_without_grades

Korišćenjem u2.base identifikujte korisnike koji su dali manje od 5 ocena ili se ne pojavljuju
u u1.base i napravite dve promenljive: users_with_grades (korisnici iz u2.base koji se
pojavljuju u u1.base i imaju minimum 5 ocena) i users_without_grades (korisnici iz u2.base
koji se ne pojavljuju u u1.base ili imaju manje od 5 ocena)).

In [None]:
u1base_path = '/content/drive/My Drive/BigData/ml-100k/u1.base'
u1base_df = spark.read \
    .option("delimiter", "\t") \
    .csv(u1base_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
u1base_df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       1|     5|874965758|
|      1|       2|     3|876893171|
|      1|       3|     4|878542960|
|      1|       4|     3|876893119|
|      1|       5|     3|889751712|
|      1|       7|     4|875071561|
|      1|       8|     1|875072484|
|      1|       9|     5|878543541|
|      1|      11|     2|875072262|
|      1|      13|     5|875071805|
|      1|      15|     5|875071608|
|      1|      16|     5|878543541|
|      1|      18|     4|887432020|
|      1|      19|     5|875071515|
|      1|      21|     1|878542772|
|      1|      22|     4|875072404|
|      1|      25|     4|875071805|
|      1|      26|     3|875072442|
|      1|      28|     4|875072173|
|      1|      29|     1|878542869|
+-------+--------+------+---------+
only showing top 20 rows



In [None]:
u2base_path = '/content/drive/My Drive/BigData/ml-100k/u2.base'
u2base_df = spark.read \
    .option("delimiter", "\t") \
    .csv(u2base_path, inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")

In [None]:
u2base_df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       3|     4|878542960|
|      1|       4|     3|876893119|
|      1|       5|     3|889751712|
|      1|       6|     5|887431973|
|      1|       7|     4|875071561|
|      1|      10|     3|875693118|
|      1|      11|     2|875072262|
|      1|      12|     5|878542960|
|      1|      13|     5|875071805|
|      1|      14|     5|874965706|
|      1|      15|     5|875071608|
|      1|      16|     5|878543541|
|      1|      17|     3|875073198|
|      1|      18|     4|887432020|
|      1|      19|     5|875071515|
|      1|      20|     4|887431883|
|      1|      23|     4|875072895|
|      1|      24|     3|875071713|
|      1|      25|     4|875071805|
|      1|      27|     2|876892946|
+-------+--------+------+---------+
only showing top 20 rows



In [None]:
# Extract user_ids from u1.base and u2.base
u1_users = u1base_df.select("user_id").distinct()
u2_users = u2base_df.select("user_id").distinct()
# Identify users from u2.base that do not exist in u1.base
missing_users = u2_users.subtract(u1_users)
# Show the missing user_ids
missing_users.show()

+-------+
|user_id|
+-------+
+-------+



In [None]:
from pyspark.sql.functions import col

# Perform a left anti join to find user IDs only in u2base_df
users_only_in_u2base = u2base_df.join(u1base_df, on=['user_id'], how='left_anti')

# Show the user IDs only in u2base_df
users_only_in_u2base.show()


+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
+-------+--------+------+---------+



In [None]:
user_rating_counts = u2base_df.groupBy('user_id').count()

# Filter users who have rated less than 5 movies
users_less_than_5_movies = user_rating_counts.filter(col('count') < 5)

users_less_than_5_movies.show()

+-------+-----+
|user_id|count|
+-------+-----+
+-------+-----+



In [None]:
from pyspark.sql.functions import col

user_rating_counts = u2base_df.groupBy('user_id').count()

# Filter users who have rated less than 5 movies
users_less_than_5_movies = user_rating_counts.filter(col('count') < 5)

# Join the original DataFrame with the filtered users to get their ratings
filtered_dataframe = u2base_df.join(users_less_than_5_movies, 'user_id', 'inner')

# Show the resulting DataFrame containing users who have rated less than 5 movies
filtered_dataframe.show()

+-------+--------+------+---------+-----+
|user_id|movie_id|rating|timestamp|count|
+-------+--------+------+---------+-----+
+-------+--------+------+---------+-----+




*Korišćenjem u2.base identifikujte korisnike koji su dali manje od 5 ocena ili se ne pojavljuju u u1.base - users_without_grades*.

Vidimo da nam uslov od 5 ocena i nije dovoljan i da nam za simulaciju ColdStarta trebaju dodatni uslovi, jer na ovaj način dobijamo prazan skup, tako da ćemo prvo proveriti koja granica ocena bi nam odgovarala.

In [None]:
from pyspark.sql import functions as F

# Grupisanje po korisničkom identifikatoru i brojanje ocena po korisniku
user_ratings_count = u2base_df.groupBy("user_id").agg(F.count("rating").alias("num_ratings"))

# Sortiranje korisnika po broju ocena u opadajućem redosledu
sorted_users = user_ratings_count.orderBy(F.asc("num_ratings"))

# Prikaz prvih nekoliko redova novog skupa podataka
sorted_users.show()

+-------+-----------+
|user_id|num_ratings|
+-------+-----------+
|    512|          9|
|    511|          9|
|    431|         10|
|    441|         10|
|    335|         11|
|    461|         12|
|    384|         12|
|    520|         12|
|    105|         12|
|    475|         12|
|     36|         12|
|    418|         13|
|     19|         13|
|    202|         13|
|    444|         13|
|    400|         13|
|    446|         13|
|    364|         13|
|    558|         13|
|    516|         13|
+-------+-----------+
only showing top 20 rows



In [None]:
# Define percentiles of interest
percentiles = [0.1,0.15,0.20,0.25, 0.50, 0.75, 0.90, 0.95, 0.99]

# Calculate percentiles for num_ratings
ratings_percentiles = sorted_users.approxQuantile("num_ratings", percentiles, relativeError=0.01)

# Print percentiles
for percentile, value in zip(percentiles, ratings_percentiles):
    print(f"Percentile {percentile * 100}%: {value}")

Percentile 10.0%: 19.0
Percentile 15.0%: 21.0
Percentile 20.0%: 24.0
Percentile 25.0%: 26.0
Percentile 50.0%: 49.0
Percentile 75.0%: 115.0
Percentile 90.0%: 190.0
Percentile 95.0%: 236.0
Percentile 99.0%: 669.0


Možemo uzeti klijente koji nisu ocenili više od 25 filmova, kako bismo dobili 20% klijenata, za sam model.

In [None]:
user_rating_counts = u2base_df.groupBy('user_id').count()

# Filter users who have rated less than 25 movies
users_less_than_25_movies = user_rating_counts.filter(col('count') < 25)

# Join the original DataFrame with the filtered users to get their ratings
filtered_dataframe = u2base_df.join(users_less_than_25_movies, 'user_id', 'inner')

# Show the resulting DataFrame containing users who have rated less than 5 movies
filtered_dataframe.show()

+-------+--------+------+---------+-----+
|user_id|movie_id|rating|timestamp|count|
+-------+--------+------+---------+-----+
|      4|      11|     4|892004520|   18|
|      4|      50|     5|892003526|   18|
|      4|     260|     4|892004275|   18|
|      4|     264|     3|892004275|   18|
|      4|     288|     4|892001445|   18|
|      4|     294|     5|892004409|   18|
|      4|     301|     5|892002353|   18|
|      4|     303|     5|892002352|   18|
|      4|     324|     5|892002353|   18|
|      4|     327|     5|892002352|   18|
|      4|     354|     5|892002353|   18|
|      4|     356|     3|892003459|   18|
|      4|     357|     4|892003525|   18|
|      4|     358|     2|892004275|   18|
|      4|     359|     5|892002352|   18|
|      4|     360|     5|892002352|   18|
|      4|     361|     5|892002353|   18|
|      4|     362|     5|892002352|   18|
|      9|       6|     5|886960055|   16|
|      9|       7|     4|886960030|   16|
+-------+--------+------+---------

In [None]:
users_without_grades= filtered_dataframe.select("user_id").distinct()

In [None]:
row_count = users_without_grades.count()
print("Number of rows in the DataFrame:", row_count)

Number of rows in the DataFrame: 195


In [None]:
from pyspark.sql.functions import col

# Count the number of unique user IDs in u2base_df
user_id_count = u2base_df.select('user_id').distinct().count()

# Print the number of unique user IDs
print("Number of unique user IDs in the DataFrame:", user_id_count)

Number of unique user IDs in the DataFrame: 943


Sada imamo users_without_grades koje cemo koristiti kao nove klijente, i nad njima pozivati content-based recomendation system.

In [None]:
all_users=u2base_df.select('user_id').distinct()

In [None]:
users_only_in_u2base = all_users.join(users_without_grades, on=['user_id'], how='left_anti')

users_with_grades=users_only_in_u2base.select('user_id').distinct()

user_id_count = users_with_grades.count()

# Print the number of unique user IDs
print("Number of unique user IDs in the DataFrame:", user_id_count)


Number of unique user IDs in the DataFrame: 748


Za users_with_grades koristicemo colaborative recommendation system.

In [None]:
movies_only_in_u2base = u2base_df.join(u1base_df, on=['movie_id'], how='left_anti')

movies_u2_only=movies_only_in_u2base.select('movie_id').distinct()

movies_u2_only_count = movies_u2_only.count()

print("Number of unique movie IDs in the DataFrame:", movies_u2_only_count)

Number of unique movie IDs in the DataFrame: 32


In [None]:
movies_both_in_u2u1base = u2base_df.join(u1base_df, on=['movie_id'], how='inner')

movies_u_12=movies_both_in_u2u1base.select('movie_id').distinct()

movies_u_12_count = movies_u_12.count()

print("Number of unique movie IDs in the DataFrame:", movies_u_12_count)

Number of unique movie IDs in the DataFrame: 1616


## 4.3. **ALS**

In [None]:
movies_u2_only.show()

+--------+
|movie_id|
+--------+
|     857|
|    1533|
|    1561|
|     830|
|    1156|
|    1493|
|    1457|
|    1582|
|    1565|
|     599|
|    1505|
|     711|
|    1586|
|    1458|
|     852|
|    1310|
|    1520|
|    1536|
|    1498|
|    1236|
+--------+
only showing top 20 rows



In [None]:
users_with_grades.show()

+-------+
|user_id|
+-------+
|    148|
|    463|
|    496|
|    833|
|    243|
|    392|
|    540|
|    623|
|    737|
|    897|
|     31|
|     85|
|    137|
|    251|
|    451|
|    580|
|     65|
|    458|
|    879|
|    883|
+-------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import lit

# Add a dummy column to both DataFrames for the cross join
users_with_grades_2 = users_with_grades.withColumn("dummy", lit(1))
movies_u2_only_2 = movies_u_12.withColumn("dummy", lit(1))

# Perform a cross join to get all combinations of user_id and movie_id
combinations_df = users_with_grades_2.crossJoin(movies_u2_only_2).drop("dummy")

# Show the resulting DataFrame
combinations_df.show()

+-------+--------+
|user_id|movie_id|
+-------+--------+
|    148|     496|
|    463|     496|
|    496|     496|
|    833|     496|
|    243|     496|
|    392|     496|
|    540|     496|
|    623|     496|
|    737|     496|
|    897|     496|
|     31|     496|
|     85|     496|
|    137|     496|
|    251|     496|
|    451|     496|
|    580|     496|
|     65|     496|
|    458|     496|
|    879|     496|
|    883|     496|
+-------+--------+
only showing top 20 rows



In [None]:
combinations_filtered_df= combinations_df.join(u1base_df, on=['user_id','movie_id'], how='left_anti')

In [None]:
predictions=alsModel.transform(combinations_filtered_df)

In [None]:
predictions.show()

+-------+--------+----------+
|user_id|movie_id|prediction|
+-------+--------+----------+
|    463|     496| 3.1894655|
|    833|     496| 2.9666798|
|    243|     496| 3.8639917|
|    392|     496| 4.6330442|
|    540|     496| 4.1411133|
|    623|     496|  4.360417|
|    737|     496| 3.0798228|
|     31|     496| 3.2548058|
|    137|     496|  4.213037|
|    251|     496|  4.474982|
|    451|     496|  3.259228|
|    580|     496| 3.4376345|
|     65|     496|  4.697401|
|    879|     496| 4.1149116|
|    255|     496| 2.5624268|
|    481|     496|  4.319248|
|    898|     496|  3.738114|
|    296|     496| 4.7384834|
|    853|     496| 3.4097672|
|    322|     496| 4.1037025|
+-------+--------+----------+
only showing top 20 rows



In [None]:
provera2=predictions.select(['user_id','movie_id']).distinct()

pro = provera2.count()

# Print the number of unique user IDs
print("Number of unique user IDs in the DataFrame:", pro)

Number of unique user IDs in the DataFrame: 1132508


In [None]:
from pyspark.sql.window import Window

# Assuming predictions is a PySpark DataFrame
window_spec = Window.partitionBy("user_id").orderBy(F.desc("prediction"))

sorted_predictions = predictions.withColumn("rank", F.row_number().over(window_spec))

sorted_predictions.show()

+-------+--------+----------+----+
|user_id|movie_id|prediction|rank|
+-------+--------+----------+----+
|      1|    1449|  4.900473|   1|
|      1|     408| 4.7349005|   2|
|      1|    1142|  4.729363|   3|
|      1|     357|   4.70418|   4|
|      1|     474| 4.6619873|   5|
|      1|     134|  4.656081|   6|
|      1|     285|  4.652624|   7|
|      1|     513| 4.6266813|   8|
|      1|      64| 4.5511584|   9|
|      1|     694| 4.5195055|  10|
|      1|     190| 4.5152802|  11|
|      1|      12| 4.5061355|  12|
|      1|     603|   4.48525|  13|
|      1|     174|  4.483014|  14|
|      1|     515|  4.475907|  15|
|      1|     100|  4.475713|  16|
|      1|     318|  4.471527|  17|
|      1|     171| 4.4549828|  18|
|      1|     856| 4.4367366|  19|
|      1|     493| 4.4278884|  20|
+-------+--------+----------+----+
only showing top 20 rows



In [None]:
# Filter out only the top prediction for each user
top_predictions = sorted_predictions.filter(F.col("rank") < 4)

# Drop the rank column if you don't need it anymore
top_predictions = top_predictions.drop("rank")

# Show the sorted predictions
top_predictions.show()

+-------+--------+----------+
|user_id|movie_id|prediction|
+-------+--------+----------+
|      1|    1449|  4.900473|
|      1|     408| 4.7349005|
|      1|    1142|  4.729363|
|      2|    1449| 4.9271026|
|      2|     963| 4.7906623|
|      2|    1194| 4.7591915|
|      3|     430| 4.1714244|
|      3|     896| 4.0647435|
|      3|    1005| 4.0561514|
|      5|    1240|  4.405801|
|      5|      89|  4.189338|
|      5|     114| 4.1229844|
|      6|    1643| 4.6006684|
|      6|     483|  4.365632|
|      6|     515| 4.3084908|
|      7|    1643|  6.021322|
|      7|    1131|  5.104259|
|      7|     320| 5.0423336|
|      8|    1167|  4.942864|
|      8|     313|  4.777533|
+-------+--------+----------+
only showing top 20 rows



## 4.4. **Procedura za sistem preporuke baziran na kolaborativnom filtriranju**

get_user_recommendation

In [None]:
from functools import reduce

In [None]:
def get_users_content_based_recommendation(dataframe):

  # Assuming you have a function named 'your_function' that takes a user_id parameter and returns a DataFrame

  # Collect unique user_ids from your DataFrame
  user_ids = dataframe.select("user_id").distinct().rdd.map(lambda x: x[0]).collect()

  # Apply the function to each user_id and collect the resulting DataFrames
  user_dataframes = [get_user_recommendation(user_id) for user_id in user_ids]

  # Combine the DataFrames into a single DataFrame using reduce or union
  combined_dataframe = reduce(lambda df1, df2: df1.union(df2), user_dataframes)

  # Show the combined DataFrame
  return combined_dataframe

In [None]:
users_without_grades.show()

+-------+
|user_id|
+-------+
|    471|
|    858|
|    516|
|    808|
|     53|
|    799|
|    133|
|     78|
|    513|
|    362|
|    857|
|    375|
|    876|
|    108|
|    155|
|     34|
|    596|
|    762|
|    300|
|    688|
+-------+
only showing top 20 rows



In [None]:
def recommendation_systems(u2, u1, movies_with_grades):

  # Filter users who have rated less than 25 movies
  user_rating_counts = u2.groupBy('user_id').count()
  users_less_than_25_movies = user_rating_counts.filter(col('count') < 25)
  users_without_grades = u2.join(users_less_than_25_movies, 'user_id', 'inner')
  users_without_grades= users_without_grades.select("user_id").distinct()

  all_users=u2.select('user_id').distinct()

  # Filter users who have rated more than 25 movies
  users_with_grades = all_users.join(users_without_grades, on=['user_id'], how='left_anti')
  users_with_grades=users_with_grades.select('user_id').distinct()
  print("User segmentation based on grades done")

  #ALS model for users with grades
  users_with_grades_2 = users_with_grades.withColumn("dummy", lit(1))
  movies_with_grades = movies_with_grades.withColumn("dummy", lit(1))
  combinations_df = users_with_grades_2.crossJoin(movies_with_grades).drop("dummy")

  combinations_filtered_df= combinations_df.join(u1, on=['user_id','movie_id'], how='left_anti')
  als_predictions=alsModel.transform(combinations_filtered_df)
  window_spec  = Window.partitionBy("user_id").orderBy(F.desc("prediction"))
  als_sorted_predictions = als_predictions.withColumn("rank", F.row_number().over(window_spec))
  als_sorted_predictions= als_sorted_predictions.drop("rank")
  print("Colaborative filtering done")

  #prediction model for users without grades
  rf_predictions=get_users_content_based_recommendation(users_without_grades)
  print("Content based done")

  final_df = als_sorted_predictions.union(rf_predictions)

  sorted_final = final_df.orderBy(final_df.user_id.asc(), final_df.prediction.desc())

  return sorted_final

In [None]:
final=recommendation_systems(u2base_df,u1base_df, movies_u_12)

User segmentation based on grades done
Colaborative filtering done
Content based done


In [None]:
u2base_df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       3|     4|878542960|
|      1|       4|     3|876893119|
|      1|       5|     3|889751712|
|      1|       6|     5|887431973|
|      1|       7|     4|875071561|
|      1|      10|     3|875693118|
|      1|      11|     2|875072262|
|      1|      12|     5|878542960|
|      1|      13|     5|875071805|
|      1|      14|     5|874965706|
|      1|      15|     5|875071608|
|      1|      16|     5|878543541|
|      1|      17|     3|875073198|
|      1|      18|     4|887432020|
|      1|      19|     5|875071515|
|      1|      20|     4|887431883|
|      1|      23|     4|875072895|
|      1|      24|     3|875071713|
|      1|      25|     4|875071805|
|      1|      27|     2|876892946|
+-------+--------+------+---------+
only showing top 20 rows



In [None]:
# Grouping by user_id and counting the number of ratings for each user
ratings_count = u2base_df.groupBy('user_id').agg(count('rating').alias('ratings_count'))

# Sorting the users based on the number of ratings they've given
ratings_count_sorted = ratings_count.orderBy('ratings_count', ascending=False)

# Displaying the result
ratings_count_sorted.show()

+-------+-------------+
|user_id|ratings_count|
+-------+-------------+
|    655|          669|
|     13|          483|
|    405|          469|
|    846|          405|
|    682|          399|
|    276|          394|
|    303|          372|
|    880|          368|
|    234|          367|
|    896|          362|
|    796|          358|
|    758|          357|
|    537|          354|
|    181|          342|
|    279|          337|
|    804|          332|
|    889|          326|
|    393|          324|
|    727|          322|
|    450|          319|
+-------+-------------+
only showing top 20 rows



In [None]:
user_655_data = final.filter(final.user_id == 655)

In [None]:
user_655_data.show()

+-------+--------+------------------+
|user_id|movie_id|        prediction|
+-------+--------+------------------+
|    655|    1449| 3.655669689178467|
|    655|     169|3.5332837104797363|
|    655|     408|3.5297415256500244|
|    655|    1122| 3.489656925201416|
|    655|     641|3.4562602043151855|
|    655|     646|3.4023537635803223|
|    655|     180|3.4021127223968506|
|    655|     488| 3.396965980529785|
|    655|     506|3.3946940898895264|
|    655|     593|3.3863847255706787|
|    655|    1064| 3.377713918685913|
|    655|     648| 3.371241807937622|
|    655|     745| 3.342625379562378|
|    655|     589|3.3390207290649414|
|    655|     493|3.3312554359436035|
|    655|    1039| 3.323634386062622|
|    655|    1558|3.3192474842071533|
|    655|     482|3.3185818195343018|
|    655|     199|3.3081538677215576|
|    655|     484|3.2836647033691406|
+-------+--------+------------------+
only showing top 20 rows

