In [21]:
import findspark

findspark.init('/home/darshan/spark-3.0.0-preview2-bin-hadoop2.7')

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
from pyspark.sql.functions import format_number

In [22]:
spark = SparkSession.builder.appName("ml-100k").getOrCreate()

In [23]:
lines = spark.sparkContext.textFile("u.data")

In [24]:
def parseInput(line):
    fields = line.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]), timestamp = (fields[3]))

In [25]:
movies = lines.map(parseInput)

movieDataset = spark.createDataFrame(movies)

In [26]:
movieDataset.printSchema()

root
 |-- movieID: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- userID: long (nullable = true)



In [27]:
movieDataset.show()

+-------+------+---------+------+
|movieID|rating|timestamp|userID|
+-------+------+---------+------+
|     50|   5.0|881250949|     0|
|    172|   5.0|881250949|     0|
|    133|   1.0|881250949|     0|
|    242|   3.0|881250949|   196|
|    302|   3.0|891717742|   186|
|    377|   1.0|878887116|    22|
|     51|   2.0|880606923|   244|
|    346|   1.0|886397596|   166|
|    474|   4.0|884182806|   298|
|    265|   2.0|881171488|   115|
|    465|   5.0|891628467|   253|
|    451|   3.0|886324817|   305|
|     86|   3.0|883603013|     6|
|    257|   2.0|879372434|    62|
|   1014|   5.0|879781125|   286|
|    222|   5.0|876042340|   200|
|     40|   3.0|891035994|   210|
|     29|   3.0|888104457|   224|
|    785|   3.0|879485318|   303|
|    387|   5.0|879270459|   122|
+-------+------+---------+------+
only showing top 20 rows



In [28]:
def loadMovieNames():
    movieID = []
    movieName = []
    releaseDate = []
    with open("u.item", encoding = "ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieID.append(int(fields[0]))
            movieName.append(fields[1])
            releaseDate.append(fields[2])
    return movieID, movieName, releaseDate

In [29]:
movieID, movieName, releaseDate = loadMovieNames()

df = pd.DataFrame({'movieID':movieID, 'movieName':movieName, 'releaseDate':releaseDate})
movieNames = spark.createDataFrame(df)

In [30]:
movieNames.printSchema()

root
 |-- movieID: long (nullable = true)
 |-- movieName: string (nullable = true)
 |-- releaseDate: string (nullable = true)



In [31]:
movieNames.show()

+-------+--------------------+-----------+
|movieID|           movieName|releaseDate|
+-------+--------------------+-----------+
|      1|    Toy Story (1995)|01-Jan-1995|
|      2|    GoldenEye (1995)|01-Jan-1995|
|      3|   Four Rooms (1995)|01-Jan-1995|
|      4|   Get Shorty (1995)|01-Jan-1995|
|      5|      Copycat (1995)|01-Jan-1995|
|      6|Shanghai Triad (Y...|01-Jan-1995|
|      7|Twelve Monkeys (1...|01-Jan-1995|
|      8|         Babe (1995)|01-Jan-1995|
|      9|Dead Man Walking ...|01-Jan-1995|
|     10|  Richard III (1995)|22-Jan-1996|
|     11|Seven (Se7en) (1995)|01-Jan-1995|
|     12|Usual Suspects, T...|14-Aug-1995|
|     13|Mighty Aphrodite ...|30-Oct-1995|
|     14|  Postino, Il (1994)|01-Jan-1994|
|     15|Mr. Holland's Opu...|29-Jan-1996|
|     16|French Twist (Gaz...|01-Jan-1995|
|     17|From Dusk Till Da...|05-Feb-1996|
|     18|White Balloon, Th...|01-Jan-1995|
|     19|Antonia's Line (1...|01-Jan-1995|
|     20|Angels and Insect...|01-Jan-1995|
+-------+--

In [32]:
#top 10 most rated movies
movie_count = movieDataset.groupBy('movieID').count()

most_rated_movies = movie_count.join(movieNames, movie_count.movieID == movieNames.movieID). \
                    select(['movieName','count']).orderBy(movie_count['count'].desc())
most_rated_movies.show(10)

+--------------------+-----+
|           movieName|count|
+--------------------+-----+
|    Star Wars (1977)|  584|
|      Contact (1997)|  509|
|        Fargo (1996)|  508|
|Return of the Jed...|  507|
|    Liar Liar (1997)|  485|
|English Patient, ...|  481|
|       Scream (1996)|  478|
|    Toy Story (1995)|  452|
|Air Force One (1997)|  431|
|Independence Day ...|  429|
+--------------------+-----+
only showing top 10 rows



In [33]:
#Average Ratings for each movie
averageRatings = movieDataset.groupBy("movieID").avg("rating")


averageRatings = averageRatings.withColumn("avg(rating)",format_number('avg(rating)',1))
averageRatings.show()

+-------+-----------+
|movieID|avg(rating)|
+-------+-----------+
|    474|        4.3|
|     29|        2.7|
|     26|        3.5|
|    964|        3.3|
|   1677|        3.0|
|     65|        3.5|
|    191|        4.2|
|   1224|        2.7|
|    558|        3.7|
|   1010|        3.2|
|    418|        3.6|
|   1277|        3.4|
|   1258|        2.5|
|    541|        2.9|
|   1360|        1.5|
|    222|        3.7|
|    938|        2.9|
|    293|        3.8|
|    270|        3.6|
|   1127|        2.9|
+-------+-----------+
only showing top 20 rows



In [34]:
#Number of movies for each rating
countsPerMovie = movieDataset.groupBy("rating").count()
countsPerMovie.show()

+------+-----+
|rating|count|
+------+-----+
|   1.0| 6111|
|   4.0|34174|
|   3.0|27145|
|   2.0|11370|
|   5.0|21203|
+------+-----+



In [35]:
#Average & counts for each movie
averagesAndCounts = movie_count.join(averageRatings, "movieID")

averagesAndCounts.orderBy(averagesAndCounts['avg(rating)'].desc()).show()

+-------+-----+-----------+
|movieID|count|avg(rating)|
+-------+-----+-----------+
|   1201|    1|        5.0|
|   1500|    2|        5.0|
|   1653|    1|        5.0|
|   1599|    1|        5.0|
|    814|    1|        5.0|
|   1189|    3|        5.0|
|   1293|    3|        5.0|
|   1122|    1|        5.0|
|   1536|    1|        5.0|
|   1467|    2|        5.0|
|   1449|    8|        4.6|
|   1398|    2|        4.5|
|   1642|    2|        4.5|
|    483|  243|        4.5|
|    318|  298|        4.5|
|    408|  112|        4.5|
|   1594|    2|        4.5|
|    169|  118|        4.5|
|    119|    4|        4.5|
|    114|   67|        4.4|
+-------+-----+-----------+
only showing top 20 rows



In [36]:
#highest rated movies(out of movies which are rated by atleast 10 people)
averagesAndCounts.filter("count > 10").join(movieNames, 'movieID'). \
select(['movieName','avg(rating)']).orderBy(averagesAndCounts['avg(rating)'].desc()).show()

+--------------------+-----------+
|           movieName|avg(rating)|
+--------------------+-----------+
|Close Shave, A (1...|        4.5|
|   Casablanca (1942)|        4.5|
|Schindler's List ...|        4.5|
|Wrong Trousers, T...|        4.5|
|Shawshank Redempt...|        4.4|
|  Rear Window (1954)|        4.4|
|Wallace & Gromit:...|        4.4|
|Usual Suspects, T...|        4.4|
|    Star Wars (1977)|        4.4|
|Raiders of the Lo...|        4.3|
|To Kill a Mocking...|        4.3|
|      Vertigo (1958)|        4.3|
|Third Man, The (1...|        4.3|
| Citizen Kane (1941)|        4.3|
| 12 Angry Men (1957)|        4.3|
|Dr. Strangelove o...|        4.3|
|Godfather, The (1...|        4.3|
|North by Northwes...|        4.3|
|Manchurian Candid...|        4.3|
|Some Folks Call I...|        4.3|
+--------------------+-----------+
only showing top 20 rows



In [37]:
#Lowest rated movies(out of movies which are rated by atleast 10 people)

averagesAndCounts.filter("count > 10").join(movieNames, 'movieID'). \
select(['movieName','avg(rating)']).orderBy('avg(rating)').show()

+--------------------+-----------+
|           movieName|avg(rating)|
+--------------------+-----------+
|Children of the C...|        1.3|
|   Body Parts (1991)|        1.6|
|Amityville II: Th...|        1.6|
|Free Willy 3: The...|        1.7|
|    Robocop 3 (1993)|        1.7|
|Lawnmower Man 2: ...|        1.7|
|Leave It to Beave...|        1.8|
|Vampire in Brookl...|        1.8|
|Ready to Wear (Pr...|        1.8|
| Gone Fishin' (1997)|        1.8|
|         Solo (1996)|        1.8|
|    Barb Wire (1996)|        1.9|
|Crow: City of Ang...|        1.9|
|    Big Bully (1996)|        1.9|
|     Bio-Dome (1996)|        1.9|
|     Jaws 3-D (1983)|        1.9|
| Home Alone 3 (1997)|        1.9|
|All Dogs Go to He...|        1.9|
|    Mr. Magoo (1997)|        1.9|
|    Jury Duty (1995)|        2.0|
+--------------------+-----------+
only showing top 20 rows



In [38]:
#Getting the Year from the Release Date
movieNamesYear = movieNames.withColumn('releaseYear',  movieNames.releaseDate.substr(8, 4))

In [39]:
#Most number of movie releases in a year
YearCount = movieNamesYear.groupBy('releaseYear').count()
YearCount.orderBy(YearCount['count'].desc()).show()

+-----------+-----+
|releaseYear|count|
+-----------+-----+
|       1996|  355|
|       1997|  286|
|       1995|  219|
|       1994|  214|
|       1993|  126|
|       1998|   65|
|       1992|   37|
|       1990|   24|
|       1991|   22|
|       1986|   15|
|       1989|   15|
|       1987|   13|
|       1982|   13|
|       1981|   12|
|       1988|   11|
|       1979|    9|
|       1958|    9|
|       1980|    8|
|       1974|    8|
|       1957|    8|
+-----------+-----+
only showing top 20 rows

