In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS']='--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'

In [2]:
from pyspark import SparkContext
sc = SparkContext("local", "movie lens app")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
def load_and_get_table_df(keys_space_name, table_name):
    table_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table=table_name, keyspace=keys_space_name)\
        .load()
    return table_df

In [4]:
movies = load_and_get_table_df("movie_dataset", "movies")

In [5]:
ratings=load_and_get_table_df("movie_dataset","ratings")

In [6]:
#Getting Schema of the movies and ratings table
print("Movies Table Schema",movies.printSchema)
print("Ratings Table Schema",ratings.printSchema)

('Movies Table Schema', <bound method DataFrame.printSchema of DataFrame[movie_id: int, genres: string, movie_title: string, year: int]>)
('Ratings Table Schema', <bound method DataFrame.printSchema of DataFrame[user_id: int, movie_id: int, rating: double, timestamp: bigint]>)


In [8]:
#Showing the first few records of movies data table
movies.head(5)

[Row(movie_id=6981, genres=u'Drama', movie_title=u'Ordet Word, The ', year=1955),
 Row(movie_id=3254, genres=u'Comedy', movie_title=u"Wayne's World 2 ", year=1993),
 Row(movie_id=4877, genres=u'Comedy|Romance', movie_title=u'Better Than Sex ', year=2000),
 Row(movie_id=170551, genres=u'Comedy', movie_title=u'The Quiet Family ', year=1998),
 Row(movie_id=7071, genres=u'Drama', movie_title=u'Woman Under the Influence, A ', year=1974)]

In [9]:
#Showing the first few records of ratins data table
ratings.head(5)

[Row(user_id=53, movie_id=203, rating=5.0, timestamp=1237748081),
 Row(user_id=53, movie_id=249, rating=5.0, timestamp=1237748109),
 Row(user_id=53, movie_id=381, rating=5.0, timestamp=1237748115),
 Row(user_id=53, movie_id=413, rating=5.0, timestamp=1237748065),
 Row(user_id=53, movie_id=481, rating=5.0, timestamp=1237748105)]

In [10]:
#Joining movies and ratings table and showing the first few records
movies_ratings=movies.join(ratings,movies.movie_id==ratings.movie_id,"left_outer")
movies_ratings.head(5)

[Row(movie_id=69227, genres=u'Children|Comedy', movie_title=u'Ernest Rides Again ', year=1993, user_id=599, movie_id=69227, rating=1.0, timestamp=1519373251),
 Row(movie_id=164753, genres=u'Romance', movie_title=u'Anything for Love ', year=2016, user_id=448, movie_id=164753, rating=2.0, timestamp=1489862782),
 Row(movie_id=152091, genres=u'Comedy', movie_title=u'The Brothers Grimsby ', year=2016, user_id=125, movie_id=152091, rating=4.0, timestamp=1474411738),
 Row(movie_id=152091, genres=u'Comedy', movie_title=u'The Brothers Grimsby ', year=2016, user_id=249, movie_id=152091, rating=3.0, timestamp=1518388995),
 Row(movie_id=152091, genres=u'Comedy', movie_title=u'The Brothers Grimsby ', year=2016, user_id=365, movie_id=152091, rating=3.0, timestamp=1489796774)]

In [20]:
#Finding out number of +ve reviews for movie
# 3.5 is taken as threshold; Anything >= 3.5 is +ve 
movies_pos=movies_ratings.filter(movies_ratings.rating>=3.5)
movies_pos.groupBy("movie_title").count().show()

+--------------------+-----+
|         movie_title|count|
+--------------------+-----+
|Tale of Two Siste...|    3|
|   House Bunny, The |    3|
|Road Warrior, The...|   30|
|Kissing Jessica S...|    3|
|Night at the Oper...|    9|
|   Army of Darkness |   42|
|       Maximum Ride |    1|
|          127 Hours |   15|
|    Band Wagon, The |    2|
|    Alien: Covenant |    2|
|Passage to India, A |    2|
|       General, The |    9|
|              Dummy |    3|
|               Dune |   28|
|Inglorious Bastar...|    5|
|'Hellboy': The Se...|    1|
|          Gift, The |    3|
|Girltrash: All Ni...|    1|
|           Jump In! |    1|
|Hard Day's Night, A |   10|
+--------------------+-----+
only showing top 20 rows



In [21]:
#Finding out number of +ve reviews for movie
# 3.5 is taken as threshold; Anything >= 3.5 is +ve 
movies_neg=movies_ratings.filter(movies_ratings.rating<3.5)
movies_neg.groupBy("movie_title").count().show()

+--------------------+-----+
|         movie_title|count|
+--------------------+-----+
|Tale of Two Siste...|    1|
|   House Bunny, The |    7|
|Road Warrior, The...|   10|
|Kissing Jessica S...|    1|
|   Army of Darkness |    9|
|       Leprechaun 2 |    1|
|     Addiction, The |    2|
|          127 Hours |    3|
|    Band Wagon, The |    1|
|    Alien: Covenant |    3|
|       General, The |    2|
|              Dummy |    1|
|            Altered |    1|
|Orca: The Killer ...|    1|
|               Dune |   13|
|          SpaceCamp |    6|
|Earth Girls Are E...|    8|
|Inglorious Bastar...|    2|
|Batman: The Killi...|    1|
|          Gift, The |   10|
+--------------------+-----+
only showing top 20 rows



In [31]:
#Checking the count of number of movies segregated by year
movies_ratings.groupBy("year").count().show()

+----+-----+
|year|count|
+----+-----+
|1959|  243|
|1990| 1926|
|3000|   46|
|1903|    2|
|1975|  636|
|1977|  567|
|1924|    6|
|2003| 3138|
|2007| 2293|
|2018|   92|
|1974|  465|
|2015| 1088|
|1927|   29|
|1955|  182|
|2006| 2585|
|1978|  490|
|1908|    1|
|1925|   19|
|1961|  226|
|2013| 1199|
+----+-----+
only showing top 20 rows



In [60]:
#Creating a DataFrame for getting the total +Ve and -ve reviews given by a user
#+ve(ratings>=3.5) and -ve(ratins<3.5)
from pyspark.sql.functions  import *
user_temp_df1=movies_pos.select("user_id","rating")
user_temp_df1=user_temp_df1.groupBy("user_id").count().withColumnRenamed("count","pos_rating")
# user_temp_df1.show()

user_temp_df2=movies_neg.select("user_id","rating")
user_temp_df2=user_temp_df2.groupBy("user_id").count().withColumnRenamed("count","neg_rating")

# user_temp_df1.join(user_temp_df2,"user_id").show()
user_df=user_temp_df1.join(user_temp_df2,"user_id")
user_df.show()




+-------+----------+----------+
|user_id|pos_rating|neg_rating|
+-------+----------+----------+
|    148|        35|        13|
|    463|        26|         7|
|    471|        23|         5|
|    496|        17|        12|
|    243|        26|        10|
|    392|        10|        15|
|    540|        36|         6|
|     31|        36|        14|
|    516|        14|        12|
|     85|        23|        11|
|    137|       116|        25|
|    251|        22|         1|
|    451|        20|        14|
|    580|       294|       142|
|     65|        30|         4|
|    458|        44|        15|
|    255|        11|        33|
|    481|         7|        24|
|    588|        19|        37|
|    133|         7|        28|
+-------+----------+----------+
only showing top 20 rows



In [73]:
from pyspark.sql.functions  import *
year_temp_df1=movies_pos.select("year","rating")
year_temp_df1=year_temp_df1.groupBy("year").count().withColumnRenamed("count","pos_rating")
# user_temp_df1.show()

year_temp_df2=movies_neg.select("year","rating")
year_temp_df2=year_temp_df2.groupBy("year").count().withColumnRenamed("count","neg_rating")

# user_temp_df1.join(user_temp_df2,"user_id").show()
year_df=year_temp_df1.join(year_temp_df2,"year")
year_df.show()

+----+----------+----------+
|year|pos_rating|neg_rating|
+----+----------+----------+
|1959|       174|        69|
|1990|      1084|       842|
|1903|         1|         1|
|3000|        27|        19|
|1975|       476|       160|
|1977|       411|       156|
|1924|         5|         1|
|2003|      1992|      1146|
|2007|      1519|       774|
|2018|        59|        33|
|1974|       364|       100|
|2015|       677|       411|
|1927|        21|         8|
|1955|       130|        52|
|2006|      1664|       921|
|1978|       278|       212|
|1925|        15|         4|
|1961|       149|        76|
|2013|       755|       444|
|1942|       141|        44|
+----+----------+----------+
only showing top 20 rows



In [74]:
from pyspark.sql.functions  import *
movie_temp_df1=movies_pos.select("movie_title","rating")
movie_temp_df1=movie_temp_df1.groupBy("movie_title").count().withColumnRenamed("count","pos_rating")


movie_temp_df2=movies_neg.select("movie_title","rating")
movie_temp_df2=movie_temp_df2.groupBy("movie_title").count().withColumnRenamed("count","neg_rating")


movie_df=movie_temp_df1.join(movie_temp_df2,"movie_title")
movie_df.show()

+--------------------+----------+----------+
|         movie_title|pos_rating|neg_rating|
+--------------------+----------+----------+
|          127 Hours |        15|         3|
|       Alien Nation |         4|         9|
|    Alien: Covenant |         2|         3|
|   Army of Darkness |        42|         9|
|          Bad Santa |        17|         9|
|    Band Wagon, The |         2|         1|
|     Changing Lanes |         4|         5|
|              Dummy |         3|         1|
|               Dune |        28|        13|
|Fast & Furious Fa...|         4|         6|
|       Fletch Lives |         2|         7|
|       General, The |         9|         2|
|          Gift, The |         3|        10|
|Hard Day's Night, A |        10|         9|
|   House Bunny, The |         3|         7|
|Inglorious Bastar...|         5|         2|
|Jonestown: The Li...|         3|         1|
|Kissing Jessica S...|         3|         1|
|             Ransom |        17|        20|
|Remains o