## Spark Developer Training

**Manaranjan Pradhan**<br/>
**manaranjan@enablecloud.com**<br/>
*This notebook is given as part of Spark Training to Participants. Forwarding to others is strictly prohibited.*

In [4]:
from pyspark import SparkConf, SparkContext, SparkFiles
from pyspark.sql import *

spark = SparkSession.builder \
.master("local") \
.appName("BDM2-Local") \
.getOrCreate()


# Lab: Introduction to Spark DataFrames

## Data: Analyzing MovieLens Data

Note: The dataset is available at https://grouplens.org/datasets/movielens/

### Things to learn

* Reading a file into Spark DataFrames
* Applying a schema while reading records into a dataframe
* Displaying records
* Applying operations like distinct, grouping, sorting, aggregating, filtering, etc.
* Joining multiple dataframes
* Utility functions for describing the schema, showing records, renaming columns, listing columns, etc.

Documentation for the Spark DataFrame APIs is available at

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [5]:
spark

### Loading data onto DBFS

- Go to **Data** on the left menu
- Click on **Create Table**
- Append **lab** to the /FileStore/tables directory path
- Drag and drop the file **ratings.dat** into the **Files** section

Repeat the above steps to upload the **movies.dat** file.

In [None]:
%fs rm -r /FileStore/tables/lab/topmovies

In [None]:
%fs ls /FileStore/tables/lab

path,name,size
dbfs:/FileStore/tables/lab/movies-1.dat,movies-1.dat,163560
dbfs:/FileStore/tables/lab/movies.dat,movies.dat,163560
dbfs:/FileStore/tables/lab/ratings-1.dat,ratings-1.dat,21593504
dbfs:/FileStore/tables/lab/ratings.dat,ratings.dat,21593504
dbfs:/FileStore/tables/lab/txnjsonsmall,txnjsonsmall,588495


### Reading the ratings file

All ratings are contained in the file ratings.dat. Each line of this file represents one rating of one movie by one user, and has the following format:

userId, movieId, rating, timestamp

The columns are TAB separated. The lines within this file are ordered by userId; within a user, the movieIds are not sorted, as the first rows displayed below show.

Ratings are made on a 5-star scale, in whole stars only (the rating column is read as an integer below).

Timestamps represent seconds since midnight Coordinated Universal Time (UTC) of January 1, 1970.
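As a quick check of this convention, the sketch below converts one timestamp taken from the ratings data (978300760) into a UTC datetime using only the Python standard library. The helper name `to_utc` is an illustrative assumption and is not used elsewhere in the lab.

```python
from datetime import datetime, timezone


def to_utc(epoch_seconds):
    """Convert seconds since 1970-01-01 00:00:00 UTC to a timezone-aware datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


# 978300760 is the timestamp of the first rating shown later in this notebook.
first_rating_time = to_utc(978300760)
print(first_rating_time.isoformat())  # 2000-12-31T22:12:40+00:00
```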

In [6]:
ratings = (spark.read   
                .option("sep", "\t")
                .csv('ratings.dat'))

**Note:** Details of the CSV reader are given at https://docs.databricks.com/data/data-sources/read-csv.html

In [7]:
# What is the data type of the ratings variable? It should be a dataframe.
ratings

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

### Displaying Records

In [8]:
# Read the first few rows of the dataframe
ratings.show()

+---+----+---+---------+
|_c0| _c1|_c2|      _c3|
+---+----+---+---------+
|  1|1193|  5|978300760|
|  1| 661|  3|978302109|
|  1| 914|  3|978301968|
|  1|3408|  4|978300275|
|  1|2355|  5|978824291|
|  1|1197|  3|978302268|
|  1|1287|  5|978302039|
|  1|2804|  5|978300719|
|  1| 594|  4|978302268|
|  1| 919|  4|978301368|
|  1| 595|  5|978824268|
|  1| 938|  4|978301752|
|  1|2398|  4|978302281|
|  1|2918|  4|978302124|
|  1|1035|  5|978301753|
|  1|2791|  4|978302188|
|  1|2687|  3|978824268|
|  1|2018|  4|978301777|
|  1|3105|  5|978301713|
|  1|2797|  4|978302039|
+---+----+---+---------+
only showing top 20 rows



### Describe the schema of the records

In [9]:
ratings.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [None]:
ratings.schema

### Define the schema for the dataframe

In [10]:
from pyspark.sql.types import IntegerType, LongType, StringType, StructType, StructField

fields = StructType([StructField("userid", IntegerType(), True),
          StructField("movieid", IntegerType(), True),
          StructField("rating", IntegerType(), True),
          StructField("timestamp", LongType(), True) ])

### Applying the schema, while reading the records

In [11]:
## Read the tab-separated file, which contains userid, movieid, rating and timestamp
ratings_df = (spark.read   
                .option("sep", "\t")
                .schema(fields)
                .csv('ratings.dat'))

In [12]:
ratings_df.show()

+------+-------+------+---------+
|userid|movieid|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
|     1|   1197|     3|978302268|
|     1|   1287|     5|978302039|
|     1|   2804|     5|978300719|
|     1|    594|     4|978302268|
|     1|    919|     4|978301368|
|     1|    595|     5|978824268|
|     1|    938|     4|978301752|
|     1|   2398|     4|978302281|
|     1|   2918|     4|978302124|
|     1|   1035|     5|978301753|
|     1|   2791|     4|978302188|
|     1|   2687|     3|978824268|
|     1|   2018|     4|978301777|
|     1|   3105|     5|978301713|
|     1|   2797|     4|978302039|
+------+-------+------+---------+
only showing top 20 rows



In [13]:
ratings_df.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = true)



### Dealing with bad Records


When reading CSV files with a specified schema, the data in the files may not match the schema, so it is worth verifying the correctness of the data. The reader's `mode` option controls how malformed records are handled:


- PERMISSIVE (default): nulls are inserted for fields that could not be parsed correctly
- DROPMALFORMED: drops lines that contain fields that could not be parsed
- FAILFAST: aborts the reading if any malformed data is found

<code>
spark.read.format("csv").option("mode", "DROPMALFORMED").option("badRecordsPath", "/tmp/badRecordsPath")
</code>
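
To make the three modes concrete without a cluster, here is a simplified plain-Python model of their behaviour on tab-separated integer fields. It is a sketch under stated assumptions and not Spark's implementation: the function names `parse_fields` and `read_lines` are hypothetical, the second sample line is deliberately corrupted for illustration, and details such as wrong field counts or corrupt-record columns are ignored.

```python
def parse_fields(line):
    """Split a TAB-separated line; convert each field to int, or None if that fails."""
    values = []
    for text in line.split("\t"):
        try:
            values.append(int(text))
        except ValueError:
            values.append(None)
    return tuple(values)


def read_lines(lines, mode="PERMISSIVE"):
    """Simplified model of the PERMISSIVE, DROPMALFORMED and FAILFAST modes."""
    if mode not in ("PERMISSIVE", "DROPMALFORMED", "FAILFAST"):
        raise ValueError(f"unknown mode: {mode}")
    rows = []
    for line in lines:
        row = parse_fields(line)
        malformed = None in row
        if malformed and mode == "FAILFAST":
            raise ValueError(f"malformed record: {line!r}")  # abort the whole read
        if malformed and mode == "DROPMALFORMED":
            continue  # silently skip the bad line
        rows.append(row)  # PERMISSIVE keeps the row with None for bad fields
    return rows


# The middle line has a made-up, non-numeric movieid.
sample_lines = ["1\t1193\t5\t978300760", "1\tabc\t3\t978302109", "1\t914\t3\t978301968"]

permissive_rows = read_lines(sample_lines, mode="PERMISSIVE")
dropped_rows = read_lines(sample_lines, mode="DROPMALFORMED")
print(permissive_rows)
print(dropped_rows)
```

The practical difference is in the row count: PERMISSIVE keeps every line, DROPMALFORMED quietly shrinks the dataset, and FAILFAST stops at the first problem.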

### Show the list of columns in the dataframe

In [14]:
# Return a list of columns
ratings_df.columns

['userid', 'movieid', 'rating', 'timestamp']

In [15]:
## How many records in the dataframe?
ratings_df.count()

1000209

### Drop a column

In [17]:
## We do not need the timestamp column, so let's drop it
ratings_df = ratings_df.drop('timestamp')

In [18]:
ratings_df.show()

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|     1|   1193|     5|
|     1|    661|     3|
|     1|    914|     3|
|     1|   3408|     4|
|     1|   2355|     5|
|     1|   1197|     3|
|     1|   1287|     5|
|     1|   2804|     5|
|     1|    594|     4|
|     1|    919|     4|
|     1|    595|     5|
|     1|    938|     4|
|     1|   2398|     4|
|     1|   2918|     4|
|     1|   1035|     5|
|     1|   2791|     4|
|     1|   2687|     3|
|     1|   2018|     4|
|     1|   3105|     5|
|     1|   2797|     4|
+------+-------+------+
only showing top 20 rows



### How many unique movies and users in the dataset

In [19]:
ratings_df.select("userid").distinct().count()

6040

In [20]:
ratings_df.select("movieid").distinct().count()

3706

### Applying operations like groupby() and sort()

In [21]:
movie_counts = ratings_df.groupBy("movieid").count()

In [None]:
# Note: this star import shadows Python built-ins such as round, sum, min and max
from pyspark.sql.functions import *
movie_counts = movie_counts.sort(desc("count"))

In [None]:
movie_counts.show( 10 )

### Applying an aggregation function to the group by

In [None]:
avg_ratings = ratings_df.groupBy("movieid").agg( {"rating":"avg"} )

In [None]:
avg_ratings.printSchema()

In [None]:
avg_ratings = avg_ratings.sort( desc( "avg(rating)" ) )

In [None]:
avg_ratings.show( 10 )
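
For readers who want to see what the group-by steps compute, the following plain-Python sketch reproduces the count and the average per movie on a handful of made-up rows. The data and the name `group_stats` are assumptions for illustration only; in the lab the same work is done by `groupBy("movieid")` with `count()` and `agg({"rating": "avg"})`.

```python
from collections import defaultdict

# Toy (userid, movieid, rating) rows, made up for illustration.
toy_ratings = [
    (1, 10, 5), (2, 10, 4), (3, 10, 3),
    (1, 20, 2), (2, 20, 4),
    (3, 30, 5),
]


def group_stats(rows):
    """Return {movieid: (count, mean_rating)} for the given rating rows."""
    totals = defaultdict(lambda: [0, 0])  # movieid -> [count, sum of ratings]
    for _userid, movieid, rating in rows:
        totals[movieid][0] += 1
        totals[movieid][1] += rating
    return {movieid: (n, total / n) for movieid, (n, total) in totals.items()}


stats = group_stats(toy_ratings)

# Sort by mean rating, highest first, similar to sort(desc("avg(rating)")).
by_mean = sorted(stats.items(), key=lambda item: item[1][1], reverse=True)
print(by_mean)
```

Note that movie 30 tops the toy ranking on the strength of a single rating, which is the reason the lab later filters on the rating count.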

### Joining multiple dataframes

In [None]:
avg_ratings_count = avg_ratings.join( movie_counts, 
                                     avg_ratings.movieid == movie_counts.movieid , 
                                     'inner' ).drop(movie_counts.movieid)

In [None]:
avg_ratings_count.printSchema()
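
The inner join above keeps only the movieids that appear on both sides. A minimal plain-Python sketch of that behaviour is shown below; it assumes each key occurs at most once per side (true here, because both dataframes come from a `groupBy("movieid")`), and the toy values and the name `inner_join` are made up for illustration.

```python
# Toy per-movie results keyed by movieid, made up for illustration.
toy_avg = {10: 4.0, 20: 3.0, 30: 5.0}  # movieid -> mean rating
toy_counts = {10: 3, 20: 2, 40: 7}     # movieid -> number of ratings


def inner_join(left, right):
    """Keep only the keys present in both inputs and pair up their values."""
    return {key: (left[key], right[key]) for key in left if key in right}


joined = inner_join(toy_avg, toy_counts)
print(joined)  # {10: (4.0, 3), 20: (3.0, 2)}
```

In the lab both sides are derived from the same `ratings_df`, so every movieid finds a match; the toy data deliberately includes unmatched keys (30 and 40) to show what an inner join discards.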

### Renaming a column in a dataframe

In [None]:
avg_ratings_count = avg_ratings_count.withColumnRenamed( "avg(rating)",
                                                        "mean_rating" )

In [None]:
avg_ratings_count.printSchema()

In [None]:
avg_ratings_count = avg_ratings_count                                      \
                    .withColumn( "mean_rating",
                                round( avg_ratings_count["mean_rating"]
                                      , 2 ) )
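
The `round` used in this cell is the one from `pyspark.sql.functions` (brought in by the earlier star import), not Python's built-in `round`. The two differ on ties: Spark's `round` is documented as rounding half up (with `bround` as the half-even variant), whereas Python's built-in rounds halves to the nearest even value. The sketch below illustrates the two rules with the standard `decimal` module; the helper name `round_with` is hypothetical, and decimal strings are used to keep binary floating-point effects out of the picture.

```python
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP


def round_with(value, places, rounding):
    """Round a decimal string to `places` digits using the given rounding rule."""
    quantum = Decimal(1).scaleb(-places)  # e.g. Decimal("0.01") for places=2
    return Decimal(value).quantize(quantum, rounding=rounding)


half_up = round_with("4.525", 2, ROUND_HALF_UP)      # ties go away from zero
half_even = round_with("4.525", 2, ROUND_HALF_EVEN)  # ties go to the even digit
print(half_up, half_even)  # 4.53 4.52

# Python's built-in round() follows the half-even rule for exact ties.
print(round(2.5), round(3.5))  # 2 4
```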

In [None]:
avg_ratings_count = avg_ratings_count.sort( desc( "mean_rating" ) )

In [None]:
avg_ratings_count.show( 10 )

### Filtering records in a dataframe based on a criterion

In [None]:
avg_ratings_count = avg_ratings_count.filter( avg_ratings_count["count"] > 20 )

In [None]:
avg_ratings_count = avg_ratings_count.sort( desc( "mean_rating" ) , desc( "count") )

In [None]:
avg_ratings_count.show( 10 )
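
Two details of the filter and sort above are easy to miss: the comparison `> 20` is strict, and the second sort key only matters when mean ratings tie. The plain-Python sketch below shows both on made-up rows; the data and the constant `MIN_COUNT` are assumptions for illustration.

```python
# Toy (movieid, mean_rating, count) rows, made up for illustration.
toy_rows = [
    (30, 5.0, 1),
    (10, 4.5, 40),
    (20, 4.5, 25),
    (50, 3.0, 100),
    (60, 4.8, 20),
]

MIN_COUNT = 20

# Strict comparison: a movie with exactly 20 ratings is dropped.
kept = [row for row in toy_rows if row[2] > MIN_COUNT]

# Highest mean rating first; among equal means, the larger count comes first.
ranked = sorted(kept, key=lambda row: (-row[1], -row[2]))
print(ranked)
```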

### Loading movies data 

Movie information is contained in the file movies.dat. Each line of this file after the header row represents one movie, with its movie id, name (title) and genres. The columns are TAB separated, and the code below refers to the `movieid` and `name` columns.

In [None]:
movies_df = (spark.read
            .option("sep", '\t')
            .option("header", True)
            .option("inferSchema", True)  # the dataset is small, so schema inference is cheap
            .csv('/FileStore/tables/lab/movies.dat'))

In [None]:
movies_df.show( 10 )

### Displaying columns without truncating values

In [None]:
movies_df.show( 10, False )

In [None]:
movies_df.printSchema()

### Joining Ratings and Movies data to find top 20 best rated movies

In [None]:
top_movies = avg_ratings_count.limit(20)                            \
            .join( movies_df,
                  avg_ratings_count.movieid == movies_df.movieid,
                  "inner" ).drop(movies_df.movieid)

In [None]:
top_movies_20 = top_movies.select( "movieid", "mean_rating", "count", "name" )

In [None]:
display(top_movies_20)

movieid,mean_rating,count,name
50,4.52,1783,"Usual Suspects, The (1995)"
260,4.45,2991,Star Wars: Episode IV - A New Hope (1977)
318,4.55,2227,"Shawshank Redemption, The (1994)"
527,4.51,2304,Schindler's List (1993)
720,4.43,438,Wallace & Gromit: The Best of Aardman Animation (1996)
745,4.52,657,"Close Shave, A (1995)"
750,4.45,1367,Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)
858,4.52,2223,"Godfather, The (1972)"
904,4.48,1050,Rear Window (1954)
922,4.49,470,Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)


### Saving the results into a csv file

In [None]:
top_movies_20.write                                          \
    .option("header", "true")                                \
    .csv("/FileStore/tables/lab/topmovies")

In [None]:
%fs ls /FileStore/tables/lab/topmovies

path,name,size
dbfs:/FileStore/tables/lab/topmovies/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/lab/topmovies/_committed_910014243987915413,_committed_910014243987915413,111
dbfs:/FileStore/tables/lab/topmovies/_started_910014243987915413,_started_910014243987915413,0
dbfs:/FileStore/tables/lab/topmovies/part-00000-tid-910014243987915413-b4ece488-9527-4df6-98a6-f27de48df33b-85-1-c000.csv,part-00000-tid-910014243987915413-b4ece488-9527-4df6-98a6-f27de48df33b-85-1-c000.csv,972


## Exercises for Class

1. Find the 20 lowest-rated movies, considering only movies that were rated by at least 50 users.

2. Find the best 10 and the worst 10 movies in the "animation" category.

**Tips for filtering animation movies:**

<code>
df.filter(lower(col("column name")).contains("specify the search string here"))
</code>
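
The tip lowercases only the column value, so the search string itself must already be lowercase. The plain-Python sketch below shows the same matching rule on made-up genre strings; the name `contains_lowered` and the sample values are assumptions for illustration, and the sketch is not a solution to the exercise.

```python
def contains_lowered(text, needle):
    """Mimic lower(col(...)).contains(needle): only the text is lowercased."""
    return needle in text.lower()


# Toy genre strings, made up for illustration.
toy_genres = ["Animation|Children's|Comedy", "Action|Adventure", "ANIMATION|Musical", "Drama"]

matches = [g for g in toy_genres if contains_lowered(g, "animation")]
no_matches = [g for g in toy_genres if contains_lowered(g, "Animation")]
print(matches)     # ["Animation|Children's|Comedy", 'ANIMATION|Musical']
print(no_matches)  # []
```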

## What we learnt

Please make a note of things that you learnt.