# In [None]:
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def parseUser(line):
    """Parse one '|'-delimited line of u.user into a user Row.

    Columns read, in order: user id (int), age (int), gender, occupation,
    zip (all kept as strings). Any columns past the fifth are ignored.
    """
    cols = line.split('|')
    user_id = int(cols[0])
    age = int(cols[1])
    return Row(
        user_id=user_id,
        age=age,
        gender=cols[2],
        occupation=cols[3],
        zip=cols[4],
    )

def parseRating(line):
    """Parse one tab-delimited line of u.data into a rating Row.

    Columns read, in order: user id (int), movie id (int), rating (float),
    timestamp (int). Any columns past the fourth are ignored.
    """
    cols = line.split('\t')
    user_id = int(cols[0])
    movie_id = int(cols[1])
    rating = float(cols[2])
    timestamp = int(cols[3])
    return Row(user_id=user_id, movie_id=movie_id,
               rating=rating, timestamp=timestamp)

def parseMovie(line):
    """Parse one '|'-delimited line of u.item into a movie Row.

    Field 0 is the integer movie id, field 1 the title and field 2 is kept
    as a raw string under ``release_year``. Fields 3 and 4 are not read.
    Fields 5-23 are integer genre flags, each stored as ``bool(int(...))``
    (nonzero -> True) under the matching name in ``genre_names``.
    """
    # Genre column names, in the same order as fields 5..23 of the line.
    genre_names = ("unknown", "action", "adventure", "animation", "children",
                   "comedy", "crime", "documentary", "drama", "fantasy",
                   "filmnoir", "horror", "musical", "mystery", "romance",
                   "scifi", "thriller", "war", "western")
    parts = line.split('|')
    movie_id = int(parts[0])
    title = parts[1]
    # NOTE(review): despite the name, this is the raw field text; in ML-100k
    # it appears to be a full release date (e.g. '01-Jan-1995') - confirm.
    release = parts[2]
    # Index explicitly (not zip) so a short line still raises IndexError.
    flags = {name: bool(int(parts[5 + offset]))
             for offset, name in enumerate(genre_names)}
    return Row(movie_id=movie_id, title=title, release_year=release, **flags)

if __name__ == "__main__":
    # Spark data source name of the Cassandra connector and the keyspace that
    # holds all three tables.
    CASSANDRA_FORMAT = "org.apache.spark.sql.cassandra"
    KEYSPACE = "movielens"
    DATA_DIR = "hdfs:///user/maria_dev/ml-100k/"
    # Genre flag columns of the movies table (same names parseMovie emits).
    # 'unknown' comes first but has the lowest priority when a favorite genre
    # is picked below: it is only chosen when no other genre hits the maximum.
    GENRES = ("unknown", "action", "adventure", "animation", "children",
              "comedy", "crime", "documentary", "drama", "fantasy",
              "filmnoir", "horror", "musical", "mystery", "romance",
              "scifi", "thriller", "war", "western")
    # (Cassandra table / temp view name, file under DATA_DIR, line parser)
    SOURCES = (("users", "u.user", parseUser),
               ("ratings", "u.data", parseRating),
               ("movies", "u.item", parseMovie))

    # Create a SparkSession
    spark = SparkSession.builder.appName("TEST")\
        .config("spark.cassandra.connection.host", "127.0.0.1")\
        .getOrCreate()
    # Release the session even if a load, write or query fails.
    try:
        # Get the data, convert to an RDD of Rows, convert to a DataFrame,
        # and append it to the Cassandra table of the same name.
        for table, filename, parser in SOURCES:
            rows = spark.sparkContext.textFile(DATA_DIR + filename).map(parser)
            spark.createDataFrame(rows).write\
                .format(CASSANDRA_FORMAT)\
                .mode('append')\
                .options(table=table, keyspace=KEYSPACE)\
                .save()
        # Read every table back from Cassandra and register it as a temp view
        # under the same name so the SQL below can refer to it.
        for table, _, _ in SOURCES:
            spark.read\
                .format(CASSANDRA_FORMAT)\
                .options(table=table, keyspace=KEYSPACE)\
                .load()\
                .createOrReplaceTempView(table)

        # Q1: average rating (2 decimals) of the first 10 movies by movie_id.
        sqlDF1 = spark.sql(
            "SELECT r.movie_id, m.title, ROUND(AVG(r.rating), 2) AS avg_rating "
            "FROM ratings r JOIN movies m ON r.movie_id = m.movie_id "
            "GROUP BY r.movie_id, m.title ORDER BY r.movie_id LIMIT 10")
        sqlDF1.show()
        # Q2: the 10 movies with the highest average rating.
        # NOTE(review): there is no minimum number of ratings, so movies rated
        # only a handful of times can dominate this list - confirm intended.
        sqlDF2 = spark.sql(
            "SELECT r.movie_id, m.title, ROUND(AVG(r.rating), 2) AS avg_rating "
            "FROM ratings r JOIN movies m ON r.movie_id = m.movie_id "
            "GROUP BY r.movie_id, m.title ORDER BY avg_rating DESC LIMIT 10")
        sqlDF2.show()
        # Q4: up to 10 users younger than 20, youngest first.
        sqlDF4 = spark.sql(
            "SELECT user_id, age, occupation FROM users "
            "WHERE age < 20 ORDER BY age LIMIT 10")
        sqlDF4.show()
        # Q5: up to 10 scientists aged 30 to 40 inclusive, youngest first.
        sqlDF5 = spark.sql(
            "SELECT user_id, age, occupation FROM users "
            "WHERE occupation = 'scientist' AND age BETWEEN 30 AND 40 "
            "ORDER BY age LIMIT 10")
        sqlDF5.show()

        # Q3: favorite genre of active users.
        # Active users: user_ids with at least 50 ratings (COUNT(*) >= 50).
        active_users = spark.sql(
            "SELECT user_id, COUNT(*) AS total_ratings FROM ratings "
            "GROUP BY user_id HAVING COUNT(*) >= 50")
        active_users.createOrReplaceTempView("active_users")
        # Per active user, how many of their ratings fall in each genre; a
        # movie flagged with several genres counts toward each of them.
        genre_sums = ", ".join(
            "SUM(CAST(m.{0} AS INT)) AS {0}".format(g) for g in GENRES)
        user_genre_preferences = spark.sql(
            ("SELECT r.user_id, {0} "
             "FROM ratings r JOIN movies m ON r.movie_id = m.movie_id "
             "JOIN active_users a ON r.user_id = a.user_id "
             "GROUP BY r.user_id").format(genre_sums))
        user_genre_preferences.createOrReplaceTempView("user_genre_preferences")
        # Label each user with the genre whose count equals their maximum.
        # The CASE is generated from GENRES so every label matches its column
        # (the hand-written version labelled 'war' as 'thriller'). Ties go to
        # the earlier genre in GENRES; 'unknown' is the ELSE fallback.
        max_expr = "GREATEST({0})".format(", ".join(GENRES))
        whens = " ".join(
            "WHEN p.{0} = p.max_value THEN '{0}'".format(g) for g in GENRES[1:])
        sqlDF3 = spark.sql(
            ("SELECT u.user_id, u.occupation, "
             "CASE {0} ELSE 'unknown' END AS favorite_genre "
             "FROM (SELECT *, {1} AS max_value FROM user_genre_preferences) p "
             "JOIN users u ON p.user_id = u.user_id "
             "ORDER BY u.user_id LIMIT 10").format(whens, max_expr))
        sqlDF3.show()
    finally:
        spark.stop()