## Cassandra Keyspace and Table setup

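The tables in the 'movielens' keyspace were created in advance with the following CQL statements. This way, every Spark write targets a table whose columns and primary key are already defined. The keyspace itself must exist before these statements are run.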

CREATE TABLE movielens.users (
    user_id int PRIMARY KEY,
    age int,
    gender text,
    occupation text,
    zip_code text
);

CREATE TABLE movielens.movies (
    movie_id int PRIMARY KEY,
    imdb_url text,
    release_date text,
    title text,
    video_release_date text
);

CREATE TABLE movielens.ratings (
    movie_id int,
    user_id int,
    rating int,
    timestamp text,
    PRIMARY KEY (movie_id, user_id)
);

CREATE TABLE movielens.users_under_20 (
    user_id int PRIMARY KEY,
    age int,
    gender text,
    occupation text,
    zip_code text
);

CREATE TABLE movielens.scientists_30_40 (
    user_id int PRIMARY KEY,
    age int,
    gender text,
    occupation text,
    zip_code text
);

## Environment setup
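The required libraries are imported. PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are then pointed at the interpreter of the sparkenv38 conda environment, so that the Spark driver and workers use the same Python version. Both variables must be set before the SparkSession is created.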

In [4]:
import sys
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, desc, col, count, array, lit, when, expr, concat_ws
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from cassandra.cluster import Cluster

In [5]:
import os

os.environ["PYSPARK_PYTHON"] = "/opt/anaconda3/envs/sparkenv38/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/opt/anaconda3/envs/sparkenv38/bin/python"

## Start Spark Session

This cell initializes the SparkSession and points the Spark Cassandra Connector at the local Cassandra node (127.0.0.1, port 9042, the default CQL native port). This connection is required for reading from and writing to Cassandra.

In [7]:
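# The connector package (version as resolved in the log below) only takes effect
# when the first session starts, so it is set here together with the connection settings
spark = SparkSession.builder \
    .appName("MovieLensAnalysis") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .config("spark.cassandra.connection.port", "9042") \
    .getOrCreate()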

25/07/21 16:18:09 WARN Utils: Your hostname, Pavethras-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.104 instead (on interface en0)
25/07/21 16:18:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/spark-3.4.1/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/pavethraavadiar/.ivy2/cache
The jars for the packages stored in: /Users/pavethraavadiar/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c61da60a-9d1d-400b-bb28-04362dd9dcb3;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reac

In [8]:
print("Driver Python:", sys.version)
print("PYSPARK_PYTHON:", os.environ.get("PYSPARK_PYTHON"))
print("PYSPARK_DRIVER_PYTHON:", os.environ.get("PYSPARK_DRIVER_PYTHON"))

Driver Python: 3.8.20 (default, Oct  3 2024, 10:22:23) 
[Clang 14.0.6 ]
PYSPARK_PYTHON: /opt/anaconda3/envs/sparkenv38/bin/python
PYSPARK_DRIVER_PYTHON: /opt/anaconda3/envs/sparkenv38/bin/python


## Loading and Processing User Data ('u.user' file)

This step parses the pipe-separated u.user file and converts the resulting RDD into a DataFrame with named, typed columns. The zip column is then renamed to zip_code to match the corresponding column of the movielens.users table. It is kept as text because some codes have leading zeros (e.g. 05201) or letters (e.g. T8H1N). The DataFrame is later written to Cassandra for persistence.
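The per-line parsing done by load_users can be sketched in plain Python, without Spark. This is a minimal illustration that assumes well-formed lines with exactly five pipe-separated fields. parse_user_line and the User tuple are hypothetical names, not part of the notebook. The sample lines correspond to users 1 and 8 in the listing that follows.

```python
from typing import NamedTuple


class User(NamedTuple):
    user_id: int
    age: int
    gender: str
    occupation: str
    zip_code: str  # kept as text: some codes have leading zeros or letters


def parse_user_line(line: str) -> User:
    """Split one u.user line on '|' and convert the numeric fields."""
    user_id, age, gender, occupation, zip_code = line.strip().split("|")
    return User(int(user_id), int(age), gender, occupation, zip_code)


print(parse_user_line("1|24|M|technician|85711"))
# User 8's zip code keeps its leading zero because it stays a string
print(parse_user_line("8|36|M|administrator|05201").zip_code)
```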

In [10]:
def load_users(file_path):
    users_rdd = spark.sparkContext.textFile(file_path)
    users_parsed = users_rdd.map(lambda x: x.split("|"))
    users_df = users_parsed.map(lambda u: (int(u[0]), int(u[1]), u[2], u[3], u[4])) \
        .toDF(["user_id", "age", "gender", "occupation", "zip"])
    return users_df

In [11]:
users_df = load_users("/Users/pavethraavadiar/Documents/Masters - Sem 2/Data management/project 3/ml-100k/u.user")
users_df.show()

+-------+---+------+-------------+-----+
|user_id|age|gender|   occupation|  zip|
+-------+---+------+-------------+-----+
|      1| 24|     M|   technician|85711|
|      2| 53|     F|        other|94043|
|      3| 23|     M|       writer|32067|
|      4| 24|     M|   technician|43537|
|      5| 33|     F|        other|15213|
|      6| 42|     M|    executive|98101|
|      7| 57|     M|administrator|91344|
|      8| 36|     M|administrator|05201|
|      9| 29|     M|      student|01002|
|     10| 53|     M|       lawyer|90703|
|     11| 39|     F|        other|30329|
|     12| 28|     F|        other|06405|
|     13| 47|     M|     educator|29206|
|     14| 45|     M|    scientist|55106|
|     15| 49|     F|     educator|97301|
|     16| 21|     M|entertainment|10309|
|     17| 30|     M|   programmer|06355|
|     18| 35|     F|        other|37212|
|     19| 40|     M|    librarian|02138|
|     20| 42|     F|    homemaker|95660|
+-------+---+------+-------------+-----+
only showing top 20 rows

In [12]:
users_df = load_users("/Users/pavethraavadiar/Documents/Masters - Sem 2/Data management/project 3/ml-100k/u.user")
users_df = users_df.withColumnRenamed("zip", "zip_code")
users_df.show()

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|      1| 24|     M|   technician|   85711|
|      2| 53|     F|        other|   94043|
|      3| 23|     M|       writer|   32067|
|      4| 24|     M|   technician|   43537|
|      5| 33|     F|        other|   15213|
|      6| 42|     M|    executive|   98101|
|      7| 57|     M|administrator|   91344|
|      8| 36|     M|administrator|   05201|
|      9| 29|     M|      student|   01002|
|     10| 53|     M|       lawyer|   90703|
|     11| 39|     F|        other|   30329|
|     12| 28|     F|        other|   06405|
|     13| 47|     M|     educator|   29206|
|     14| 45|     M|    scientist|   55106|
|     15| 49|     F|     educator|   97301|
|     16| 21|     M|entertainment|   10309|
|     17| 30|     M|   programmer|   06355|
|     18| 35|     F|        other|   37212|
|     19| 40|     M|    librarian|   02138|
|     20| 42|     F|    homemaker|   95660|
+-------+---+------+-------------+--------+
only showing top 20 rows

## Writing users data into Cassandra

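Next, the processed users DataFrame is written to the 'users' table of the 'movielens' keyspace. The cleaned data can then be reused without re-parsing the raw file. Cassandra treats every insert as an upsert on the primary key, so re-running this cell with mode("append") overwrites existing rows instead of duplicating them.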
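Cassandra's upsert behaviour explains why repeated writes with mode("append") do not create duplicates. The toy model below shows it with a plain dict keyed on user_id. It is only an analogy: it ignores write timestamps, tombstones and replication. upsert_rows is a hypothetical helper.

```python
def upsert_rows(table: dict, rows: list, key: str) -> dict:
    """Insert rows into a dict keyed on the primary-key column; existing keys are overwritten."""
    for row in rows:
        table[row[key]] = row
    return table


users_table = {}
batch = [
    {"user_id": 1, "age": 24, "gender": "M", "occupation": "technician", "zip_code": "85711"},
    {"user_id": 2, "age": 53, "gender": "F", "occupation": "other", "zip_code": "94043"},
]
upsert_rows(users_table, batch, key="user_id")
upsert_rows(users_table, batch, key="user_id")  # re-running the same write
print(len(users_table))  # still 2 rows
```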

In [14]:
users_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="users", keyspace="movielens") \
    .save()

In [15]:
users_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)



## Load and Process Movies data ('u.item')

This step loads only the first five fields of each u.item line: movie_id, title, release_date, video_release_date and imdb_url. These are the fields needed for the basic analysis, and they match the columns of the movielens.movies table. The 19 binary genre flags that follow them are loaded separately for Question iii.
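Each u.item line has 24 pipe-separated fields: the five metadata fields kept here, followed by 19 genre flags. The slicing can be sketched in plain Python. The sample line is illustrative and follows the u.item format for Toy Story (1995). Its Animation, Children's and Comedy flags match the genre output shown later, and the remaining zeros are assumed. parse_movie_metadata is a hypothetical helper.

```python
def parse_movie_metadata(line: str) -> tuple:
    """Keep only movie_id, title, release_date, video_release_date and imdb_url."""
    fields = line.rstrip("\n").split("|")
    movie_id, title, release_date, video_release_date, imdb_url = fields[:5]
    return int(movie_id), title, release_date, video_release_date, imdb_url


# Illustrative line: 5 metadata fields (empty video_release_date) + 19 genre flags
genre_flags = ["0", "0", "0", "1", "1", "1"] + ["0"] * 13
sample = "|".join(["1", "Toy Story (1995)", "01-Jan-1995", "",
                   "http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)"] + genre_flags)

print(len(sample.split("|")))
print(parse_movie_metadata(sample))
```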

In [17]:
# Define schema
movie_schema = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("video_release_date", StringType(), True),
    StructField("imdb_url", StringType(), True)
])

# Load data (only first 5 columns from u.item)
movies_rdd = spark.sparkContext.textFile("/Users/pavethraavadiar/Documents/Masters - Sem 2/Data management/project 3/ml-100k/u.item")
movies_parsed = movies_rdd.map(lambda x: x.split("|")).map(lambda fields: (
    int(fields[0]),
    fields[1],
    fields[2],
    fields[3],
    fields[4]
))

movies_df = spark.createDataFrame(movies_parsed, schema=movie_schema)
movies_df.show(5)

+--------+-----------------+------------+------------------+--------------------+
|movie_id|            title|release_date|video_release_date|            imdb_url|
+--------+-----------------+------------+------------------+--------------------+
|       1| Toy Story (1995)| 01-Jan-1995|                  |http://us.imdb.co...|
|       2| GoldenEye (1995)| 01-Jan-1995|                  |http://us.imdb.co...|
|       3|Four Rooms (1995)| 01-Jan-1995|                  |http://us.imdb.co...|
|       4|Get Shorty (1995)| 01-Jan-1995|                  |http://us.imdb.co...|
|       5|   Copycat (1995)| 01-Jan-1995|                  |http://us.imdb.co...|
+--------+-----------------+------------+------------------+--------------------+
only showing top 5 rows



In [18]:
movies_df.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- video_release_date: string (nullable = true)
 |-- imdb_url: string (nullable = true)



## Writing movies data into Cassandra
Saving the processed movies DataFrame into the 'movielens' keyspace under the 'movies' table in Cassandra.

In [20]:
movies_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="movies", keyspace="movielens") \
    .save()

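## Confirming persistence by reloading movies data

The movies table is read back from Cassandra and registered as the temporary view movies, which the SQL queries below use.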

In [22]:
movies_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="movies", keyspace="movielens") \
    .load()

movies_df.createOrReplaceTempView("movies")

## Loading and Processing Ratings data ('u.data')
This step defines an explicit schema for the ratings data in u.data (user_id, movie_id, rating, timestamp). The timestamp is excluded afterwards because this analysis does not need it.

In [24]:
ratings_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", StringType(), True)
])

u.data is tab-separated, so the ratings are loaded with a tab delimiter and the schema above. The timestamp column (Unix seconds) is then dropped because none of the questions use it.
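The same parse-and-drop step can be sketched for a single line in plain Python. The first three values of the sample line match the first row shown below, and the timestamp is a placeholder. parse_rating_line is a hypothetical helper.

```python
def parse_rating_line(line: str) -> tuple:
    """Split a tab-separated u.data line and drop the trailing timestamp."""
    user_id, movie_id, rating, _timestamp = line.rstrip("\n").split("\t")
    return int(user_id), int(movie_id), int(rating)


# Illustrative line; the timestamp value is a placeholder
print(parse_rating_line("196\t242\t3\t881250949\n"))
```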

In [26]:
ratings_df = spark.read \
    .option("delimiter", "\t") \
    .schema(ratings_schema) \
    .csv("/Users/pavethraavadiar/Documents/Masters - Sem 2/Data management/project 3/ml-100k/u.data")

In [27]:
ratings_df = ratings_df.drop("timestamp")

In [28]:
ratings_df.printSchema()
ratings_df.show(5)

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|    196|     242|     3|
|    186|     302|     3|
|     22|     377|     1|
|    244|      51|     2|
|    166|     346|     1|
+-------+--------+------+
only showing top 5 rows



## Writing ratings data into Cassandra

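Next, the ratings DataFrame is written to the 'ratings' table of the 'movielens' keyspace. Because movie_id is the partition key, all ratings of a movie are stored together, which makes per-movie lookups efficient. Only user_id, movie_id and rating are written, so the timestamp column of the table stays null, as the read-back below shows.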
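In movielens.ratings, movie_id is the partition key and user_id is the clustering column. All ratings of one movie therefore sit in one partition, ordered by user_id, and a single partition lookup returns them. The dict-of-dicts below is only a rough in-memory analogy of that layout; it ignores token ordering across partitions and replication. build_partitions is hypothetical, and the rows are the five shown in the ratings output above.

```python
from collections import defaultdict


def build_partitions(rows):
    """Group (user_id, movie_id, rating) rows by movie_id, then order each group by user_id."""
    partitions = defaultdict(dict)
    for user_id, movie_id, rating in rows:
        partitions[movie_id][user_id] = rating  # same (movie_id, user_id) key overwrites
    return {movie_id: dict(sorted(users.items())) for movie_id, users in partitions.items()}


rows = [(196, 242, 3), (186, 302, 3), (22, 377, 1), (244, 51, 2), (166, 346, 1)]
partitions = build_partitions(rows)
print(partitions[242])  # all ratings of movie 242 in one lookup
```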

In [30]:
ratings_df.select("user_id", "movie_id", "rating") \
    .write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="ratings", keyspace="movielens") \
    .save()


## Verifying Ratings data from Cassandra

Reading the ratings table back from Cassandra to confirm successful data persistence.

In [32]:
ratings_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="ratings", keyspace="movielens") \
    .load()

ratings_df.createOrReplaceTempView("ratings")
ratings_df.show(5)

+--------+-------+------+---------+
|movie_id|user_id|rating|timestamp|
+--------+-------+------+---------+
|    1657|    727|     3|     null|
|    1443|    225|     4|     null|
|    1443|    254|     4|     null|
|    1443|    429|     2|     null|
|    1443|    551|     5|     null|
+--------+-------+------+---------+
only showing top 5 rows



In [33]:
spark.sql("""
    SELECT movie_id, ROUND(AVG(rating), 2) AS avg_rating
    FROM ratings
    GROUP BY movie_id
    ORDER BY movie_id
    LIMIT 10
""").show()

+--------+----------+
|movie_id|avg_rating|
+--------+----------+
|       1|      3.88|
|       2|      3.21|
|       3|      3.03|
|       4|      3.55|
|       5|       3.3|
|       6|      3.58|
|       7|       3.8|
|       8|       4.0|
|       9|       3.9|
|      10|      3.83|
+--------+----------+



## Question i) Calculate the average rating for each movie.

This query joins the ratings with the movie titles, computes each movie's average rating (rounded to two decimals) grouped by movie_id and title, and orders the result by movie_id. LIMIT 10 displays the first ten movies. These averages give a first view of how users rate individual films.
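The group-and-average step behind Question i can be sketched without Spark. Spark SQL's ROUND rounds half up, so this sketch approximates it with Decimal and ROUND_HALF_UP instead of Python's built-in round. The rows are made up for illustration, and average_ratings is a hypothetical helper.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP


def average_ratings(rows):
    """Return {movie_id: average rating rounded to 2 decimals} for (movie_id, rating) rows."""
    totals = defaultdict(lambda: [0, 0])  # movie_id -> [sum, count]
    for movie_id, rating in rows:
        totals[movie_id][0] += rating
        totals[movie_id][1] += 1
    return {
        movie_id: float(Decimal(str(s / n)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
        for movie_id, (s, n) in sorted(totals.items())
    }


print(average_ratings([(1, 5), (1, 4), (1, 3), (2, 4), (2, 3), (2, 3)]))
```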

In [35]:
spark.sql("""
    SELECT 
        r.movie_id, 
        m.title, 
        ROUND(AVG(r.rating), 2) AS avg_rating
    FROM ratings r
    JOIN movies m ON r.movie_id = m.movie_id
    GROUP BY r.movie_id, m.title
    ORDER BY r.movie_id
    LIMIT 10
""").show(truncate=False)


+--------+----------------------------------------------------+----------+
|movie_id|title                                               |avg_rating|
+--------+----------------------------------------------------+----------+
|1       |Toy Story (1995)                                    |3.88      |
|2       |GoldenEye (1995)                                    |3.21      |
|3       |Four Rooms (1995)                                   |3.03      |
|4       |Get Shorty (1995)                                   |3.55      |
|5       |Copycat (1995)                                      |3.3       |
|6       |Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|3.58      |
|7       |Twelve Monkeys (1995)                               |3.8       |
|8       |Babe (1995)                                         |4.0       |
|9       |Dead Man Walking (1995)                             |3.9       |
|10      |Richard III (1995)                                  |3.83      |
+--------+----------------------------------------------------+----------+

## Question ii) Identify the top ten movies with the highest average ratings.

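This query joins movie information with the ratings and ranks movies by average rating, using the number of ratings as a tie-breaker. Every movie at the top has only one to three ratings, so a perfect 5.0 average rests on very little data. The second query breaks ties alphabetically by title instead.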
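Because a single 5.0 rating is enough to top this ranking, a common refinement is to require a minimum number of ratings before ranking. The sketch below is a hypothetical variant, not what the notebook runs. top_movies and its min_ratings threshold are assumptions. Ties are broken by rating count and then by title, mirroring the ORDER BY clauses used here. In Spark SQL the equivalent filter would be a HAVING COUNT(r.rating) >= n clause.

```python
def top_movies(stats, n=10, min_ratings=1):
    """Rank (title, avg_rating, num_ratings) tuples, keeping movies with enough ratings."""
    eligible = [s for s in stats if s[2] >= min_ratings]
    # highest average first, then more ratings, then title alphabetically
    return sorted(eligible, key=lambda s: (-s[1], -s[2], s[0]))[:n]


stats = [
    ("Star Kid (1997)", 5.0, 3),
    ("Aiqing wansui (1994)", 5.0, 1),
    ("Toy Story (1995)", 3.88, 452),  # illustrative rating count
]
print(top_movies(stats, n=2))
print(top_movies(stats, n=2, min_ratings=50))
```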

In [37]:
spark.sql("""
    SELECT 
        m.movie_id, 
        m.title, 
        ROUND(AVG(r.rating), 2) AS avg_rating,
        COUNT(r.rating) AS num_ratings
    FROM ratings r
    JOIN movies m ON r.movie_id = m.movie_id
    GROUP BY m.movie_id, m.title
    ORDER BY avg_rating DESC, num_ratings DESC
    LIMIT 10
""").show(truncate=False)


+--------+-------------------------------------------------+----------+-----------+
|movie_id|title                                            |avg_rating|num_ratings|
+--------+-------------------------------------------------+----------+-----------+
|1293    |Star Kid (1997)                                  |5.0       |3          |
|1189    |Prefontaine (1997)                               |5.0       |3          |
|1500    |Santa with Muscles (1996)                        |5.0       |2          |
|1467    |Saint of Fort Washington, The (1993)             |5.0       |2          |
|1536    |Aiqing wansui (1994)                             |5.0       |1          |
|1122    |They Made Me a Criminal (1939)                   |5.0       |1          |
|1653    |Entertaining Angels: The Dorothy Day Story (1996)|5.0       |1          |
|814     |Great Day in Harlem, A (1994)                    |5.0       |1          |
|1201    |Marlene Dietrich: Shadow and Light (1996)        |5.0       |1    

In [38]:
spark.sql("""
    SELECT 
        m.movie_id,
        m.title,
        ROUND(AVG(r.rating), 2) AS avg_rating
    FROM ratings r
    JOIN movies m ON r.movie_id = m.movie_id
    GROUP BY m.movie_id, m.title
    ORDER BY avg_rating DESC, m.title ASC
    LIMIT 10
""").show(truncate=False)

+--------+-------------------------------------------------+----------+
|movie_id|title                                            |avg_rating|
+--------+-------------------------------------------------+----------+
|1536    |Aiqing wansui (1994)                             |5.0       |
|1653    |Entertaining Angels: The Dorothy Day Story (1996)|5.0       |
|814     |Great Day in Harlem, A (1994)                    |5.0       |
|1201    |Marlene Dietrich: Shadow and Light (1996)        |5.0       |
|1189    |Prefontaine (1997)                               |5.0       |
|1467    |Saint of Fort Washington, The (1993)             |5.0       |
|1500    |Santa with Muscles (1996)                        |5.0       |
|1599    |Someone Else's America (1995)                    |5.0       |
|1293    |Star Kid (1997)                                  |5.0       |
|1122    |They Made Me a Criminal (1939)                   |5.0       |
+--------+-------------------------------------------------+----------+


## Question iii) Find the users who have rated at least 50 movies and identify their favourite movie genres.

The goal is to find users who have rated at least 50 movies. For each of them, the favourite genre is the genre that appears most often among the movies they rated. This shows how engaged users spread their ratings across genres.


Next, a schema covering all 19 genre flags is created. A movie can belong to several genres at once, so this enables multi-label genre analysis.

In [41]:
genres = [
    "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", 
    "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", 
    "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

In [42]:
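# getOrCreate() returns the session created earlier, so the new appName has no effect
# and only runtime SQL configurations are applied (see the warning below)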
spark = SparkSession.builder \
    .appName("Load u.item with genres") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .getOrCreate()

# Define schema
genre_fields = [StructField(g.lower().replace("-", "_"), IntegerType(), True) for g in genres]
item_schema = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("video_release_date", StringType(), True),
    StructField("imdb_url", StringType(), True),
] + genre_fields)

# Load the file
movies_with_genres_df = spark.read \
    .option("delimiter", "|") \
    .schema(item_schema) \
    .csv("/Users/pavethraavadiar/Documents/Masters - Sem 2/Data management/project 3/ml-100k/u.item", encoding="ISO-8859-1")

movies_with_genres_df.show(5, truncate=False)

+--------+-----------------+------------+------------------+------------------------------------------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movie_id|title            |release_date|video_release_date|imdb_url                                              |unknown|action|adventure|animation|children|comedy|crime|documentary|drama|fantasy|film_noir|horror|musical|mystery|romance|sci_fi|thriller|war|western|
+--------+-----------------+------------+------------------+------------------------------------------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|1       |Toy Story (1995) |01-Jan-1995 |null              |http://us.imdb.com/M/title-exact?Toy%20Story%20(1995) |0      |0     |0        |1        |1       |1     |0    |0          |0    |0     

25/07/21 16:18:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [43]:
movies_df.printSchema()
movies_df.show(5, truncate=False)

root
 |-- movie_id: integer (nullable = false)
 |-- imdb_url: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video_release_date: string (nullable = true)

+--------+-----------------------------------------------------+------------+-----------------------------+------------------+
|movie_id|imdb_url                                             |release_date|title                        |video_release_date|
+--------+-----------------------------------------------------+------------+-----------------------------+------------------+
|1657    |http://us.imdb.com/M/title-exact?Target%20(1995)     |28-Feb-1996 |Target (1995)                |                  |
|1443    |http://us.imdb.com/M/title-exact?8%20Seconds%20(1994)|01-Jan-1994 |8 Seconds (1994)             |                  |
|1062    |http://us.imdb.com/M/title-exact?imdb-title-119815   |23-Jan-1998 |Four Days in September (1997)|                  |
|1090    |http://us

In [44]:
movies_df.columns

['movie_id', 'imdb_url', 'release_date', 'title', 'video_release_date']

In [45]:
genre_columns = [
    'unknown', 'Action', 'Adventure', 'Animation', "Children's", 'Comedy',
    'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror',
    'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western'
]

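item_columns = ['movie_id', 'movie_title', 'release_date', 'video_release_date', 'imdb_url'] + genre_columns

In [46]:
# u.item is Latin-1 encoded (as in the earlier load); without a schema, the genre flags are read as strings
movies_df = spark.read \
    .option("delimiter", "|") \
    .option("encoding", "ISO-8859-1") \
    .csv("/Users/pavethraavadiar/Documents/Masters - Sem 2/Data management/project 3/ml-100k/u.item") \
    .toDF(*item_columns)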

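Next, an array column genres_array is built for each movie. For every genre flag equal to 1 it holds the genre name, and null otherwise. concat_ws then joins the array into a comma-separated genres string and skips the null entries. The Question iii query groups on this string, so each genre combination (such as "Action,Thriller") counts as its own category rather than being split into individual genres.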
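For a single movie, the array-plus-concat_ws transformation can be mirrored in plain Python. The flags are strings here because the CSV is read without a schema. The Toy Story flags are illustrative, and genre_labels is a hypothetical helper.

```python
GENRE_COLUMNS = [
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
]


def genre_labels(flags):
    """Mirror array(when(flag == 1, name)) + concat_ws: keep names whose flag is 1."""
    genres_array = [name if int(flag) == 1 else None for name, flag in zip(GENRE_COLUMNS, flags)]
    # concat_ws skips nulls, so only the genres that are set end up in the string
    return ",".join(g for g in genres_array if g is not None)


toy_story_flags = ["0", "0", "0", "1", "1", "1"] + ["0"] * 13  # illustrative flags
print(genre_labels(toy_story_flags))
```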

In [48]:
# Create array of genre names where the flag is 1
genre_exprs = [when(col(g) == 1, lit(g)).otherwise(lit(None)) for g in genre_columns]
movies_df = movies_df.withColumn("genres_array", array(*genre_exprs))

# Join genres_array into a comma-separated string (concat_ws skips the nulls); the Qiii query groups on it
from pyspark.sql.functions import concat_ws
movies_df = movies_df.withColumn("genres", concat_ws(",", col("genres_array")))


In [49]:
# Register temp views
ratings_df.createOrReplaceTempView("ratings")
movies_df.createOrReplaceTempView("movies")

# Qiii: Users with ≥50 ratings and their favourite genre
spark.sql("""
    WITH user_rating_counts AS (
        SELECT user_id, COUNT(*) AS num_ratings
        FROM ratings
        GROUP BY user_id
        HAVING num_ratings >= 50
    ),
    user_genres AS (
        SELECT r.user_id, m.genres, COUNT(*) AS genre_count
        FROM ratings r
        JOIN movies m ON r.movie_id = m.movie_id
        JOIN user_rating_counts u ON r.user_id = u.user_id
        GROUP BY r.user_id, m.genres
    ),
    ranked_genres AS (
        SELECT user_id, genres, genre_count,
               ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY genre_count DESC) AS rank
        FROM user_genres
    )
    SELECT user_id, genres AS favourite_genre, genre_count
    FROM ranked_genres
    WHERE rank = 1
    LIMIT 10
""").show(truncate=False)

25/07/21 16:18:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+---------------+-----------+
|user_id|favourite_genre|genre_count|
+-------+---------------+-----------+
|1      |Drama          |39         |
|2      |Drama          |13         |
|3      |Drama          |6          |
|5      |Comedy         |28         |
|6      |Drama          |48         |
|7      |Drama          |51         |
|8      |Action,Thriller|6          |
|10     |Drama          |34         |
|11     |Comedy         |27         |
|12     |Drama          |8          |
+-------+---------------+-----------+
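User 8's favourite appears as "Action,Thriller" because the query counts genre combinations. A per-genre variant would explode each movie's genres before counting, so an Action,Thriller movie adds one to Action and one to Thriller. In Spark this would roughly mean exploding genres_array and dropping the null entries before the GROUP BY. The plain-Python sketch below illustrates that variant. favourite_genres is hypothetical and the rows are made up. Ties are broken alphabetically here, whereas ROW_NUMBER() in the query picks an arbitrary row among ties.

```python
from collections import Counter, defaultdict


def favourite_genres(ratings, movie_genres, min_ratings=50):
    """Return {user_id: (genre, count)} for users with at least min_ratings ratings.

    ratings: iterable of (user_id, movie_id); movie_genres: movie_id -> list of genres.
    """
    rated = defaultdict(list)
    for user_id, movie_id in ratings:
        rated[user_id].append(movie_id)

    result = {}
    for user_id, movie_ids in rated.items():
        if len(movie_ids) < min_ratings:
            continue
        # count each genre separately, like exploding the genre array
        counts = Counter(g for m in movie_ids for g in movie_genres.get(m, []))
        if counts:
            # highest count first; alphabetical order breaks ties deterministically
            genre, count = min(counts.items(), key=lambda kv: (-kv[1], kv[0]))
            result[user_id] = (genre, count)
    return result


movie_genres = {10: ["Action", "Thriller"], 11: ["Action"], 12: ["Drama"]}
ratings = [(8, 10), (8, 11), (8, 12), (9, 12)]
print(favourite_genres(ratings, movie_genres, min_ratings=3))
```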



## Question iv) Find all the users who are less than 20 years old.

This is a simple filter to identify younger users and is useful for demographic profiling.

In [51]:
users_df.createOrReplaceTempView("users")

spark.sql("""
    SELECT * 
    FROM users 
    WHERE age < 20
    LIMIT 10
""").show(truncate=False)

+-------+---+------+-------------+--------+
|user_id|age|gender|occupation   |zip_code|
+-------+---+------+-------------+--------+
|30     |7  |M     |student      |55436   |
|36     |19 |F     |student      |93117   |
|52     |18 |F     |student      |55105   |
|57     |16 |M     |none         |84010   |
|67     |17 |M     |student      |60402   |
|68     |19 |M     |student      |22904   |
|101    |15 |M     |student      |05146   |
|110    |19 |M     |student      |77840   |
|142    |13 |M     |other        |48118   |
|179    |15 |M     |entertainment|20755   |
+-------+---+------+-------------+--------+



## Question v) Find all the users whose occupation is “scientist” and whose age is between 30 and 40 years old.

This step filters users by occupation and age group. BETWEEN is inclusive at both ends, so users aged exactly 30 or 40 are kept; user 309, aged 40, appears in the output below.
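The WHERE condition can be mirrored in plain Python to show the inclusive bounds. The DataFrame filter used later (age >= 30 and age <= 40) is equivalent. The rows are taken from the user listings in this notebook, and scientists_in_range is a hypothetical helper.

```python
def scientists_in_range(users, low=30, high=40):
    """Mirror WHERE occupation = 'scientist' AND age BETWEEN low AND high (both ends inclusive)."""
    return [u for u in users if u["occupation"] == "scientist" and low <= u["age"] <= high]


users = [
    {"user_id": 309, "age": 40, "occupation": "scientist"},
    {"user_id": 183, "age": 33, "occupation": "scientist"},
    {"user_id": 14, "age": 45, "occupation": "scientist"},
    {"user_id": 17, "age": 30, "occupation": "programmer"},
]
print([u["user_id"] for u in scientists_in_range(users)])
```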

In [53]:

spark.sql("""
    SELECT * 
    FROM users 
    WHERE occupation = 'scientist' 
      AND age BETWEEN 30 AND 40
    LIMIT 10
""").show(truncate=False)


+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|40     |38 |M     |scientist |27514   |
|71     |39 |M     |scientist |98034   |
|74     |39 |M     |scientist |T8H1N   |
|107    |39 |M     |scientist |60466   |
|183    |33 |M     |scientist |27708   |
|272    |33 |M     |scientist |53706   |
|309    |40 |M     |scientist |70802   |
|337    |37 |M     |scientist |10522   |
|430    |38 |M     |scientist |98199   |
|538    |31 |M     |scientist |21010   |
+-------+---+------+----------+--------+



## Save filtered data to Cassandra

The complete results of Questions iv and v, not just the ten rows displayed above, are saved to their own Cassandra tables. They can then be reused in later analysis or by other systems such as dashboards or applications.

In [55]:
# Qiv: Users under 20
users_under_20 = users_df.filter(users_df.age < 20)

# Qv: Scientists between 30 and 40
scientists_30_40 = users_df.filter((users_df.occupation == "scientist") & (users_df.age >= 30) & (users_df.age <= 40))

In [56]:
# Write users under 20 to Cassandra
users_under_20.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="users_under_20", keyspace="movielens") \
    .save()

# Write scientists aged 30–40 to Cassandra
scientists_30_40.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="scientists_30_40", keyspace="movielens") \
    .save()

## Verifying Data Saved in Cassandra

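Reading the tables back from Cassandra confirms that the rows were written and can be reused. Rows come back in partition-token order rather than sorted by user_id, which is why the listing below is unordered.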
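Displaying rows is only a spot check. A more direct check compares the set of primary keys written with the set read back, ignoring order. In PySpark, the key lists could be collected with something like [r.user_id for r in df.select("user_id").collect()]. The comparison itself is sketched below in plain Python; check_roundtrip is a hypothetical helper.

```python
def check_roundtrip(written_keys, read_keys):
    """Compare primary keys written to Cassandra with those read back, ignoring order."""
    written, read = set(written_keys), set(read_keys)
    return {
        "written": len(written),
        "read": len(read),
        "missing": sorted(written - read),
        "unexpected": sorted(read - written),
    }


report = check_roundtrip([30, 36, 52, 57], [52, 36, 57, 30])
print(report["missing"], report["unexpected"])
```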

In [58]:
# Read back to confirm
under_20_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="users_under_20", keyspace="movielens") \
    .load()

scientists_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="scientists_30_40", keyspace="movielens") \
    .load()

under_20_df.show()
scientists_df.show()

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|    375| 17|     M|entertainment|   37777|
|    851| 18|     M|        other|   29646|
|    859| 18|     F|        other|   06492|
|    813| 14|     F|      student|   02136|
|     52| 18|     F|      student|   55105|
|    397| 17|     M|      student|   27514|
|    257| 17|     M|      student|   77005|
|    425| 19|     M|      student|   58644|
|    110| 19|     M|      student|   77840|
|    849| 15|     F|      student|   25652|
|    729| 19|     M|      student|   56567|
|    221| 19|     M|      student|   20685|
|    368| 18|     M|      student|   92113|
|    507| 18|     F|       writer|   28450|
|    624| 19|     M|      student|   30067|
|    592| 18|     M|      student|   97520|
|    434| 16|     F|      student|   49705|
|    631| 18|     F|      student|   38866|
|    787| 18|     F|      student|   98620|
|    646| 17|     F|      studen

## Conclusion

All required datasets were loaded, processed, and persisted into Cassandra. The Spark SQL queries answered the assignment questions, displaying the top 10 results as requested. The filtered datasets were also saved for future analysis, and reading them back confirmed that they were persisted. This notebook therefore demonstrates an end-to-end analysis of the MovieLens 100K dataset with Spark and Cassandra.