<a href="https://colab.research.google.com/github/DianaKahar/Data_management4/blob/main/P137263_Assignment4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **STQD6324 DATA MANAGEMENT**

In [None]:
pip install pyspark cassandra-driver pandas

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cassandra-driver
  Downloading cassandra_driver-3.29.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.9/18.9 MB[0m [31m31.5 MB/s[0m eta [36m0:00:00[0m
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2932d751dc44796719c75095f0c6adfe8c2656b3111336dd349f8732091bb744
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import avg, col, explode, array

In [None]:
# Build the Spark session used by every cell below (getOrCreate returns an
# already-running session if one exists). The Cassandra connection host is
# set to the local machine.
# NOTE(review): the "org.apache.spark.sql.cassandra" data source used later
# needs the Spark Cassandra Connector on the classpath; nothing here adds it
# (e.g. via spark.jars.packages) -- confirm it is supplied at launch time.
spark = (
    SparkSession.builder
    .appName("MovieLens Analysis")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .getOrCreate()
)

In [None]:
def parse_user(line):
    """Convert one '|'-delimited user record into a Row.

    Args:
        line: text whose first five '|'-separated fields are, in order,
            user id, age, gender, occupation and zip code.

    Returns:
        Row(user_id, age, gender, occupation, zip) where user_id and age are
        ints and the remaining fields are kept as strings.

    Raises:
        ValueError: if the user id or age field is not an integer.
        IndexError: if the line has fewer than five fields.
    """
    parts = line.split('|')
    user_id = int(parts[0])
    age = int(parts[1])
    return Row(
        user_id=user_id,
        age=age,
        gender=parts[2],
        occupation=parts[3],
        zip=parts[4],
    )

def parse_data(line):
    """Convert one tab-delimited rating record into a Row.

    Args:
        line: text whose first four tab-separated fields are, in order,
            user id, movie id, rating and timestamp (all integers).

    Returns:
        Row(user_id, movie_id, rating, timestamp) with every value as int.

    Raises:
        ValueError: if any of the four fields is not an integer.
        IndexError: if the line has fewer than four fields.
    """
    parts = line.split("\t")
    user_id = int(parts[0])
    movie_id = int(parts[1])
    rating = int(parts[2])
    timestamp = int(parts[3])
    return Row(user_id=user_id, movie_id=movie_id, rating=rating, timestamp=timestamp)

def parse_item(line):
    """Convert one '|'-delimited movie record into a Row.

    Args:
        line: text whose first five '|'-separated fields are movie id, title,
            release date, video release date and URL; every field after
            those is parsed as an integer.

    Returns:
        Row(movie_id, title, release_date, vid_release_date, url, genres)
        where movie_id is an int, genres is the list of ints taken from the
        sixth field onwards, and the other fields are kept as strings.

    Raises:
        ValueError: if the movie id or any trailing field is not an integer.
        IndexError: if the line has fewer than five fields.
    """
    parts = line.split("|")
    # Trailing fields are converted first, before the leading ones are read.
    genres = [int(flag) for flag in parts[5:]]
    movie_id = int(parts[0])
    return Row(
        movie_id=movie_id,
        title=parts[1],
        release_date=parts[2],
        vid_release_date=parts[3],
        url=parts[4],
        genres=genres,
    )

In [None]:
if __name__ == "__main__":

    # Load each raw text file from HDFS as an RDD of lines and parse every
    # line into a Row with the matching parse_* helper.
    # (Fix: the first RDD was assigned to `lines1` but read back as `line1`,
    # which raised NameError on a fresh kernel.)
    line1 = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
    user = line1.map(parse_user)

    line2 = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    rating = line2.map(parse_data)

    line3 = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.item")
    name = line3.map(parse_item)

    # Convert the RDDs of Rows to DataFrames; column names come from the Row
    # field names produced by the parsers.
    userDT = spark.createDataFrame(user)
    ratingDT = spark.createDataFrame(rating)
    nameDT = spark.createDataFrame(name)

    # Write each DataFrame to its Cassandra table in keyspace "movielen",
    # in the order user -> rating -> name, using save mode 'append'.
    for frame, table_name in ((userDT, "user"), (ratingDT, "rating"), (nameDT, "name")):
        (
            frame.write
            .format("org.apache.spark.sql.cassandra")
            .mode('append')
            .options(table=table_name, keyspace="movielen")
            .save()
        )

    # Load the three tables back from Cassandra (keyspace "movielen"); a fresh
    # reader is built for each table so no options carry over between loads.
    readUser, readRating, readName = (
        spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(table=table_name, keyspace="movielen")
        .load()
        for table_name in ("user", "rating", "name")
    )

    # Register each loaded DataFrame as a temporary view named after its table
    for view_name, frame in (("user", readUser), ("rating", readRating), ("name", readName)):
        frame.createOrReplaceTempView(view_name)

    # Sort key shared by both queries: highest average rating first
    by_avg_desc = col("avg_rating").desc()

    # i) Average rating of every movie, best-rated first (first 10 rows shown)
    per_movie_avg = readRating.groupBy("movie_id").agg(avg("rating").alias("avg_rating"))
    avg_ratings = per_movie_avg.orderBy(by_avg_desc)
    avg_ratings.show(10)

    # ii) Ten movies with the highest average rating, shown with their titles
    titled_avg = avg_ratings.join(readName, "movie_id").select("title", "avg_rating")
    top_movies = titled_avg.orderBy(by_avg_desc).limit(10)
    top_movies.show()

    # iii) Find the users who have rated at least 50 movies and identify their favourite movie genres
    #
    # parse_item stores the per-genre flags of a movie in one `genres` array
    # column, so there are no per-genre columns to select. Position i of that
    # array is mapped to genre_names[i] below.
    # NOTE(review): assumes the flags are 0/1 and appear in this order in
    # u.item -- confirm against the dataset README.
    genre_names = [
        "unknown", "action", "adventure", "animation", "children", "comedy", "crime", "documentary",
        "drama", "fantasy", "film_noir", "horror", "musical", "mystery", "romance", "sci_fi", "thriller", "war", "western"]
    genre_lookup = spark.createDataFrame(list(enumerate(genre_names)), ["genre_idx", "genre"])

    # Users with at least 50 ratings (columns: user_id, count)
    user_ratings_count = readRating.groupBy("user_id").count().filter(col("count") >= 50)

    # One row per (rating, genre the rated movie belongs to), then the number
    # of ratings each user gave per genre. posexplode yields the array index
    # alongside the flag so that the index can be translated to a genre name.
    user_genre_ratings = (
        readRating.join(readName, "movie_id")
        .selectExpr("user_id", "posexplode(genres) AS (genre_idx, flag)")
        .filter(col("flag") == 1)
        .join(genre_lookup, "genre_idx")
        .groupBy("user_id", "genre")
        .count()
        .withColumnRenamed("count", "genre_count")
    )

    # Both sides used to carry a column called "count", which made the sort
    # below ambiguous; each count now has its own name. Within every user the
    # most-rated genre comes first.
    frequent_users_genres = (
        user_ratings_count.withColumnRenamed("count", "rating_count")
        .join(user_genre_ratings, "user_id")
        .orderBy(col("user_id").asc(), col("genre_count").desc())
    )
    frequent_users_genres.show(10)

    age_col = col("age")

    # iv) Users whose age is strictly below 20
    young_users = readUser.where(age_col < 20)
    young_users.show(10)

    # v) Users with occupation "scientist" aged 30 to 40, both bounds inclusive
    is_scientist = col("occupation") == "scientist"
    aged_30_to_40 = (age_col >= 30) & (age_col <= 40)
    scientists = readUser.where(is_scientist & aged_30_to_40)
    scientists.show(10)


    # Shut the Spark session down now that all queries have been displayed
    spark.stop()