# Exploratory Data Analysis: Fun with the __MovieLens__ dataset, using PySpark

### <font color='green'>_This notebook supports Google Colab_  </font>

<font color='green'>Look for the "_Sidebar_: Google Colab" section below to setup and run this Spark notebook on Google Colab.</font>

We are going to grok what I think are the most commonly useful features of the PySpark _DataFrames_ - select, filter, join, groupby, pivot, and windows.  
But doing it that '10 minutes to xx' style seems so booooring.  

What if we could load an actual dataset and ask meaningful questions about it instead?  
_As if we were doing exploratory data analysis?_ I'd say yeah!  
  
We'll use the [MovieLens](https://grouplens.org/datasets/movielens/) dataset for these exercises.  
This dataset is non trivial and should expand to about __1GB__ on you hard-drive.  

Download and unzip [MovieLens 25M Dataset](https://grouplens.org/datasets/movielens/25m/) for this analysis.

Either ensure the data is in ```"./data/ml-25m"``` folder or update the path to the data below.

**Citation**:  
*F. Maxwell Harper and Joseph A. Konstan.* 2015.  
The MovieLens Datasets: History and Context.  
ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19. <https://doi.org/10.1145/2827872>  

You got this.  


## Approach

The idea is to tackle simple Spark use-cases first and move on to more complex ones.  

The first exercise is just loading the data, then with each file we get into progressively more and more involved munging.  

We start by simple cleanup and counting related beats on the ```Tags``` data, and then get into more involved ideas on the ```movies``` data. The fun progressively increases.  

We are going to try and avoid the more mathematically involved parts of exploratory data analysis - for e.g. statistical analysis on various features etc. - the core focus in the ability to grok pyspark functions and have fun while doing it.  

By the end you'd not only have an idea of PySpark, but also how we ask questions and analyze a chunk of data.  

_You may also end up with a watch-list to binge on your next weekend._ :)   


## _Sidebar_: Google Colab

You don't need to run this on your local machine.
The notebook is setup to run on Google Colab as well.

For a detailed description of how this is setup, see the [02.000 (optional) Setup_Spark_in_Google_Colab](https://github.com/shauryashaurya/learn-data-munging/blob/main/02.000%20(optional)%20Setup_Spark_in_Google_Colab.ipynb) notebook

Open the notebook in Google Colab using the following button, then uncomment the setup marked # SETUP FOR COLAB  
  
  
<a href="https://colab.research.google.com/github/shauryashaurya/learn-data-munging/blob/main/02.002%20-%20Exploratory%20Data%20Analysis%20-%20Fun%20with%20the%20Movielens%20dataset%20using%20PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

  
_NOTE: keep the # SETUP FOR COLAB step below commented (disabled) when you are running this notebook locally_

In [None]:
# SETUP FOR COLAB: select all the lines below and uncomment (CTRL+/ on windows)

# # grab spark
# # as of Dec 2022, the latest version is 3.2.3, get the link from Apache Spark's website
# ! wget -q https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
# # unzip spark
# !tar xf spark-3.2.3-bin-hadoop3.2.tgz
# # install findspark package
# !pip install -q findspark
# # Let's download and unzip the MovieLens 25M Dataset as well.
# ! mkdir ./data
# ! wget -q https://files.grouplens.org/datasets/movielens/ml-25m.zip
# ! unzip ./ml-25m.zip -d ./data/

# # got to provide JAVA_HOME and SPARK_HOME vairables
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

In [None]:
# Step 1: initialize findspark
import findspark
findspark.init()

In [None]:
# Step 2: import pyspark
import pyspark
from pyspark.sql import SparkSession
pyspark.__version__

In [None]:
# Step 3: Create a spark session

# using local[*] to use as many logical cores as available, use 1 when in doubt
# 'local[1]' indicates spark on 1 core on the local machine or specify the number of cores needed
# use .config("spark.some.config.option", "some-value") for additional configuration

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("Analyzing Movielens Data") \
    .getOrCreate()

# spark

# ...to read and load the data *correctly*

This is typically the first problem you need to work out. You'll see.  
  
If you've downloaded and unzipped the data, you'll see that some of the files are quite large (genome-scores.csv is 400+ Mb, ratings.csv is 600+ Mb).  

So before we start loading the data to explore further, let's go through the [readme](https://files.grouplens.org/datasets/movielens/ml-25m-README.html) file to build a strategy for loading and analyzing data without clogging up the system.  

In real life, either you'll have to load files in small chunks to work out a strategy or you'll have to rely on defined schema for data.  

Here's the list of files (as of Aug 2022) that you get when you unzip the dataset:
1. **movies**.csv - list of movies with at least one rating.  
    Header: ```movieId,title,genres```  
1. **links**.csv - IDs to generate links to the movie listing on imdb.com and themoviedb.org  
    Header: ```movieId,imdbId,tmdbId```  
1. **ratings**.csv - Each line of this file after the header row represents one rating of one movie by one user.  
    Header: ```userId,movieId,rating,timestamp```  
1. **tags**.csv - Each line of this file after the header row represents one tag applied to one movie by one user.  
    Header: ```userId,movieId,tag,timestamp```  
1. Tag Genome: The tag genome contains tag relevance scores for movies. See [this](http://files.grouplens.org/papers/tag_genome.pdf)  
	1. **genome-tags**.csv - A list of tags  
    Header: ```tagId,tag```  
	1. **genome-scores**.csv - Each movie in the genome has a relevance score value for every tag in the genome  
    Header: ```movieId,tagId,relevance```  
1. README.txt - Check out the README.txt for more details about the files.  

## formatting and encoding

From the Readme file, we have the following observations about the data:
1. Each file is a CSV with a single header row
1. Separator char is ```,```
1. Escape char is ```"```
1. Encoding is UTF-8

Let's set these options when reading the CSV files.

In [None]:
from pyspark.sql.types import *
# where possible, let's avoid inferSchema
# 
schema_movies = StructType([
    StructField('movieId', StringType(), False),
    StructField('title', StringType(), False),
    StructField('genres', StringType(), True)    
    ])
# 
schema_links = StructType([
    StructField('movieId', StringType(), False),
    StructField('imdbId', StringType(), True),
    StructField('tmdbId', StringType(), True)
    ])
# 
schema_ratings = StructType([
    StructField('userId', StringType(), False),
    StructField('movieId', StringType(), False),
    StructField('rating', FloatType(), True),
    StructField('timestamp', StringType(), True)
    ])
# 
schema_tags = StructType([
    StructField('userId', StringType(), False),
    StructField('movieId', StringType(), False),
    StructField('tag', StringType(), True),
    StructField('timestamp', StringType(), True)
    ])
# 
schema_genome_tags = StructType([
    StructField('tagId', StringType(), False),
    StructField('tag', StringType(), False)
    ])
# 
# using arbitrary precision signed decimals (java.math.BigDecimal) for relevance scores
schema_genome_scores = StructType([
    StructField('movieId', StringType(), False),
    StructField('tagId', StringType(), False),
    StructField('relevance', DecimalType(), False)
    ])

In [None]:
datalocation = "./data/ml-25m/"
file_path_movies = datalocation + 'movies.csv'
file_path_links = datalocation + 'links.csv'
file_path_ratings = datalocation + 'ratings.csv'
file_path_tags = datalocation + 'tags.csv'
file_path_genome_tags = datalocation + 'genome-tags.csv'
file_path_genome_scores = datalocation + 'genome-scores.csv'

## Loading the data

Let's load each file in turn and observe, just to get a sense of familiarity with the data.  

In [None]:
movies_raw = spark.read.format('csv') \
    .option('encoding', 'UTF-8') \
    .option('header', True) \
    .option('sep', ',') \
    .option('escape','\"') \
    .schema(schema_movies) \
    .load(file_path_movies)

#### A note on comparing the *method-chaining* syntax between pandas and pyspark:  
Pandas supports that nice "method chaining" syntax where you can club everything in parens  
and write one operation per line  
to do that in spark,  
we use the multi-line format - end each line with a space-backslash  
and python will continue to add the next line to your single link of code  
good thing about the pandas syntax is   
you can comment a line and the next one is picked up just fine  
also you can pipe() things to another variable for debugging or capturing state  
we need to explore how to do that in pyspark  
commenting in the middle definetely breaks in pyspark.  

In [None]:
movies_raw.show(10,False)

In [None]:
links_raw = spark.read.format('csv') \
    .option('encoding', 'UTF-8') \
    .option('header', True) \
    .option('sep', ',') \
    .option('escape', '\"') \
    .schema(schema_links) \
    .load(file_path_links)

In [None]:
links_raw.show(10,False)

In [None]:
ratings_raw = spark.read.format('csv') \
    .option('encoding', 'UTF-8') \
    .option('header', True) \
    .option('sep', ',') \
    .option('escape','\"') \
    .schema(schema_ratings) \
    .load(file_path_ratings)

In [None]:
ratings_raw.show(10, False)

In [None]:
tags_raw = spark.read.format('csv') \
    .option('encoding', 'UTF-8') \
    .option('header', True) \
    .option('sep', ',') \
    .option('escape','\"') \
    .schema(schema_tags) \
    .load(file_path_tags)

In [None]:
tags_raw.show(10, False)

In most cases, prefer loading files in a just-in-time manner to conserve memory and computing resources.  

IRL you'd load a file only when needed - big data means big memory, big processing, big everything but it doesn't mean big bull in a china shop. Brute force is rarely going to be the answer - you've got to learn to be lean in your approach. 

We will next start analysing the data through a series of exercises.

# Problem Set 1  - ```tags.csv```

1. Some tags may have wrapping quotation marks or similar special characters, also leading/trailing spaces etc. Cleanup tags
    * for example, the tag ```"A Christmas Carol"``` becomes ```A Christmas Carol``` etc.

1. List all unique tags found in ```tags.csv```  
    * sort all tags lexically 
    * Also print the execution plan  
    * *[think]* If there are multiple ways of doing this, compare the execution plans  
  
1. Which movies have the most number of tags? 
    * List movieIds in order of # of tags associated  
    * Can we find out which movies have most # of *distinct* tags?
    * Does the ```timestamp``` column have any impact on your report?

1. Which users have added the most number of tags?
    * List userIds in order of # of tags created  
    * Can we find out which users have created most # of *distinct* tags?
    * Does the ```timestamp``` column have any impact on your report?

1. Which users have tagged the most number of movies?
    * Like before, can we find out which users have tagged the most # of *distinct* movies? Does this question make sense in the context of users?
    * Does the ```timestamp``` column have any impact on your report?

1. *[optional, skip on first attept]* We want to find out if there were days of higher activity during the tagging exercise or if the tagging output was more-or-less consistent. 
    * Convert time-stamps to Day-Month-Year. 
    * Find the date range (min-date, max-date) during which the tagging activity took place.
    * Plot number of movies tagged per day during the date range

1. *[optional, skip on first attept]* We want to find out how many users were active every day of the tagging activity. 
    * Plot number of users who tagged at least one movie during the tagging activity date range

## Solutions to Problem Set 1

### Cleanup  
Some tags may have wrapping quotation marks or similar special characters, also leading/trailing spaces etc. Cleanup tags  

* for example, the tag "A Christmas Carol" becomes A Christmas Carol etc.

In [None]:
# Remove wrapping quotation marks from tags

# if there's cleanup needed, 
# we do that first before finding out unique records, 
# chances are uniqueness will be affected if we don't follow this order.

from pyspark.sql.functions import col, lower, regexp_replace, trim

In [None]:
# define a regex pattern to use. 
# ^ indicates start of string, $ indicates end of string
# double quote or single quote or ampersand or bang just after start of string
pattern_start = '^\"|^\'|^&|^#'
# double quote or single quote or ampersand or bang just before end of string
pattern_end =  '\"$|\'$|&$|#$'
pattern = pattern_start+'|'+pattern_end

In [None]:
# create a new tags dataframe with cleaned up data 
tags = tags_raw.withColumn('tag', regexp_replace(trim(col('tag')), pattern, ''))

In [None]:
# quotes stripped and other chars too...
tags_raw.filter(col('tag').contains('Christmas Carol')).show()
tags.filter(col('tag').contains('Christmas Carol')).show()

### List all unique tags found in ```tags.csv```  
    * sort all tags lexically 
    * Also print the execution plan  
    * *[think]* If there are multiple ways of doing this, compare the execution plans  

In [None]:
# unique tags found in tags.csv - prep
# careful - comparision should be case-insensitive 
# because 'Christmas Special' and 'Christmas special' are really the same thing
# create a new column in tags
tags = tags.withColumn('lower tag', lower(col('tag')))
# reorder columns for easy readability
tags = tags.select('userId', 'movieId', 'tag', 'lower tag', 'timestamp')
tags.show(5,False)

In [None]:
# unique tags found in tags.csv - method 1, using distinct()
distinct_tags1 = tags.select('lower tag').distinct()
# 
# let's do the explaining later, so it's easy to compare methods
# distinct_tags.explain(True)
# show 5 rows, do not truncate
distinct_tags1.show(5, False)

In [None]:
# unique tags found in tags.csv - method 2, using groupBy()
# an aggregator like count() results in a dataframe
# distinct_tags2 = tags.select('lower tag').groupBy('lower tag').count()
distinct_tags2 = tags.select('lower tag').groupBy('lower tag').count()
# let's do the explaining later, so it's easy to compare methods
# distinct_tags2.explain(True)
# show 5 rows, no truncate
distinct_tags2.show(5,False)

Execution plan explanations to compare distinct() and groupBy()...  
(uncomment the lines below)

In [None]:
# distinct_tags1.explain(True)

In [None]:
# distinct_tags2.explain(True)

Compare the costs in the two execution plans

In [None]:
# distinct_tags1.explain(mode='cost')

In [None]:
# distinct_tags2.explain(mode='cost')

The two plans look identical:

**distinct()**  
  
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [lower tag#130 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(lower tag#130 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#1415]
      +- HashAggregate(keys=[lower tag#130], functions=[], output=[lower tag#130])
         +- Exchange hashpartitioning(lower tag#130, 200), ENSURE_REQUIREMENTS, [id=#1412]
            +- HashAggregate(keys=[lower tag#130], functions=[], output=[lower tag#130])
               +- Project [lower(regexp_replace(trim(tag#55, None), ^"|^'|^&|^#|"$|'$|&$|#$, , 1)) AS lower tag#130]
                  +- FileScan csv [tag#55] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/.../tags.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<tag:string>
```

**groupBy()**  
  
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [lower tag#130 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(lower tag#130 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#1437]
      +- HashAggregate(keys=[lower tag#130], functions=[count(1)], output=[lower tag#130, count#176L])
         +- Exchange hashpartitioning(lower tag#130, 200), ENSURE_REQUIREMENTS, [id=#1434]
            +- HashAggregate(keys=[lower tag#130], functions=[partial_count(1)], output=[lower tag#130, count#186L])
               +- Project [lower(regexp_replace(trim(tag#55, None), ^"|^'|^&|^#|"$|'$|&$|#$, , 1)) AS lower tag#130]
                  +- FileScan csv [tag#55] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/.../tags.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<tag:string>
```


Spark optimizes both to identical plans / Inner workings for both seem identical in the optimized execution plan.  

So we tend to prefer the one that is easier to read when debugging the code.  

There may be a need for both distinct() and groupBy() in the same piece of code - for readability. We'll see that a few sections later.  

*In some cases, I saw (and I am probably wrong here) that the groupBy() method seems marginally more expensive in terms of memory as compared to distinct(). Log a comment/ticket/PR if you have evidence, insights, ideas to the contrary/in-support.*

#### Sort lexically

In [None]:
# sort unique tags lexically
# ignore case when sorting values 
distinct_tags1 = distinct_tags1.orderBy(col('lower tag').asc())
distinct_tags2 = distinct_tags2.orderBy(col('lower tag').asc())

In [None]:
distinct_tags1.filter(col('lower tag').contains('christmas')).show(50, False)

In [None]:
distinct_tags2.filter(col('lower tag').contains('christmas')).show(50, False)

### Which movies have the most number of tags?  
* List movieIds in order of # of tags associated  
* Can we find out which movies have most # of *distinct* tags?  
* *[optional]* Does the ```timestamp``` column have any impact on your report?  

In [None]:
# movie IDs with most number of tags
movies_by_tag_count = tags.groupBy('movieId').count()
movies_by_tag_count.orderBy(col('count').desc()).show()

These movies have thousands of tags??? that makes no sense... let's see what some of those are:

In [None]:
tags.filter(col('movieId') == 260) \
    .groupBy(lower(col('tag')).alias('lowercase tag')) \
    .count() \
    .orderBy(col('count').desc()) \
    .show(25, False)

Any guesses what this movie could be?

In [None]:
movies_raw.filter(col('movieId') == 260).show(1, False)

No surprises there!

### Which users have added the most number of tags?

* List ```userIds``` in order of # of tags created
* Can we find out which users have created most # of distinct tags?

In [None]:
# users who have created most tags
users_by_count_of_tag_entries = tags.groupBy('userId').count()
users_by_count_of_tag_entries.orderBy(col('count').desc()).show()

Can we find out which users have created most # of distinct tags?

In [None]:
# isolate userId and tags and identify distinct pairs
distinct_tags_by_user = tags.select('userId', 'tag').distinct()
# count distinct tags by user
count_distinct_tags_by_user = distinct_tags_by_user.groupBy('userId').count()
# list count of distinct tags by user
count_distinct_tags_by_user.orderBy(col('count').desc()).show()

*Holy Sith!* 

*183356* tags, *20765* unique tags - UserId ```6550``` is either a beast or probably a bot!  

In most cases we may want to treat this one as an outlier, further analysis may be needed before taking that decision, but it's good to know 6550 exists.  

In [None]:
distinct_tags_by_user.filter(col('userId') =='6550').orderBy(col('tag')).show(50, False)

Yeah! I'd say 6550 is some automated tagging mechanism. 

### Which users have tagged the most number of movies?
* Like before, can we find out which users have tagged the most # of *distinct* movies? Does this question make sense in the context of users?
* *[optional]* Does the ```timestamp``` column have any impact on your report?

In [None]:
# which users have tagged the most number of movies?

# approach:
# create a df that has only users and movies
# this df may have multiple rows for 
# same userId and movieId as each user has given many tags to each movie 
# so only pick distinct records.
# then use groupBy and count as ususal

users_and_movies_in_tags = tags.select('userId', 'movieId').distinct()

users_and_movies_in_tags_count = users_and_movies_in_tags.groupBy('userId') \
    .count() \
    .orderBy(col('count').desc())

users_and_movies_in_tags_count.show(10)

In [None]:
# check: 
# filter for any one userId, check that all movies occur only once
users_and_movies_in_tags.filter(col('userId') == 6550) \
    .groupBy('movieId').count() \
    .orderBy(col('count').desc()) \
    .show(10)

### Tagging activity related analysis

* Convert time-stamps to Day-Month-Year.
* Find the date range (min-date, max-date) during which the tagging activity took place.
* Plot number of movies tagged per day during the date range

In [None]:
from pyspark.sql.functions import from_unixtime, to_date

# convert unix epoch timestamp to date-time style timestamp
tags = tags.withColumn('datetime', from_unixtime(col('timestamp')))
# extract date from date-time timestamp
tags = tags.withColumn('tagging date', to_date(col('datetime')))
tags.show(10, False)

In [None]:
tagging_activity_by_date = tags.groupBy(col('tagging date')).count()
# 
tagging_activity_by_date.orderBy(col('count').desc()).show(5, False)

#### date range during which the tagging activity took place

* (min-date, max-date) 

In [None]:
from pyspark.sql.functions import min, max, avg
# 
tagging_min_date = tagging_activity_by_date.select(min(col('tagging date')))
tagging_max_date = tagging_activity_by_date.select(max(col('tagging date')))
# 
tagging_min_date.show()
tagging_max_date.show()
# tagging_activity_by_date.select(min(col('count'))).show()
# tagging_activity_by_date.select(max(col('count'))).show()
# tagging_activity_by_date.select(avg(col('count'))).show()
tagging_activity_by_date.summary().show()

In [None]:
# setup matplotlib before starting plotting
import matplotlib.pyplot as plt
# jupyter mpl magic
%matplotlib inline

# note: in case you want to use the widget or notebook magic:
# ensure that ipympl is also installed.
# there may be other steps involved as well 
# for e.g. widget works based on nodeJS, so you'll have to config/enable that too

In [None]:
# determine the X and Y lists for plotting.
# converting a spark dataframe column to list:
tagging_activity_x = tagging_activity_by_date \
                        .select(col('tagging date')) \
                        .rdd \
                        .flatMap(lambda x: x) \
                        .collect()

tagging_activity_y = tagging_activity_by_date \
                        .select(col('count')) \
                        .rdd \
                        .flatMap(lambda x: x) \
                        .collect()

In [None]:
plt.figure(figsize=(18,6))
plt.bar(tagging_activity_x,tagging_activity_y)
# Don't need to skew the graph with some really high counts
# limit to 1500
ax = plt.gca()
ax.set_ylim([0, 1500])
plt.title('count of tags by date')
plt.xlabel('date')
plt.ylabel('count')
plt.show()

Seems like the activity peaked extraordinarily around May/June 2018

### Daily active users during the tagging activity.

* Plot number of users who tagged at least one movie during the tagging activity date range

In [None]:
distinct_users_doing_tagging_by_date = tags.select('userId', 'tagging date') \
    .distinct() \
    .groupBy('tagging date') \
    .count()
# 
distinct_users_doing_tagging_by_date.orderBy(col('count').desc()).show(5)
distinct_users_doing_tagging_by_date.summary().show()

In [None]:
# like before, we need to determine X and Y lists.

distinct_users_doing_tagging_by_date_x = distinct_users_doing_tagging_by_date \
                                        .select(col('tagging date')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

distinct_users_doing_tagging_by_date_y = distinct_users_doing_tagging_by_date \
                                        .select(col('count')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

In [None]:
plt.figure(figsize=(18,5))
plt.bar(distinct_users_doing_tagging_by_date_x,distinct_users_doing_tagging_by_date_y)
# limit the y axis, some observations may go beyond the ceiling
ax = plt.gca()
ax.set_ylim([0, 50])
plt.title('Active users by date')
plt.xlabel('date')
plt.ylabel('count')
plt.show()

Seems like while there were peaks and troughs, but visually, we don't see too much variation in the daily active users

## Insights from Problem Set 1

What have we learned from working on Problem Set 1?

1. Try to get a specific schema
1. Clean up data before analysis or aggregation
1. Clearly document weather you need case sensitive or case insensitive comparisions
1. Use ```df.withColumn``` to create new columns
1. ```col().contains()``` is like the ```LIKE``` clause in SQL, partial matches etc.
1. Spark Columns: Only use literal name where it's trivial, otherwise prefer ```col('name')```
1. ```distinct()``` vs ```groupBy()```
1. To convert a column in a spark data frame to a list (for plotting data or other uses) you isolate the column, convert to rdd, flatmap, collect.
  
1. and another thing... use ```spark.catalog.clearCache()``` to clear the cache

In [None]:
# clear cache
spark.catalog.clearCache()

# Problem Set 2  - ```movies.csv```

1. How many unique movies exist in the ```movies.csv``` dataset?  

1. Prepare a yearwise list of movies - Extract the year of release in movies.csv into a new column year_of_release   

1. List all unique genres found in ```movies.csv```, ordered lexically, case insensitive  

1. Prepare a genere wise list of movies - list all the movies for 'Crime', for 'Romance', and so on...  

1. Find number of films associated with each genre - add a new column to the movies dataframe: ```absolute_frequency_of_genre```  

1. Add another column ```num_genres``` and list total number of genres associated with each film  

1. Find out if a movie has both genres associated with it and also has ```(no genres listed)``` - if this is the case, find out how many such movies exist in the data set


## Solutions to Problem Set 2

### How many unique movies exist in the movies.csv dataset?

In [None]:
# this is what the movies_raw data looks like:
movies_raw.show(5)

Let's think about uniqueness 
* Do we look at ```movieId``` for uniqueness or ```title```?
* ```movieId``` sound like the right choice, as it'll keep the data consistent with tags, tag-relevance and ratings etc.
* *What could break here?* Let's try to articulate our assumptions and test them
* Can we assume that *primary keys mean unique movies*?
* Let's test that assumption, if it breaks, we'll know that our solutions must account for multiple ```movieId``` pointing to the same ```title```
* Following from above, can we assume that if the value in ```title``` matches for multiple ```movieId``` values then it's the same movie?
* Again, let's test this by building and reviewing the IMDB links

In [None]:
movies_with_matching_titles = movies_raw.groupby(col('title')).count().filter(col('count') > 1)
movies_with_matching_titles.orderBy(col('count').desc()).show(12)
movies_with_matching_titles.count()

In [None]:
movies_raw.filter(col('title') == 'Casanova (2005)').show()

In [None]:
# links look different
links_raw.filter(col('movieId') == '42015').show()
links_raw.filter(col('movieId') == '128862').show()

In [None]:
# Right join to work out movieIds for all the movies with matching titles
movies_with_multiple_title_matches = movies_raw.join(movies_with_matching_titles, 'title', 'right')

In [None]:
# compare the imdbId values for all movies with matching 'title' values in movies_raw
links_for_movies_with_multiple_title_matches = \
    movies_with_multiple_title_matches.join(links_raw, 'movieId', 'left')

In [None]:
links_for_movies_with_multiple_title_matches.show(10)

In [None]:
# counts are all 1 - therefore all movies with matching titles still are unique movies
links_for_movies_with_multiple_title_matches.groupBy(col('imdbId')).count().orderBy(col('count').desc()).show(5)

# conclusion: all movies in movies.csv are unique.

In [None]:
# number of unique movies in movies.csv
movies_raw.distinct().count()

The [site](https://grouplens.org/datasets/movielens/25m/) also reads:  

"MovieLens 25M movie ratings. Stable benchmark dataset. 25 million ratings and one million tag applications applied to 62,000 movies by 162,000 users. Includes tag genome data with 15 million relevance scores across 1,129 tags. Released 12/2019"

So we seem to be in the right ballpark here.  
Let's continue with our analysis.

### Extract the year of release in movies.csv into a new column year_of_release

In [None]:
# let's build a regex pattern to isolate the year of release
# this should be the last set of parenthesis in the title
# the regex needs a backslash to escape the parenthesis symbols, as these are used to identify groups
open_paren = '\\('
close_paren = '\\)'
# this should have only 4 digits between the parenthesis
year_pattern = '([0-9]{4})'
# the title should end with the closing parenthesis
close_paren = close_paren+'$'
# ignore whitespace
# keep unicode in mind
# global
# options_pattern = '/gxu'
# final pattern: the options_pattern thing is not supported in python  
year_of_release_regex = open_paren+year_pattern+close_paren#+options_pattern
print(year_of_release_regex)

In [None]:
from pyspark.sql.functions import regexp_extract

In [None]:
# our regex has 2 groups, 
# 0 gives the first group match i.e. (1995)
# 1 gives the next group match i.e. 1995
movies_raw.withColumn(
    'year_of_release_0', regexp_extract(col('title'), year_of_release_regex, 0)).withColumn(
    'year_of_release_1', regexp_extract(col('title'), year_of_release_regex, 1)).show(10)

In [None]:
# so we'll use group 1 to create our dataframe
movies = movies_raw.withColumn(
    'year_of_release', regexp_extract(col('title'), year_of_release_regex, 1))

In [None]:
movies.show(10)

### List all unique genres found in movies.csv, ordered lexically, case insensitive

In [None]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

In [None]:
# unique genres found in movies.csv
movie_genres = movies_raw.select(
    explode( # convert each element in an array to a new row
        split( # split the data on pipe and create an arry
            movies_raw.genres, "\|"
        )
    ).alias('genre')
)

In [None]:
unique_movie_genres = movie_genres.distinct()
unique_movie_genres.show()

### Find number of films associated with each genre - absolute_frequency_of_genre  

In [None]:
movie_genres_with_abs_freq = movie_genres.groupBy('genre').count()
movie_genres_with_abs_freq = movie_genres_with_abs_freq.orderBy(col('count').desc()) \
                            .withColumnRenamed('count', 'absolute_frequency_of_genre')

movie_genres_with_abs_freq.show()
# TODO: rename the column 'count' to 'freq'

In [None]:
# convert each column to a list
movie_genres_with_abs_freq_x = movie_genres_with_abs_freq \
                                        .select(col('genre')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()
movie_genres_with_abs_freq_y = movie_genres_with_abs_freq \
                                        .select(col('absolute_frequency_of_genre')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

plt.figure(figsize=(20,5))
plt.bar(movie_genres_with_abs_freq_x,movie_genres_with_abs_freq_y)
plt.title('Absolute Frequencies of Genres')
plt.xlabel('Genre')
plt.ylabel('Number Of Movies')
plt.show()

### List total number of genres associated with each film

* Add another column num_genres and list total number of genres associated with each film

In [None]:
# like before we'll split the genres column on | and count the elements
# 
from pyspark.sql.functions import size

movies = movies.withColumn('num_genres',
                           size(split(col('genres'), '\|')))

In [None]:
movies.show(10, False)

### Prepare a genere wise list of movies

* list all the movies for 'Crime', for 'Romance', and so on...

A naive appoach would be to filter movies for each genre that we know. This is doable as the number of genres we found is small. But it's not flexible and will need revision as new genres come to light. In practice, we may need something that's more dynamic.

In [None]:
romance = movies.where(col('genres').like('%Romance%'))
romance.show()

In [None]:
westerns = movies.where(col('genres').like('%Western%'))
westerns.show()

In [None]:
no_genres_listed = movies.where(col('genres').like('%(no genres listed)%'))
no_genres_listed.show()
no_genres_listed.count()

Yeah...this is not a good approach.  

We want to be able to address as many generes as found in data, even though the list is available, it makes more sense to generate a list for all distinct values in the data

Can we create a column where there's only one 'genre' and the movie is duplicated as many times as the genre?

Spark Joins to the rescue.  

We'll create a ```full outer``` join (aka 'full' or 'outer') - resulting in each row in movies to be repeated once for each genere associated with it. If "127 Hours" has 3 genres "Adventure|Drama|Thriller" then it will now be in 3 rows, one each for "adventure", "drama" and "thriller".

In [None]:
movies_expanded_by_genre = movies.join(unique_movie_genres, col('genres').contains(col('genre')) , 'fullouter')
# movie_genres_with_abs_freq
movies_expanded_by_genre.orderBy(col('title')).show(25, True)

Now we can simply filter by genre from the same dataframe.

In [None]:
romance1 = movies_expanded_by_genre.filter(col('genre') == 'Romance')
romance1.show()

So ```movies_expanded_by_genre``` is it.

### Find out if a movie has both genres associated with it and also has ```(no genres listed)```  

* if this is the case, find out how many such movies exist in the data set

In [None]:
# filter out the movies with (no genres listed)
# where and filter, like and contains - use what is most readable
# also remove the %xxx% for contains...

# 1
# movies.where(col('genres').contains('no genres')).show()

# 2
# movies.where(col('genres').like('%no genres%')).show()

# 3
# movies.filter(col('genres').like('%no genres%')).show()

# 4
movies.filter(col('genres').contains('no genres')).show(10)

In [None]:
# number of records where num_genres is more than 1 for (no genres listed)
movies.filter(col('genres') \
            .contains('no genres')) \
            .filter(col('num_genres')>1) \
            .count()

In [None]:
# alternatively list num_genres in descending order
movies.filter(col('genres') \
            .contains('no genres')) \
            .orderBy(col('num_genres').desc()) \
            .show(10)

So there's no movies where ```(no genres listed)``` is mixed with other genres.

## Insights from Problem Set 2

What have we learned from working on Problem Set 2?

1. Regex based matching, pay attention to the regex group you want to extract
1. Explode (convert array in the column value into rows) and Split (convert string into array based on a token)
1. size() is the pySpark equivalent of len(array)
	* There's like 4 of these that you should [familiarize yourself](https://sparkbyexamples.com/pyspark/pyspark-explode-array-and-map-columns-to-rows/) with: ```explode()```, ```explore_outer()```, ```posexplode()```, ```posexplode_outer()```
1. That DataFrame>Select>RDD>FlatMap>Collect thing for creating lists out of dataframe columns for plotting etc. - it's just tedious, so try to remember it
1. [joi](https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/).[ns](https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-join-two-dataframes.py), [joins](https://www.geeksforgeeks.org/pyspark-join-types-join-two-dataframes/), [joins](https://dzone.com/articles/pyspark-join-explained-with-examples), [can't get](https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-ce289bfc84c9) [enough](https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-2-5b038bc7455b) of 'em. 
1. Also, ```crossJoin``` when you need a cartesian product (cross product) and have no conditions. 
    * Spark >= 3.0. ```spark.sql.crossJoin.enable``` is ```True``` by default, if not, you'll need to ```SparkSession.config("spark.sql.crossJoin.enable",True)```
1. ```where``` and ```filter```, ```like``` and ```contains```

In [None]:
# clear the cache again...
spark.catalog.clearCache()

# Problem Set 3  - ```ratings.csv```

1. Find number of films for each rating, so number of films that have at least one rating of 1, number of films that have at least one rating of 2 and so on...  

1. List user-IDs in order of number of films they have rated, descending.  

1. Are there users who have given multiple ratings to the same film?  

## Solutions to Problem Set 3

### Find number of films for each rating

* so number of films that have at least one rating of 1, number of films that have at least one rating of 2 and so on...

In [None]:
# to refresh, here's what ratings data looks like
ratings_raw.show(10)

In [None]:
absolute_freq_ratings = ratings_raw.groupBy('rating').count()

In [None]:
absolute_freq_ratings.orderBy(col('count').desc()).show()

In [None]:
# convert each column to a list
absolute_freq_ratings_x = absolute_freq_ratings \
                                        .select(col('rating')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()
absolute_freq_ratings_y = absolute_freq_ratings \
                                        .select(col('count')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

plt.figure(figsize=(18,5))
plt.bar(absolute_freq_ratings_x,absolute_freq_ratings_y)
plt.title('Absolute Frequencies of Ratings')
plt.xlabel('Rating')
plt.ylabel('Number Of Movies')
plt.show()

### List user-IDs in order of number of films they have rated, descending

In [None]:
rating_freq_by_user = ratings_raw.groupBy('userId').count()

In [None]:
rating_freq_by_user.orderBy(col('count').desc()).show()

In [None]:
rating_freq_by_user.count()

That's a lot of ratings by a lot of users...  

Some of these like ```72315``` def seem like a bot - or a human whose spent a lot of time regularly watching films - if we estimate a film to be 90 minutes on an average, it comes to about 48303 hours - that's like 5.5 years of 24/7 movie-watching! In practice this would've taken the person 8-10 times longer (considering 3 hours of movies daily - no holidays) - so 44 to 55 years of movies... - yeah, I'll bet this was some automated thing



### Are there users who have given multiple ratings to the same film?

In [None]:
usr_movie_count = ratings_raw.groupBy('userId', 'movieId').count()

In [None]:
usr_movie_count.orderBy(col('count').asc()).show(10)

Doesn't seem like users have rated the same movie multiple times.
*[think]* is there another way to confirm this?

## Insights from Problem Set 3

We are practicing some of the same stuff, however ```ratings``` is a substantially larger dataset - we need to be more careful with joins etc.

In [None]:
# like always, clear the cache
spark.catalog.clearCache()

# Problem Set 4  - mixing things up, ```movies + ratings```

1. Prepare an ```avg_rating``` column for the movies dataframe where you add the average rating that movie has recieved rounded to 4 decimal places
    * If the movie has recieved no rating, add 0

1. Prepare a ```rating_freq``` column for the movies dataframe where you add the number of times the movie recieved a rating
    * If the movie did not recieve any rating, put in 0
    
1. Prepare a ```rating_freq_user``` column for the movied dataframe where you add the number of unique users who rated the film
    * If no one rated the film, put in 0
    
1. How many films where the ```rating_freq``` differs from ```rating_freq_user```? Does this match the analysis we did for ```ratings.csv```?

1. Which genres have recieved the highest number of ratings?

1. What are the 20 highest rated movies in each genre?
    * *[think]* one way to find our 'highest rated' would be by summing all the occurances of 4 and 5 stars for each movie in each genre and sorting from highest to lowest

1. Prepare a list of highly rated movies, present this list by year of release and sorted in alphabetical order by movie title.  
    * "Highly Rated" = movies with atleast 3 instances where users have rated the film a 4 or a 5
    * Expected Columns in the output: ```year of release, movie title, # of 4s, # of 5s```  
    
1. Another approach to 'highly rated', prepare a list of 'highly rated' movies
    * "Highly Rated" = sum of 4 and 5 ratings is the highest across all years
    * Sort this list by year of release

1. *[think]* Can we find "Late Bloomers" or "Cult Films"? 
    * Films that were not highly rated during the year of release or were not well rated initially,but their ratings improve over time. 
    * How can we rank these in descending order of "Cult Status"?


## Solutions to Problem Set 4

### Average rating that movie has recieved rounded to 4 decimal places

* Prepare an ```avg_rating``` column for the movies dataframe where you add the average rating that movie has recieved rounded to 2 places of decimal
    * If the movie has recieved no rating, add 0

In [None]:
movies.show(2)
ratings_raw.show(2)

In [None]:
from pyspark.sql.functions import round

avg_ratings_grpby = ratings_raw.groupBy(col('movieId'))
avg_ratings = avg_ratings_grpby.avg()
avg_ratings = avg_ratings.select(col('movieId'), round(col('avg(rating)'),4))
avg_ratings = avg_ratings.withColumnRenamed("round(avg(rating), 4)", "avg_rating")
avg_ratings = avg_ratings.withColumnRenamed("movieId", "movieId_with_rating")

In [None]:
avg_ratings.show(5)

In [None]:
avg_ratings.describe().show()

In [None]:
movies_with_avg_rating = movies.join \
    (avg_ratings, movies.movieId == avg_ratings.movieId_with_rating, 'inner') \
    .drop(col('movieId_with_rating'))

In [None]:
movies_with_avg_rating.orderBy(col('avg_rating').asc()).show()

In [None]:
movies_with_avg_rating.count()

In [None]:
movies.count()

hold on, why the count is different...?  
Are there movies that have never been rated? Which ones are these?

In [None]:
# leftanti join, aka anti join, all the records in left that don't match right

# joining with movies_with_avg_rating.movieId_with_rating results in ambiguous join 
# (seems to stem from the fact that movies_with_avg_rating has 2 movieId columns, still, why?)
# movies_with_no_ratings = movies \
#         .join(movies_with_avg_rating, 
#               movies.movieId == movies_with_avg_rating.movieId_with_rating, 
#               'leftanti')

# this works
movies_with_no_ratings = movies \
        .join(movies_with_avg_rating, 
              movies.movieId == movies_with_avg_rating.movieId, 
              'leftanti')

# this also works
# join with avg_ratings instead
# movies_with_no_ratings = movies \
#         .join(avg_ratings,
#               movies.movieId == avg_ratings.movieId_with_rating, 'leftanti')

In [None]:
movies_with_no_ratings.show(5, truncate = False)

In [None]:
movies_with_no_ratings.count()

In [None]:
count_all_movies = movies.count()
count_movies_with_avg_rating = movies_with_avg_rating.count()
count_movies_with_no_ratings = movies_with_no_ratings.count()
# test if numbers add up...
do_numbers_add_up = ((count_all_movies - count_movies_with_avg_rating) == count_movies_with_no_ratings)

In [None]:
print('all movies in our dataset = ', count_all_movies)
print('movies that have average ratings = ', count_movies_with_avg_rating)
print('movies with no ratings = ', count_movies_with_no_ratings)
print('do the numbers match? (rated + not rated = total)? ', do_numbers_add_up)

Yeah, seems like there's movies in the dataset that have never been rated. Interesting!

There may be a question around verifying the calculation above vs just matching ```movies``` dataset with ```raw_ratings```... so let's try that real quick too...

In [None]:
movies_not_rated = movies \
        .join(ratings_raw, movies.movieId == ratings_raw.movieId, 'leftanti')

In [None]:
movies_not_rated.show(5, truncate = False)

In [None]:
count_movies_not_rated = movies_not_rated.count()
# test if numbers add up...again
do_numbers_add_up2 = ((count_all_movies - count_movies_with_avg_rating) == count_movies_not_rated)

In [None]:
print('all movies in our dataset = ', count_all_movies)
print('movies that have average ratings = ', count_movies_with_avg_rating)
print('movies with no ratings... part deux = ', count_movies_not_rated)
print('do the numbers match? (rated + not rated = total)? ', do_numbers_add_up2)

So analysis paralysis but we are on the right track - there are about 3376 movies that do not have ratings...

#### Standardized values in a column

In [None]:
# Just for fun (and future needs)
# let's find out mean number of genres
# and mean avg_rating for movies
from pyspark.sql.functions import mean

# quick look at what our dataframe looks like
movies_with_avg_rating.show(5)

In [None]:
movies_with_avg_rating_means = \
    movies_with_avg_rating.agg({'num_genres': 'mean', 'avg_rating': 'mean'}).collect()

In [None]:
print("avg number of genres for a movie: ",movies_with_avg_rating_means[0][0])
print("mean avg_rating of a movie: ",movies_with_avg_rating_means[0][1])

In [None]:
# Ah! what the hell, let's create standardized values for # of genres and avg_rating anyway
# standardized values are where mean = 0 and standard deviation = 1
# from pyspark.sql.functions import mean
from pyspark.sql.functions import stddev

movies_with_avg_rating_mean_and_stddev = movies_with_avg_rating.select \
                                            (mean('num_genres').alias('mean_num_genres'), \
                                             stddev('num_genres').alias('stddev_num_genres'), \
                                             mean('avg_rating').alias('mean_avg_rating'), \
                                             stddev('avg_rating').alias('stddev_avg_rating'))
# let's first see what these look like
movies_with_avg_rating_mean_and_stddev.show()

In [None]:
# roundabout way of getting to the values
mean_num_genres = movies_with_avg_rating_mean_and_stddev.first()['mean_num_genres']
stddev_num_genres = movies_with_avg_rating_mean_and_stddev.first()['stddev_num_genres']
mean_avg_rating = movies_with_avg_rating_mean_and_stddev.first()['mean_avg_rating']
stddev_avg_rating = movies_with_avg_rating_mean_and_stddev.first()['stddev_avg_rating']

# ideally you'd just do: mean_val1, stddev_val1 = df.select(mean('val1'), stddev('val1')).first()

In [None]:
# standardize num_genres
movies_with_avg_rating = movies_with_avg_rating. \
                        withColumn('std_num_genres', \
                                   (col('num_genres') - mean_num_genres)/stddev_num_genres)
# standardize avg_rating
movies_with_avg_rating = movies_with_avg_rating. \
                        withColumn('std_avg_rating', \
                                   (col('avg_rating') - mean_avg_rating)/stddev_avg_rating)

Let's explore the upper and lower bounds of our standardized columns...

In [None]:
movies_with_avg_rating.sort(col('std_num_genres').desc()).show(7)

In [None]:
movies_with_avg_rating.sort(col('std_num_genres').asc()).show(7)

In [None]:
movies_with_avg_rating.sort(col('std_avg_rating').desc()).show(7)

In [None]:
movies_with_avg_rating.sort(col('std_avg_rating').asc()).show(7)

In [None]:
# check

movies_with_avg_rating.describe(['std_num_genres', 'std_avg_rating']).show(10,False)

In [None]:
# also...
mean_std_avg_rating, stddev_std_avg_rating, mean_std_num_genres, stddev_std_num_genres = movies_with_avg_rating.select( \
        mean('std_avg_rating'), stddev('std_avg_rating'), mean('std_num_genres'), stddev('std_num_genres')).first()

In [None]:
print('mean_std_avg_rating: ', mean_std_avg_rating)
print('stddev_std_avg_rating: ', stddev_std_avg_rating)
print('mean_std_num_genres: ', mean_std_num_genres)
print('stddev_std_num_genres: ', stddev_std_num_genres)
print('\n...or fixing for floating point operations, we get...')
print("mean_std_avg_rating: ", str(__builtins__.round(mean_std_avg_rating, 0)))
print("stddev_std_avg_rating: ", str(__builtins__.round(stddev_std_avg_rating, 0)))
print("mean_std_num_genres: ", str(__builtins__.round(mean_std_num_genres, 0)))
print("stddev_std_num_genres: ", str(__builtins__.round(stddev_std_num_genres, 0)))

#### Sidebar: the ```round``` function - ```pyspark.sql``` vs python built-in  
  
You'll notice the use of ```__builtins__``` above.
That's cause we've imported ```pyspark.sql.functions.round``` already, so ```round``` now points to the pyspark 'round' function.   

If we need to use the python round function, we need to specify how to find it... hence we use ```__builtins__```.

_howlarious!_

#### Some graphs to visualize standardized values vs actual values

Playing with some graphs - just to get a visual sense of standardized num_genres and avg_rating

_These plots are taking a long time to render - so the cells in this section would largely be commented out._  
_Uncomment these to see the plots while you go out to get a coffee._  

In [None]:
# movies_with_avg_rating sorted by standardized num_genres
movies_with_avg_rating_std_num_genres_asc = movies_with_avg_rating.sort(col('std_num_genres').asc())
# 
# convert each column to a list
# X
movies_with_avg_rating_movieId_x = movies_with_avg_rating_std_num_genres_asc \
                                        .select(col('movieId')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

# Y
movies_with_avg_rating_std_num_genres_y = movies_with_avg_rating_std_num_genres_asc \
                                        .select(round(col('std_num_genres'), 0)) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()
# Y - num_genres
movies_with_avg_rating_num_genres_y = movies_with_avg_rating_std_num_genres_asc \
                                        .select(col('num_genres')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

In [None]:
plt.figure(figsize=(9,3))
# standardized values
plt.scatter(movies_with_avg_rating_movieId_x,movies_with_avg_rating_std_num_genres_y, color=['#eed0cacc'], s = 42, alpha = 0.25)
# actual values
plt.scatter(movies_with_avg_rating_movieId_x,movies_with_avg_rating_num_genres_y, color=['#71566433'], s = 10, alpha = 0.8)
plt.title('std_num genres vs movieId')
plt.xlabel('movieId')
plt.ylabel('std_num_genres')
plt.show()

In [None]:
# movies_with_avg_rating sorted by standardized avg_rating
movies_with_avg_rating_std_avg_rating_asc = movies_with_avg_rating.sort(round(col('std_avg_rating'),0).asc())
# 
# convert each column to a list
# movies_with_avg_rating
# X
movies_with_avg_rating_movieId_x = movies_with_avg_rating \
                                        .select(col('movieId')) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()
# Y
movies_with_avg_rating_std_avg_rating_y = movies_with_avg_rating \
                                        .select(round(col('std_avg_rating'),0)) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()
# Y - avg_rating
movies_with_avg_rating_avg_rating_y = movies_with_avg_rating \
                                        .select(round(col('avg_rating'),0)) \
                                        .rdd \
                                        .flatMap(lambda x: x) \
                                        .collect()

In [None]:
plt.figure(figsize=(9,3))
# standardized values
plt.bar(movies_with_avg_rating_movieId_x,movies_with_avg_rating_std_avg_rating_y, color=['#2954368C'])
# actual values
plt.bar(movies_with_avg_rating_movieId_x,movies_with_avg_rating_avg_rating_y, color=['#F6E28C48'])
plt.title('std_avg_rating vs movieId')
plt.xlabel('movieId')
plt.ylabel('std_avg_rating')
plt.show()

In [None]:
plt.figure(figsize=(9,3))
# standardized values
plt.scatter(movies_with_avg_rating_std_avg_rating_y,movies_with_avg_rating_std_num_genres_y, s = 42, alpha = 0.25)
# actual values
plt.scatter(movies_with_avg_rating_avg_rating_y,movies_with_avg_rating_num_genres_y, s = 10, alpha = 0.8)
plt.title('std_num_genres vs std_avg_rating')
plt.xlabel('std_avg_rating')
plt.ylabel('std_num_genres')
plt.show()

In [None]:
# let's pivot the graph above...
plt.figure(figsize=(9,3))
# standardized values (but X and Y are swapped)
plt.scatter(movies_with_avg_rating_std_num_genres_y,movies_with_avg_rating_std_avg_rating_y, s = 42, alpha = 0.25)
# actual values (X and Y swapped)
plt.scatter(movies_with_avg_rating_num_genres_y,movies_with_avg_rating_avg_rating_y, s = 10, alpha = 0.8)
plt.title('std_num_genres vs std_avg_rating')
plt.ylabel('std_avg_rating')
plt.xlabel('std_num_genres')
plt.show()

### The number of times the movie recieved a rating

* Prepare a ```rating_freq``` column for the movies dataframe where you add the number of times the movie recieved a rating
    * If the movie did not recieve any rating, put in 0

In [None]:
rating_freq = ratings_raw \
        .groupBy(col('movieId')) \
        .count() \
        .withColumnRenamed('count', 'rating_freq') \
        .withColumnRenamed('movieId', 'movieId_rating_freq')

In [None]:
rating_freq.orderBy(col('count').desc()).show(5)

In [None]:
movies_with_avg_rating = movies_with_avg_rating \
        .join(rating_freq, 
              movies_with_avg_rating.movieId == rating_freq.movieId_rating_freq, 
              'left').drop(col('movieId_rating_freq'))


# movies_with_rating_freq = movies_with_rating_freq.withColumnRenamed('count', 'rating_freq')

In [None]:
movies_with_avg_rating.orderBy(col('rating_freq').desc()).show(5, truncate = True)

### Number of unique users who rated the film

* Prepare a ```rating_freq_user``` column for the movies dataframe where you add the number of unique users who rated the film
    * If no one rated the film, put in 0

In [None]:
spark.catalog.clearCache()

In [None]:
rating_user_freq_aggregate = ratings_raw.groupBy(col('userId'), col('movieId')).count()

In [None]:
rating_user_freq_aggregate.show(5)

In [None]:
rating_user_and_movie = rating_user_freq_aggregate.select(col('userId'), col('movieId'))

In [None]:
rating_user_and_movie.show(5, False)

In [None]:
rating_user_and_movie.count()

In [None]:
rating_user_and_movie.distinct().count()

seems like no user has given multiple votes to the same movie...

### How many films where the ```rating_freq``` differs from ```rating_freq_user```? Does this match the analysis we did for ```ratings.csv```?

*Keeping it around for now - this might turn out to be superfluous / vague*

this question becomes superflouous in light of above... 

In [None]:
# as seen above


### Which genres have recieved the highest number of ratings?

Intuition says that this could match the results of "4.1.4 number of films associated with each genre"

### What are the 20 highest rated movies in each genre?

* one way to find our 'highest rated' would be by summing all the occurances of 4 and 5 stars for each movie in each genre and sorting from highest to lowest

### List of *highly rated* movies

* Prepare a list of highly rated movies, present this list by year of release and sorted in alphabetical order by movie title.
    * "Highly Rated" = movies with atleast 3 instances where users have rated the film a 4 or a 5
    * Expected Columns in the output: ```year of release, movie title, # of 4s, # of 5s```  

### List of *highly rated* movies - Approach 2

* Another approach to 'highly rated', prepare a list of 'highly rated' movies where:
    * "Highly Rated" = sum of 4 and 5 ratings is the highest across all years
    * Sort this list by year of release

### Can we find "Late Bloomers" or "Cult Films"? 

## Insights from Problem Set 4

# Problem Set 7  - Bonus!


1. Writing your Outputs: Persist the new movies dataframe with the added ```avg_rating, rating_freq, rating_freq_user``` columns to local disk in CSV format  

1. Just fun on string operations: Prepare a list of movies that have atleast two vowels except 'e' - sort the list by month and year of video release.  
   * sample output: ```year of video release, month of video release, movie title, # of e, # of vowels that are not e```  
   
1. Combinations of generes in ```movies```: Think about combinations of 2 genres...    
    * What is the frequency of each **pair** of genres? That is how many films belong to each unique pair of genres? 
        * For e.g. how many times do adventure and romance occur together? etc.
    * Are there generes that never occur together? 
    * Are there generes that always occur together? 
        * For e.g. in a multi-genre movie, whenever ```Comedy``` occurs is it always accompanied by ```Adventure```?

1. *[optional]* Is there a way to create a 2D matrix of genres - which clearly displays the absolute frequency of any pair of genres? *(hint: try pivot tables)*  

  

*[think]*: Is there a 'variety' metric? sum of absolute frequencies divided by total absolute frequency?  

### Combinations of generes: Think about combinations of 2 genres...  

* What is the frequency of each genre pair of occuring together? 
    * For e.g. how many times do adventure and romance occur together? etc.
* Are there generes that never occur together? 
* Are there generes that always occur together? 
    * For e.g. in a multi-genre movie, whenever ```Comedy``` occurs is it always accompanied by ```Adventure```?

#### Find all pairs of genres

We'll need to generate a new dataframe which has all **bi**grams (pair of 2 genres occuring together) of genres.

see: [n-grams](https://en.wikipedia.org/wiki/N-gram) aka Q-grams  
Another [note on n-grams](https://web.stanford.edu/~jurafsky/slp3/3.pdf).

#####  **Approach 1**  
  
A seemingly obvious method would be to use ```df.crossJoin``` *(and ```df.withColumnRenamed``` to avoid confusing column names)*...  
Let's try that first and see...  

In [None]:
# first create two columns, resulting in all combinations 
# filter out duplicates (no duplicates - no records with same value in col1 and col2)
# this filters out movies with only one genre defined
movie_genre_bigrams_1 = unique_movie_genres.select(col('genre')) \
    .withColumnRenamed('genre', 'genre1') \
    .crossJoin(unique_movie_genres.select(col('genre')).withColumnRenamed('genre', 'genre2')) \
    .filter(col('genre1') != col('genre2'))

In [None]:
# create another column where genre values are concatenated - just so we are able to see the values clearly
from pyspark.sql.functions import concat_ws

movie_genre_bigrams_1 = movie_genre_bigrams_1.withColumn('2_genre_combos', concat_ws('|', col('genre1'), col('genre2')))

In [None]:
movie_genre_bigrams_1.show()

*OBSERVATION*: A problem with the dataset above is that for the context of our data, ```Crime|Romance``` is the same as ```Romance|Crime```, we need to find an efficient way that our analysis is independent of the order of genres in the concatenated string.  

We need to look for *combinations* not *permutations* as the order of genres is not significant for our purposes.

I have a suspicion that this cross-join based approach may not work for solving the actual question - that how many unique movies exist for every combination of genre... let's try solving it and find out.

##### What is the frequency of each pair of genres? - **Approach 1**

aka How many films belong to each unique pair of genres?  
* For e.g. how many times do adventure and romance occur together? etc.  
  
Wokay... keeping the *observation* in mind, let's see how we'd solve for a list of movies

In [None]:
# all the movies in expanded genre where the genre matches genre1 in co

compound_condition = [(movies_expanded_by_genre['genre'] == movie_genre_bigrams_1['genre1']),
                     (movies_expanded_by_genre['genres'].contains(movie_genre_bigrams_1['genre2']))]
movies_with_bigrams_1 = movies_expanded_by_genre.join(movie_genre_bigrams_1, 
                               compound_condition,
                              'inner') \
                        .orderBy(col('title').asc())

In [None]:
movies_with_bigrams_1.show(15, False)

Yeah...  
As expected, we get twice the number of results. One for ```Thriller|Horror``` and another for ```Horror|Thriller```  


In [None]:
# just for fun
movie_bigram_1_freq = movies_with_bigrams_1.groupBy('genre1', 'genre2').count()

In [None]:
# compare counts of two sets to see if they match
movie_bigram_1_freq.filter(col('genre1')=='Comedy').orderBy(col('count').desc()).show(25)
movie_bigram_1_freq.filter(col('genre1')=='Romance').orderBy(col('count').desc()).show(25)

The counts seem to match but in some cases there's odd numbers here... like ```Romance|Action``` has 543 movies - how does that work?

In [None]:
movies_with_bigrams_1.filter(col('genre1')=='Comedy').filter(col('genre2') == 'Western').select(col('title')).show(50,False)

##### Approach 2

Using the n-gram function?

In [None]:
spark.catalog.clearCache()

In [None]:
# spark.stop()