## Install dependencies

In [1]:
%pip install -r requirements.txt





[notice] A new release of pip is available: 24.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import pandas as pd
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import year
import time

In [3]:
# Add a decorator to time the function
def timeit(method):
    def timed(*args, **kw):
        start_time = time.time()
        result = method(*args, **kw)
        end_time = time.time()
        print(f"{method.__name__} took: {end_time - start_time} sec")
        return result
    return timed

## Read data from Postgre

- First, download the Postgre JDBC driver

In [4]:
@timeit
def connect_postgre():
    spark_postgre = SparkSession \
        .builder \
        .config("spark.jars", "D:\\projects\\uni_projects\\Big-Data\\Movie-Recommendation-System\\jars\\postgresql-42.7.4.jar") \
        .config("spark.driver.extraClassPath", "D:\\projects\\uni_projects\\Big-Data\\Movie-Recommendation-System\\jars\\postgresql-42.7.4.jar") \
        .getOrCreate()
    return spark_postgre

spark_postgre = connect_postgre()

connect_postgre took: 7.4272308349609375 sec


In [5]:
@timeit
def read_movies_from_postgre(spark_postgre):
    df = spark_postgre.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("dbtable", "movies") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .load()
    df.show(20)

read_movies_from_postgre(spark_postgre)

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Adventure|Animati...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|Comedy|Drama|Romance|
|       5|Father of the Bri...|              Comedy|
|       6|         Heat (1995)|Action|Crime|Thri...|
|       7|      Sabrina (1995)|      Comedy|Romance|
|       8| Tom and Huck (1995)|  Adventure|Children|
|       9| Sudden Death (1995)|              Action|
|      10|    GoldenEye (1995)|Action|Adventure|...|
|      11|American Presiden...|Comedy|Drama|Romance|
|      12|Dracula: Dead and...|       Comedy|Horror|
|      13|        Balto (1995)|Adventure|Animati...|
|      14|        Nixon (1995)|               Drama|
|      15|Cutthroat Island ...|Action|Adventure|...|
|      16|       Casino (1995)|         Crime|

In [6]:
# @timeit
# def read_ratings_from_postgre(spark_postgre):
#     df = spark_postgre.read \
#         .format("jdbc") \
#         .option("url", "jdbc:postgresql://localhost:5432/postgres") \
#         .option("dbtable", "ratings") \
#         .option("user", "postgres") \
#         .option("password", "Baotram2004") \
#         .option("driver", "org.postgresql.Driver") \
#         .load()
#     try:
#         df.show(20)
#     except Exception as e:
#         print(e)

# read_ratings_from_postgre(spark_postgre)

In [7]:
# @timeit
# def read_ratings_from_postgre(spark_postgre):
#     df = spark_postgre.read \
#         .format("jdbc") \
#         .option("url", "jdbc:postgresql://localhost:5432/postgres") \
#         .option("dbtable", "ratings") \
#         .option("user", "postgres") \
#         .option("password", "Baotram2004") \
#         .option("driver", "org.postgresql.Driver") \
#         .load()
#     try:
#         df.head()
#     except Exception as e:
#         print(e)

# read_ratings_from_postgre(spark_postgre)

### Query data from Postgre

In [8]:
# query with limit 20
@timeit
def query_from_postgre(spark_postgre, n_rows):
    df = spark_postgre.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", f"select * from ratings_temp limit {n_rows}") \
        .load()
    try:
        df.show()
    except Exception as e:
        print(e)
    return df
        
query_from_postgre(spark_postgre, 20)

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
| 103718|     923|   2.5|1142615606|
| 103718|     924|   4.0|1142617549|
| 103718|     942|   4.0|1142618811|
| 103718|     948|   4.0|1142619581|
| 103718|     953|   4.0|1142616074|
| 103718|     968|   4.0|1142620815|
| 103718|     971|   3.5|1142618549|
| 103718|    1012|   3.0|1142620685|
| 103718|    1028|   5.0|1142619696|
| 103718|    1035|   5.0|1142620501|
| 103718|    1036|   2.5|1142621824|
| 103718|    1079|   4.5|1142615841|
| 103718|    1084|   4.0|1142615765|
| 103718|    1094|   3.5|1142613004|
| 103718|    1096|   4.5|1142619119|
| 103718|    1097|   4.0|1142616475|
| 103718|    1104|   4.0|1142616283|
| 103718|    1120|   3.5|1142617744|
| 103718|    1127|   2.0|1142622069|
| 103718|    1136|   5.0|1142616019|
+-------+--------+------+----------+

query_from_postgre took: 0.3027932643890381 sec


DataFrame[user_id: int, movie_id: int, rating: decimal(2,1), timestamp: bigint]

In [9]:
df = query_from_postgre(spark_postgre, 1000000)

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
| 103718|     923|   2.5|1142615606|
| 103718|     924|   4.0|1142617549|
| 103718|     942|   4.0|1142618811|
| 103718|     948|   4.0|1142619581|
| 103718|     953|   4.0|1142616074|
| 103718|     968|   4.0|1142620815|
| 103718|     971|   3.5|1142618549|
| 103718|    1012|   3.0|1142620685|
| 103718|    1028|   5.0|1142619696|
| 103718|    1035|   5.0|1142620501|
| 103718|    1036|   2.5|1142621824|
| 103718|    1079|   4.5|1142615841|
| 103718|    1084|   4.0|1142615765|
| 103718|    1094|   3.5|1142613004|
| 103718|    1096|   4.5|1142619119|
| 103718|    1097|   4.0|1142616475|
| 103718|    1104|   4.0|1142616283|
| 103718|    1120|   3.5|1142617744|
| 103718|    1127|   2.0|1142622069|
| 103718|    1136|   5.0|1142616019|
+-------+--------+------+----------+
only showing top 20 rows

query_from_postgre took: 1.1580145359039307 sec


In [10]:
df = query_from_postgre(spark_postgre, 7000000)
# df.orderBy("user_id", ascending=False).show()

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
| 109659|     410|   3.0| 846332765|
| 109659|     435|   3.0| 846332887|
| 109659|     454|   3.0| 846332728|
| 109659|     457|   2.0| 846332649|
| 109659|     480|   3.0| 846332675|
| 109659|     485|   3.0| 846333196|
| 109659|     508|   3.0| 846333026|
| 109659|     515|   3.0| 846333196|
| 109659|     529|   3.0| 846333386|
| 109659|     551|   3.0| 846333080|
| 109659|     587|   3.0| 846332801|
| 109659|     589|   3.0| 846332728|
| 109659|     590|   3.0| 846332579|
| 109659|     592|   3.0| 846332579|
| 109659|     594|   3.0| 846333196|
| 109659|     597|   3.0| 846332800|
| 109660|     348|   3.0|1209416017|
| 109660|     910|   4.0|1209415983|
| 109660|    1244|   4.0|1209416086|
| 109660|    1285|   3.5|1209415996|
+-------+--------+------+----------+
only showing top 20 rows

query_from_postgre took: 12.543602466583252 sec


### Query by User ID

In [11]:
@timeit
def get_ratings_by_user(spark, user_id):
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", f"select * from ratings where user_id={user_id}") \
        .load()
    try:
        df.show()
    except Exception as e:
        print(e)
    return df

df = get_ratings_by_user(spark_postgre, 1)
df.count()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|      17|   4.0|944249077|
|      1|      25|   1.0|944250228|
|      1|      29|   2.0|943230976|
|      1|      30|   5.0|944249077|
|      1|      32|   5.0|943228858|
|      1|      34|   2.0|943228491|
|      1|      36|   1.0|944249008|
|      1|      80|   5.0|944248943|
|      1|     110|   3.0|943231119|
|      1|     111|   5.0|944249008|
|      1|     161|   1.0|943231162|
|      1|     166|   5.0|943228442|
|      1|     176|   4.0|944079496|
|      1|     223|   3.0|944082810|
|      1|     232|   5.0|943228442|
|      1|     260|   5.0|943228696|
|      1|     302|   4.0|944253272|
|      1|     306|   5.0|944248888|
|      1|     307|   5.0|944253207|
|      1|     322|   4.0|944053801|
+-------+--------+------+---------+
only showing top 20 rows

get_ratings_by_user took: 0.27585935592651367 sec


141

In [12]:
@timeit
def get_ratings_by_movies(spark, movie_id):
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", f"select * from ratings where movie_id={movie_id}") \
        .load()
    try:
        df.show()
    except Exception as e:
        print(e)
    return df

df = get_ratings_by_movies(spark_postgre, 1)
# df.sort("user_id").show()

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
|  97327|       1|   4.0|1094386730|
|  97328|       1|   4.0| 849355256|
|  97333|       1|   4.5|1424401700|
|  97341|       1|   3.5|1150877485|
|  97343|       1|   5.0|1107395297|
|  97344|       1|   3.0| 847016342|
|  97348|       1|   3.0|1204254666|
|  97352|       1|   4.0| 993066661|
|  97353|       1|   5.0|1162844477|
|  97354|       1|   4.5|1056059668|
|  97355|       1|   5.0|1596879789|
|  97361|       1|   2.0| 938909633|
|  97362|       1|   3.0| 860668956|
|  97367|       1|   5.0|1038922871|
|  97369|       1|   4.0| 861311425|
|  97370|       1|   3.5|1112317841|
|  97372|       1|   3.5|1113443208|
|  97377|       1|   3.0|1135736368|
|  97379|       1|   5.0|1625017177|
|  97383|       1|   4.0| 973553197|
+-------+--------+------+----------+
only showing top 20 rows

get_ratings_by_movies took: 1.957317590713501 sec


In [13]:
@timeit
def get_ratings_by_movies(spark, movie_ids):
    query = "select * from ratings where "
    for movie_id in movie_ids:
        query += f"movie_id = {movie_id} or "
    query = query[:-4]
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", query) \
        .load()
    try:
        df.show()
    except Exception as e:
        print(e)
    return df

df = get_ratings_by_movies(spark_postgre, [1, 2])

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
|  97327|       1|   4.0|1094386730|
|  97328|       1|   4.0| 849355256|
|  97329|       2|   2.0| 844797324|
|  97333|       1|   4.5|1424401700|
|  97337|       2|   3.0| 844897034|
|  97341|       1|   3.5|1150877485|
|  97343|       1|   5.0|1107395297|
|  97344|       1|   3.0| 847016342|
|  97344|       2|   2.0| 847016692|
|  97348|       1|   3.0|1204254666|
|  97348|       2|   1.0|1204255277|
|  97350|       2|   4.0| 836825763|
|  97352|       1|   4.0| 993066661|
|  97353|       1|   5.0|1162844477|
|  97353|       2|   3.0|1162844374|
|  97354|       1|   4.5|1056059668|
|  97354|       2|   4.0|1057766202|
|  97355|       1|   5.0|1596879789|
|  97361|       1|   2.0| 938909633|
|  97361|       2|   1.0| 938908216|
+-------+--------+------+----------+
only showing top 20 rows

get_ratings_by_movies took: 1.9949018955230713 sec


In [14]:
@timeit
def get_ratings_by_user_and_movie(spark, user_id, movie_id):
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", f"select * from ratings where user_id={user_id} and movie_id={movie_id}") \
        .load()
    try:
        df.show()
    except Exception as e:
        print(e)
    return df

df = get_ratings_by_user_and_movie(spark_postgre, 168198, 99999)

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
| 168198|   99999|   3.0|1689394313|
+-------+--------+------+----------+

get_ratings_by_user_and_movie took: 0.22433042526245117 sec


```sql
WITH recent_rating AS (
    SELECT 
        r.user_id, 
        r.movie_id, 
        r.rating, 
        r.timestamp
    FROM 
        clickhouse.movie_lens.ratings r
    WHERE 
        r.timestamp >= (
            SELECT MAX(timestamp) - INTERVAL '1 year' 
            FROM clickhouse.movie_lens.ratings
        )
),
user_activity AS (
    SELECT 
        user_id, 
        COUNT(*) AS total_rated_movie
    FROM 
        recent_rating
    GROUP BY 
        user_id
)
SELECT 
    user_id 
FROM 
    user_activity 
ORDER BY 
    total_rated_movie DESC 
LIMIT 10;
```

In [15]:
@timeit
def get_top_avg_recent_ratings(spark):
    query = "with recent_rating as (select r.user_id, r.movie_id, r.rating from ratings r where to_timestamp(r.timestamp) >= (select max(to_timestamp(timestamp)) - interval '1 year' from ratings)), user_activity as (select user_id, count(*) as total_rated_movie from recent_rating group by user_id) select user_id from user_activity order by total_rated_movie desc limit 10"
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", query) \
        .load()
    try:
        df.show()
    except Exception as e:
        print(e)
    return df

df = get_top_avg_recent_ratings(spark_postgre)

+-------+
|user_id|
+-------+
| 103013|
| 108412|
|  87324|
| 161180|
|   1668|
|  98875|
| 159164|
|  22270|
|  57389|
|  43302|
+-------+

get_top_avg_recent_ratings took: 4.132871389389038 sec


In [16]:
@timeit
def get_top_avg_recent_ratings(spark):
    query = "with recent_rating as (select r.user_id, r.movie_id, r.rating from ratings r where to_timestamp(r.timestamp) >= (select max(to_timestamp(timestamp)) - interval '1 year' from ratings)), user_activity as (select user_id, count(*) as total_rated_movie from recent_rating group by user_id) select user_id from user_activity order by total_rated_movie desc"
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("user", "postgres") \
        .option("password", "Baotram2004") \
        .option("driver", "org.postgresql.Driver") \
        .option("query", query) \
        .load()
    try:
        df.show(20)
    except Exception as e:
        print(e)
    return df

df = get_top_avg_recent_ratings(spark_postgre)
df.count()

+-------+
|user_id|
+-------+
| 103013|
| 108412|
|  87324|
| 161180|
|   1668|
|  98875|
| 159164|
|  22270|
|  57389|
|  43302|
| 100887|
|  65925|
| 187665|
| 178868|
| 108029|
| 168198|
| 134520|
|  99084|
| 181856|
| 177834|
+-------+
only showing top 20 rows

get_top_avg_recent_ratings took: 3.9204039573669434 sec


10086

## Use Apache Spark for parallel data processing

### Initial a Spark Session

In [17]:
# spark = SparkSession.builder \
#     .appName("Big-Data-Demo") \
#     .master("local[*]") \
#     .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
#     .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/")  \
#     .getOrCreate() \
    
spark = SparkSession.builder \
    .appName("Big-Data-Demo") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/") \
    .getOrCreate() \

print(f"----------spark version: {spark.version}------------")

----------spark version: 3.5.3------------


### Load data from MongoDB

#### Load all data from a collection

In [18]:
# load all data from MongoDB
df_ratings = spark.read.format("mongo")\
    .option("uri", "mongodb://127.0.0.1:27017/")\
    .option("database", "test")\
    .option("collection", "ratings")\
    .load()    

df_movies = spark.read.format("mongo")\
    .option("uri", "mongodb://127.0.0.1:27017/test.movies")\
    .option("database", "test")\
    .option("collection", "movies")\
    .load()   

# Show the loaded data
print(f"Original Rating Data:")
print(df_ratings.show())
print(f"Original Movie Data:")
print( df_movies.show())

Py4JJavaError: An error occurred while calling o120.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


#### Make query to the database

In [None]:
# sol 1: failed, maybe because of the version of the spark
query = '{"rating": {"$gt": 4}}'
print(f"Query sent to MongoDB: {query}")

# Load filtered data
df_ratings_filtered = spark.read.format("mongo")\
    .option("uri", "mongodb://127.0.0.1:27017/")\
    .option("database", "test")\
    .option("collection", "ratings")\
    .option("query", query)\
    .load()

# Show filtered data
print("Filtered Data (movies with rating > 4):")
df_ratings_filtered.show()
query = '{"rating": {"$gt": 4}}'

Query sent to MongoDB: {"rating": {"$gt": 4}}
Filtered Data (movies with rating > 4):
+--------------------+-------+------+---------+------+
|                 _id|movieId|rating|timestamp|userId|
+--------------------+-------+------+---------+------+
|{6743f02df2b2062a...|      1|   4.0|964982703|     1|
|{6743f02df2b2062a...|      3|   4.0|964981247|     1|
|{6743f02df2b2062a...|      6|   4.0|964982224|     1|
|{6743f02df2b2062a...|     47|   5.0|964983815|     1|
|{6743f02df2b2062a...|     50|   5.0|964982931|     1|
|{6743f02df2b2062a...|     70|   3.0|964982400|     1|
|{6743f02df2b2062a...|    101|   5.0|964980868|     1|
|{6743f02df2b2062a...|    110|   4.0|964982176|     1|
|{6743f02df2b2062a...|    151|   5.0|964984041|     1|
|{6743f02df2b2062a...|    157|   5.0|964984100|     1|
|{6743f02df2b2062a...|    163|   5.0|964983650|     1|
|{6743f02df2b2062a...|    216|   5.0|964981208|     1|
|{6743f02df2b2062a...|    223|   3.0|964980985|     1|
|{6743f02df2b2062a...|    231|   5

In [None]:
# set format as "mongo" instead of "mongodb", set uri
start_time = time.time()
df_ratings_filtered = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1:27017/") \
                 .option("database", "test") \
                 .option("collection", "ratings") \
                 .load()

df_ratings_filtered.filter(df_ratings_filtered['rating'] > 4)
end_time = time.time()
df_ratings_filtered.head()
print(f"Execution time: {end_time - start_time} seconds")

Execution time: 0.21632814407348633 seconds


In [None]:
# sol 2: return cursors points to rows in the collection
start_time = time.time()
client = MongoClient('mongodb://localhost:27017/')
db = client["test"]
collection = db["ratings"]

query = {"rating": {"$gt": 4}}  # Find documents where age > 25
cursors = collection.find(query)

# for doc in results:
#     print(doc)

stop_time1 = time.time()

filtered_df = pd.DataFrame(list(cursors))
stop_time = time.time()
print(filtered_df.head())
print(f"Time taken to fetch data from MongoDB: {stop_time1 - start_time}")
print(f"Total time taken: {stop_time - start_time}")
# cursors = list(cursors)
# print(cursors[0]['userId', 'movieId', 'rating', 'timestamp'])

# filtered_df = spark.createDataFrame(list(cursors))
# filtered_df.show()

                        _id  userId  movieId  rating  timestamp
0  6743f02df2b2062a22ecadae       1       47     5.0  964983815
1  6743f02df2b2062a22ecadaf       1       50     5.0  964982931
2  6743f02df2b2062a22ecadb1       1      101     5.0  964980868
3  6743f02df2b2062a22ecadb3       1      151     5.0  964984041
4  6743f02df2b2062a22ecadb4       1      157     5.0  964984100
Time taken to fetch data from MongoDB: 0.0040130615234375
Total time taken: 0.9110825061798096


In [None]:
# sol 3: get the dataframe, then filter it using pandas => worse than using MongoDB query
start_time = time.time()
df_ratings = spark.read.format("mongo")\
    .option("uri", "mongodb://127.0.0.1:27017/test.ratings")\
    .load()

df_ratings_pd = df_ratings.toPandas()
df_ratings_pd = df_ratings_pd[df_ratings_pd['rating'] > 4]

df_ratings_pd.head()
stop_time = time.time()
print(f"Time taken: {stop_time - start_time}")

Time taken: 19.08342432975769


### Spark's dataframe operations

In [None]:
# df_movies.show()
# df_ratings.show()

joined_df = df_movies.join(df_ratings, on="movieId", how="right")
print(f"movies_ratings_df:")
# print(joined_df.columns)
# print(joined_df.select("timestamp").show())
# print(type(joined_df))
joined_df.show()

movies_ratings_df:
+-------+--------------------+--------------------+--------------------+--------------------+------+---------+------+
|movieId|                 _id|              genres|               title|                 _id|rating|timestamp|userId|
+-------+--------------------+--------------------+--------------------+--------------------+------+---------+------+
|      1|{6743f02df2b2062a...|Adventure|Animati...|    Toy Story (1995)|{6743f02df2b2062a...|   4.0|964982703|     1|
|      1|{674450be3c85934c...|Adventure|Animati...|    Toy Story (1995)|{6743f02df2b2062a...|   4.0|964982703|     1|
|      1|{67450b9460d4f003...|Adventure|Animati...|    Toy Story (1995)|{6743f02df2b2062a...|   4.0|964982703|     1|
|      1|{6745127b01820ef6...|Adventure|Animati...|    Toy Story (1995)|{6743f02df2b2062a...|   4.0|964982703|     1|
|      3|{6743f02df2b2062a...|      Comedy|Romance|Grumpier Old Men ...|{6743f02df2b2062a...|   4.0|964981247|     1|
|      3|{674450be3c85934c...|      C

### Create a new column named "Year"

In [None]:
from pyspark.sql.functions import substring
from pyspark.sql.functions import col

joined_df = joined_df.withColumn("year", substring("title", -5, 4))
joined_df = joined_df.filter(col("year").rlike("^[12]"))
joined_df = joined_df.withColumn("year", col("year").cast("int"))

joined_df.show()
# joined_df.select("year").distinct().show()

+-------+--------------------+--------------------+--------------------+--------------------+------+----------+------+----+
|movieId|                 _id|              genres|               title|                 _id|rating| timestamp|userId|year|
+-------+--------------------+--------------------+--------------------+--------------------+------+----------+------+----+
|   1580|{6745127b01820ef6...|Action|Comedy|Sci-Fi|Men in Black (a.k...|{6743f02df2b2062a...|   3.0| 964981125|     1|1997|
|   1580|{67450b9460d4f003...|Action|Comedy|Sci-Fi|Men in Black (a.k...|{6743f02df2b2062a...|   3.0| 964981125|     1|1997|
|   1580|{674450be3c85934c...|Action|Comedy|Sci-Fi|Men in Black (a.k...|{6743f02df2b2062a...|   3.0| 964981125|     1|1997|
|   1580|{6743f02df2b2062a...|Action|Comedy|Sci-Fi|Men in Black (a.k...|{6743f02df2b2062a...|   3.0| 964981125|     1|1997|
|   2366|{6745127b01820ef6...|Action|Adventure|...|    King Kong (1933)|{6743f02df2b2062a...|   4.0| 964982462|     1|1933|
|   2366

## Train a movie recommender model

### Linear Regression Model

In [None]:
# recommender = RecommendationEngine(joined_df)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
feature_assembler = VectorAssembler(inputCols=["userId", "movieId"], outputCol="features")
assembled_df = feature_assembler.transform(joined_df)
(training_data, test_data) = assembled_df.randomSplit([0.8, 0.2])

model = LinearRegression(featuresCol="features", labelCol="rating", predictionCol="prediction")
lr_model = model.fit(training_data)

In [None]:
# make predictions
predictions = lr_model.transform(test_data)

# show predictions
predictions.select("features", "rating", "prediction").show()

# evaluate the model using R2 and RMSE
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"R2: {r2}")

+------------+------+------------------+
|    features|rating|        prediction|
+------------+------+------------------+
| [19.0,12.0]|   1.0|3.5889894264465667|
| [44.0,12.0]|   1.0|  3.58205064574231|
|[276.0,12.0]|   4.0|3.5176587608068086|
|[571.0,12.0]|   1.0|3.4357811484965803|
| [44.0,12.0]|   1.0|  3.58205064574231|
|[151.0,12.0]|   3.0|3.5523526643280916|
|[276.0,12.0]|   4.0|3.5176587608068086|
|[524.0,12.0]|   1.0| 3.448826056220583|
| [19.0,12.0]|   1.0|3.5889894264465667|
|[217.0,12.0]|   3.0|3.5340342832688543|
|[274.0,12.0]|   3.5| 3.518213863263149|
|[524.0,12.0]|   1.0| 3.448826056220583|
|[599.0,12.0]|   1.5| 3.428009714107813|
|[276.0,12.0]|   4.0|3.5176587608068086|
|[288.0,12.0]|   2.0|3.5143281460687654|
|[571.0,12.0]|   1.0|3.4357811484965803|
|[217.0,12.0]|   3.0|3.5340342832688543|
|[288.0,12.0]|   2.0|3.5143281460687654|
|[294.0,12.0]|   1.0| 3.512662838699744|
|[350.0,12.0]|   3.0| 3.497119969922209|
+------------+------+------------------+
only showing top

### ALS

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

ratings = joined_df.select("userId", "movieId", "rating")
(training, test) = ratings.randomSplit([0.8, 0.2])

# set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# evaluate
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

# top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

Root-mean-square error = 0.546270888146419


In [None]:
print(userRecs.show())

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3925, 6.7853904...|
|     2|[{74754, 7.789781...|
|     3|[{48322, 6.951864...|
|     4|[{34338, 7.271497...|
|     5|[{5034, 6.932786}...|
|     6|[{971, 7.036011},...|
|     7|[{4678, 9.650388}...|
|     8|[{5055, 7.2378206...|
|     9|[{106100, 7.42635...|
|    10|[{2693, 8.148736}...|
|    11|[{3200, 6.935401}...|
|    12|[{26258, 8.614412...|
|    13|[{2148, 7.473914}...|
|    14|[{2906, 11.770087...|
|    15|[{1734, 7.8901587...|
|    16|[{3925, 5.546985}...|
|    17|[{1658, 6.442111}...|
|    18|[{3200, 5.1955605...|
|    19|[{5666, 5.1309776...|
|    20|[{179819, 7.15204...|
+------+--------------------+
only showing top 20 rows

None


In [None]:
userSubsetRecs.show()
users.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   463|[{79224, 7.039547...|
|   496|[{34338, 9.542832...|
|   148|[{4450, 7.3070965...|
+------+--------------------+

+------+
|userId|
+------+
|   463|
|   496|
|   148|
+------+



In [None]:
movieSubSetRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[{569, 5.2485666}...|
|   3175|[{258, 6.324589},...|
|   2366|[{494, 7.989951},...|
+-------+--------------------+

