In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("movie-analytics")
sc = SparkContext(conf=conf)

In [2]:
print(sc)

<pyspark.context.SparkContext object at 0x047CCBB0>


In [3]:
## Create an sql context
from pyspark.sql import SQLContext
import collections
sqlContext = SQLContext(sc)
print(sqlContext)

<pyspark.sql.context.SQLContext object at 0x04C14D90>


In [4]:
# Data Definition:
# Column 1: User ID
# Column 2: Movie ID
# Column 3: Rating
# Column 4: Timestamp

# calculate the number of movies that are rated on a scale of 1 to 5.
my_lines = sc.textFile('E:/code/git-2018/ETL-Workflow/ETL-Examples/src/main/python/resources/movie-lens-data/ml-100k/u.data')
ratings = my_lines.map(lambda x : x.split()[2])
res = ratings.countByValue()
my_sortedres = collections.OrderedDict(sorted(res.items()))
for key,value in my_sortedres.items():
    print ("%s %i" %(key,value))

1 6110
2 11370
3 27145
4 34174
5 21201


In [5]:
# Read the tab separated file. Which contains userid, movieid, ratings and timestamp
ratings = sqlContext.read.format("com.databricks.spark.csv")\
          .options(delimiter='\t')\
          .load('E:/code/git-2018/ETL-Workflow/ETL-Examples/src/main/python/resources/movie-lens-data/ml-100k/ratings.csv')

In [6]:
# What is the data type of the ratings variable. It should be a dataframe.
ratings

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [7]:
# Displaying Records
ratings.show()

+---+----+---+---------+
|_c0| _c1|_c2|      _c3|
+---+----+---+---------+
|196| 242|  3|881250949|
|186| 302|  3|891717742|
| 22| 377|  1|878887116|
|244|  51|  2|880606923|
|166| 346|  1|886397596|
|298| 474|  4|884182806|
|115| 265|  2|881171488|
|253| 465|  5|891628467|
|305| 451|  3|886324817|
|  6|  86|  3|883603013|
| 62| 257|  2|879372434|
|286|1014|  5|879781125|
|200| 222|  5|876042340|
|210|  40|  3|891035994|
|224|  29|  3|888104457|
|303| 785|  3|879485318|
|122| 387|  5|879270459|
|194| 274|  2|879539794|
|291|1042|  4|874834944|
|234|1184|  2|892079237|
+---+----+---+---------+
only showing top 20 rows



In [8]:
# Describe the schema of the records
ratings.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [9]:
ratings.schema

StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true)))

In [10]:
# Apply a custom defined schema to the dataframe

from pyspark.sql.types import *

fields = [StructField("userid", IntegerType(), True),
        StructField("movieid", IntegerType(), True),
        StructField("rating", IntegerType(), True),
        StructField("timestamp", LongType(), True)]

In [11]:
# Applying the schema, while reading the records

## Read the tab separated file. Which contains userid, movieid, ratings and timestamp
ratings_df = sqlContext.read.format("com.databricks.spark.csv")\
                            .options(delimiter='\t')\
                            .load('E:/code/git-2018/ETL-Workflow/ETL-Examples/src/main/python/resources/movie-lens-data/ml-100k/ratings.csv',schema = StructType(fields))

In [12]:
ratings_df.show(5, False)

+------+-------+------+---------+
|userid|movieid|rating|timestamp|
+------+-------+------+---------+
|196   |242    |3     |881250949|
|186   |302    |3     |891717742|
|22    |377    |1     |878887116|
|244   |51     |2     |880606923|
|166   |346    |1     |886397596|
+------+-------+------+---------+
only showing top 5 rows



In [13]:
# The RDD or DataFrame can be persisted in Memory

ratings_df.persist()

DataFrame[userid: int, movieid: int, rating: int, timestamp: bigint]

In [14]:
ratings_df.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = true)



In [15]:
# Show the list of columns in the dataframe
ratings_df.columns

['userid', 'movieid', 'rating', 'timestamp']

In [16]:
## How many records in the dataframe?
ratings_df.count()

100000

In [17]:
# How to drop a column?
ratings_df = ratings_df.drop('timestamp')

In [18]:
ratings_df.show(5, False)

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|196   |242    |3     |
|186   |302    |3     |
|22    |377    |1     |
|244   |51     |2     |
|166   |346    |1     |
+------+-------+------+
only showing top 5 rows



In [19]:
# Applying operations like groupby() and sort()
movie_counts = ratings_df.groupBy("movieid").count()

In [20]:
from pyspark.sql.functions import *
movie_counts = movie_counts.sort(desc("count"))

In [21]:
movie_counts.show(10, False)

+-------+-----+
|movieid|count|
+-------+-----+
|50     |583  |
|258    |509  |
|100    |508  |
|181    |507  |
|294    |485  |
|286    |481  |
|288    |478  |
|1      |452  |
|300    |431  |
|121    |429  |
+-------+-----+
only showing top 10 rows



In [22]:
# Applying an aggregation function to the group by
avg_ratings = ratings_df.groupBy("movieid").agg({"rating":"avg"})

In [23]:
avg_ratings.show(10, False)

+-------+------------------+
|movieid|avg(rating)       |
+-------+------------------+
|496    |4.121212121212121 |
|471    |3.6108597285067874|
|463    |3.859154929577465 |
|148    |3.203125          |
|1342   |2.5               |
|833    |3.204081632653061 |
|1088   |2.230769230769231 |
|1591   |3.1666666666666665|
|1238   |3.125             |
|1580   |1.0               |
+-------+------------------+
only showing top 10 rows



In [24]:
avg_ratings = avg_ratings.sort(desc("avg(rating)"))

In [27]:
avg_ratings.show(10, False)

+-------+-----------+
|movieid|avg(rating)|
+-------+-----------+
|814    |5.0        |
|1536   |5.0        |
|1467   |5.0        |
|1293   |5.0        |
|1500   |5.0        |
|1599   |5.0        |
|1122   |5.0        |
|1201   |5.0        |
|1653   |5.0        |
|1189   |5.0        |
+-------+-----------+
only showing top 10 rows



In [28]:
# Joining multiple dataframes
avg_ratings_count = avg_ratings.join( movie_counts,
                                   avg_ratings.movieid == movie_counts.movieid ,
                                   'inner' ).drop(movie_counts.movieid)

In [29]:
avg_ratings_count.printSchema()

root
 |-- avg(rating): double (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- count: long (nullable = false)



In [30]:
# Renaming a column in a dataframe
avg_ratings_count = avg_ratings_count.withColumnRenamed("avg(rating)","mean_rating" )

In [31]:
avg_ratings_count.printSchema()

root
 |-- mean_rating: double (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- count: long (nullable = false)



In [32]:
avg_ratings_count = avg_ratings_count\
                    .withColumn("mean_rating",
                              round(avg_ratings_count["mean_rating"]
                                    ,2))

In [33]:
avg_ratings_count = avg_ratings_count.sort(desc("mean_rating"))

In [36]:
avg_ratings_count.show(10, False)

+-----------+-------+-----+
|mean_rating|movieid|count|
+-----------+-------+-----+
|5.0        |1500   |2    |
|5.0        |1201   |1    |
|5.0        |1599   |1    |
|5.0        |1122   |1    |
|5.0        |1467   |2    |
|5.0        |1189   |3    |
|5.0        |1293   |3    |
|5.0        |1536   |1    |
|5.0        |814    |1    |
|5.0        |1653   |1    |
+-----------+-------+-----+
only showing top 10 rows



In [37]:
# Filtering records in a dataframe based on a criteria
avg_ratings_count = avg_ratings_count.filter(avg_ratings_count["count"] > 20)

In [38]:
avg_ratings_count = avg_ratings_count.sort(desc("mean_rating") ,desc("count"))

In [39]:
avg_ratings_count.show(100, False)

+-----------+-------+-----+
|mean_rating|movieid|count|
+-----------+-------+-----+
|4.49       |408    |112  |
|4.47       |318    |298  |
|4.47       |169    |118  |
|4.46       |483    |243  |
|4.45       |64     |283  |
|4.45       |114    |67   |
|4.39       |12     |267  |
|4.39       |603    |209  |
|4.36       |50     |583  |
|4.34       |178    |125  |
|4.33       |513    |72   |
|4.29       |98     |390  |
|4.29       |357    |264  |
|4.29       |427    |219  |
|4.29       |134    |198  |
|4.29       |963    |41   |
|4.28       |127    |413  |
|4.28       |480    |179  |
|4.27       |285    |162  |
|4.26       |272    |198  |
|4.26       |657    |131  |
|4.26       |251    |46   |
|4.25       |174    |420  |
|4.25       |313    |350  |
|4.25       |474    |194  |
|4.25       |479    |179  |
|4.23       |511    |173  |
|4.21       |484    |138  |
|4.21       |641    |33   |
|4.2        |172    |367  |
|4.2        |515    |201  |
|4.2        |223    |136  |
|4.2        |316    