In [1]:
import pyspark
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, asc
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,StringType
import re
import time
sc = pyspark.SparkContext(appName="test")

spark = SparkSession(sc)

moviesdf = spark.read.csv("/home/spark/Documents/spark/Dataset/ml-latest-small/movies.csv" , header=True)
ratingsdf = spark.read.csv("/home/spark/Documents/spark/Dataset/ml-latest-small/ratings.csv" , header=True)
linksdf = spark.read.csv("/home/spark/Documents/spark/Dataset/ml-latest-small/links.csv" , header=True)
tagsdf = spark.read.csv("/home/spark/Documents/spark/Dataset/ml-latest-small/tags.csv" , header=True)

# Problem 1 

## How many “Drama” movies (movies with the "Drama" genre) are there? 

In [2]:
moviesdf.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [3]:
moviesdf.filter(col("genres").contains('Drama')).count()

4361

In [4]:
moviesdf.filter(col("genres").contains('Drama')).show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|     11|American Presiden...|Comedy|Drama|Romance|
|     14|        Nixon (1995)|               Drama|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sense and Sensibi...|       Drama|Romance|
|     20|  Money Train (1995)|Action|Comedy|Cri...|
|     22|      Copycat (1995)|Crime|Drama|Horro...|
|     24|       Powder (1995)|        Drama|Sci-Fi|
|     25|Leaving Las Vegas...|       Drama|Romance|
|     26|      Othello (1995)|               Drama|
|     27| Now and Then (1995)|      Children|Drama|
|     28|   Persuasion (1995)|       Drama|Romance|
|     29|City of Lost Chil...|Adventure|Drama|F...|
|     30|Shanghai Triad (Y...|         Crime|Drama|
|     31|Dangerous Minds (...|               Drama|
|     34|         Babe (1995)|      Children|Drama|
|     36|Dea

### Solution

In [5]:
moviesdf.filter(col("genres").contains('Drama')).count()

4361

# Problem 2

## How many unique movies are rated, how many are not rated?

In [6]:
ratingsdf.count()

100836

In [7]:
ratingsdf.where(col("rating").isNotNull()).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [70]:
ratingsdf.select("movieId").distinct().show()

+-------+
|movieId|
+-------+
|    296|
|   1090|
| 115713|
|   3210|
|  88140|
|    829|
|   2088|
|   2294|
|   4821|
|  48738|
|   3959|
|  89864|
|   2136|
|    691|
|   3606|
| 121007|
|   6731|
|  27317|
|  26082|
| 100553|
+-------+
only showing top 20 rows



##  SOlution

In [9]:
ratingsdf.select("movieId").distinct().count()

9724

# Problem 3

## Who give the most ratings, how many rates did he make?

In [10]:
ratingsdf.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|          1000129365|
|    max|                99|           99992|               5.0|           999873731|
+-------+------------------+----------------+------------------+--------------------+



In [11]:
ratingsdf.where(col("userId").isNotNull()).count()

100836

In [12]:
ratingsdf.cube("userId").count().sort(f.desc('count')).where(col('userId').isNotNull()).show()

+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
|   249| 1046|
|   387| 1027|
|   182|  977|
|   307|  975|
|   603|  943|
|   298|  939|
|   177|  904|
|   318|  879|
|   232|  862|
|   480|  836|
+------+-----+
only showing top 20 rows



## Solution

In [13]:
user_who_rated_most = ratingsdf.cube("userId").count().sort(f.desc('count')).where(col('userId').isNotNull()).first()
print("User {} was the most aactive user with {} rating provided".format(user_who_rated_most['userId'],user_who_rated_most['count']))

User 414 was the most aactive user with 2698 rating provided


# Problem 4

## Compute min, average, max rating per movie.

## Solution

In [14]:
ratingsdf.groupby(col('movieId')).agg(f.min('rating'),f.avg('rating'),f.max('rating')).show()

+-------+-----------+------------------+-----------+
|movieId|min(rating)|       avg(rating)|max(rating)|
+-------+-----------+------------------+-----------+
| 100553|        4.5|               4.5|        4.5|
| 102684|        3.5|              3.75|        4.0|
|   1090|        1.0| 3.984126984126984|        5.0|
| 112911|        0.5|               2.0|        4.0|
| 115713|        0.5|3.9107142857142856|        5.0|
| 117630|        1.0|               1.0|        1.0|
| 119655|        1.0|              2.25|        3.5|
| 120478|        3.5| 4.333333333333333|        5.0|
| 121007|        4.0|               4.0|        4.0|
|   1572|        2.5|               3.0|        3.5|
| 158813|        1.0|               2.0|        3.0|
| 173535|        4.5|               4.5|        4.5|
|   2069|        3.5|              4.25|        5.0|
|   2088|        1.0|               2.5|        4.0|
|   2136|        0.5|2.4642857142857144|        5.0|
|   2162|        1.0|               2.5|      

# Problem 5

## Output dataset containing users that have rated a movie but not tagged it.

In [59]:
users_who_rated_and_not_tagged = ratingsdf.join(tagsdf,['movieId'],how="left")

In [60]:
users_who_rated_and_not_tagged.count()

285762

In [61]:
users_who_rated_and_not_tagged.show(5)

+-------+------+------+---------+------+-----+----------+
|movieId|userId|rating|timestamp|userId|  tag| timestamp|
+-------+------+------+---------+------+-----+----------+
|      1|     1|   4.0|964982703|   567|  fun|1525286013|
|      1|     1|   4.0|964982703|   474|pixar|1137206825|
|      1|     1|   4.0|964982703|   336|pixar|1139045764|
|      3|     1|   4.0|964981247|   289|  old|1143424860|
|      3|     1|   4.0|964981247|   289|moldy|1143424860|
+-------+------+------+---------+------+-----+----------+
only showing top 5 rows



In [62]:
users_who_rated_and_not_tagged.filter(col('tag')==None).show(5)

+-------+------+------+---------+------+---+---------+
|movieId|userId|rating|timestamp|userId|tag|timestamp|
+-------+------+------+---------+------+---+---------+
+-------+------+------+---------+------+---+---------+



# Problem 6

## Output dataset containing users that have rated AND tagged a movie.

In [23]:

users_who_rated_and_tagged = ratingsdf.join(tagsdf,['movieId'],how="inner")

users_who_rated_and_tagged.filter(col('rating').isNull()).show()

+-------+------+------+---------+------+---+---------+
|movieId|userId|rating|timestamp|userId|tag|timestamp|
+-------+------+------+---------+------+---+---------+
+-------+------+------+---------+------+---+---------+



# Problem 7

## Describe how you would ﬁnd the release year for a movie (refer to the readme for information).

In [16]:
def get_year(movieName):
    result = re.findall(r'.*([1-3][0-9]{3})',movieName)
    if result:
        return int(result[0])
    else:
        return None
    
get_year_udf = udf(lambda z: get_year(z),IntegerType())

##  Solution

In [17]:
moviesdf.select('movieId','title',get_year_udf('title').alias('Year')).show()

+-------+--------------------+----+
|movieId|               title|Year|
+-------+--------------------+----+
|      1|    Toy Story (1995)|1995|
|      2|      Jumanji (1995)|1995|
|      3|Grumpier Old Men ...|1995|
|      4|Waiting to Exhale...|1995|
|      5|Father of the Bri...|1995|
|      6|         Heat (1995)|1995|
|      7|      Sabrina (1995)|1995|
|      8| Tom and Huck (1995)|1995|
|      9| Sudden Death (1995)|1995|
|     10|    GoldenEye (1995)|1995|
|     11|American Presiden...|1995|
|     12|Dracula: Dead and...|1995|
|     13|        Balto (1995)|1995|
|     14|        Nixon (1995)|1995|
|     15|Cutthroat Island ...|1995|
|     16|       Casino (1995)|1995|
|     17|Sense and Sensibi...|1995|
|     18|   Four Rooms (1995)|1995|
|     19|Ace Ventura: When...|1995|
|     20|  Money Train (1995)|1995|
+-------+--------------------+----+
only showing top 20 rows



# Problem 8

## Enrich movies dataset with extract the release year. Output the enriched dataset.

## SOlution

In [18]:
moviesdf_with_year = moviesdf.withColumn('Year',get_year_udf(col('title')))

In [19]:
moviesdf_with_year.show(5)

+-------+--------------------+--------------------+----+
|movieId|               title|              genres|Year|
+-------+--------------------+--------------------+----+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|1995|
|      5|Father of the Bri...|              Comedy|1995|
+-------+--------------------+--------------------+----+
only showing top 5 rows



# Problem 9

##  Output dataset showing the number of movies per Genre per Year (movies will be counted many times if it's associated with multiple genres). 

In [None]:
moviesdf_genres_with_year.groupBy('genre','Year')

In [None]:
moviesdf_with_year.show(5)

In [None]:
moviesdf_with_year.describe().show()

In [None]:
moviesdf_with_year.summary().show()

# Problem 10

## Describe how you would write tests to ensure that all movies have a release year? Write these tests

In [None]:
movies_with_year =  moviesdf_with_year.select('year').where(col('year').isNotNull()).count()

In [None]:
movies_without_release_year = moviesdf_with_year.select('year').where(col('year').isNull()).count()

In [None]:
movies_without_release_year

# Problem 11

##  Write tests to ensure that the rating & tag happened on or after the year that the movie was released (here we can only check the release year against the year of rating & tag).

In [2]:
def epoch_to_datetime(x):
    return time.localtime(int(x)).tm_year

In [3]:
epoch_to_datetime_udf = udf(lambda z: epoch_to_datetime(z) ,IntegerType())

In [4]:
ratingsdf_with_year = ratingsdf.withColumn("ratings_year",epoch_to_datetime_udf(col('timestamp')))
tagsdf_with_year = tagsdf.withColumn("tags_year",epoch_to_datetime_udf(col('timestamp')))

In [24]:
ratingsdf_with_year.join( tagsdf_with_year, ['movieId'],how='full').show(5)

+-------+------+------+----------+------------+------+-------+----------+---------+
|movieId|userId|rating| timestamp|ratings_year|userId|    tag| timestamp|tags_year|
+-------+------+------+----------+------------+------+-------+----------+---------+
| 100553|   105|   4.5|1446572398|        2015|  null|   null|      null|     null|
| 100553|   318|   4.5|1426353247|        2015|  null|   null|      null|     null|
| 102684|   249|   3.5|1376654552|        2013|  null|   null|      null|     null|
| 102684|   380|   4.0|1494709199|        2017|  null|   null|      null|     null|
|   1090|     1|   4.0| 964984018|        2000|   474|Vietnam|1138137990|     2006|
+-------+------+------+----------+------------+------+-------+----------+---------+
only showing top 5 rows



In [26]:
tagsdf.filter((col('movieId')==1090) & (col('userId')==1)).show()

+------+-------+---+---------+
|userId|movieId|tag|timestamp|
+------+-------+---+---------+
+------+-------+---+---------+



----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 33284)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/home/spark/Documents/spark/spark-2.4.5-bin-hadoop2.7/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/home/spark/Documents/spark/spark-2.4.5-bin-hadoop2.7/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/home/spark/Documents/spark/spark-2.4.5-bin-hadoop2.7/python/pyspark/accumulators.py", line 245, in ac

In [None]:
ratingsdf_with_year.show(5)

In [None]:
tagsdf_with_year.show(5)

In [None]:
moviesdf_with_year.show(5)

In [None]:
moviesdf_genres_with_year = moviesdf_with_year.withColumn("genres", f.explode(f.split("genres","[|]")))

In [None]:
moviesdf_genres_with_year.show(5)

In [None]:
moviesdf_genres_with_year.groupBy('genres','year').count().show()

## With Genre

In [None]:
moviesandratingsdf = moviesdf_genres_with_year.join(ratingsdf_with_year , moviesdf_genres_with_year.movieId==ratingsdf_with_year.movieId).drop(ratingsdf_with_year.movieId)

##  Without Genre

In [None]:
moviesandratingsdf = moviesdf_with_year.join(ratingsdf_with_year , moviesdf_with_year.movieId==ratingsdf_with_year.movieId).drop(ratingsdf_with_year.movieId)

In [None]:
moviesandratingsdf.show(5)

###  Movies whose ratings were given after release

In [None]:
moviesandratingsdf.filter(col('Year')<=col('ratings_year')).show(5)

###  Movies whose ratings were given before release

In [None]:
moviesandratingsdf.filter(col('Year')>col('ratings_year')).show(5)

In [None]:
tagsdf_with_year.show(5)

## Movies Ang Tags Together

In [20]:
moviesandtagsdf = moviesdf_genres_with_year.join(tagsdf_with_year , moviesdf_genres_with_year.movieId==tagsdf_with_year.movieId).drop(tagsdf_with_year.movieId)

NameError: name 'moviesdf_genres_with_year' is not defined

In [None]:
moviesandtagsdf.show(5)

### Movies whose ratings were given after release

In [None]:
moviesandtagsdf.filter(col('Year')<=col('tags_year')).show(5)

### Movies whose ratings were given before release

In [None]:
moviesandtagsdf.filter(col('Year')>col('tags_year')).show(5)

# Problem 12

##  Write tests to ensure that at least 50% of movies have more than one genres.

In [None]:
moviesdf_genres_with_year.show(5)

In [None]:
moviesandtagsdf.select('movieId').distinct().count()

In [None]:
moviesdf_genres_with_year.groupBy('movieId').count().filter(col('count') > 1).show()

## Movies with more than 1 Genres

In [None]:
moviesdf_genres_with_year.groupBy('movieId').count().filter(col('count') > 1).count()

In [None]:
movies_with_atleast_1_genres = moviesdf_genres_with_year.groupBy('movieId').count().filter(col('count') > 1).count()

In [None]:
moviesdf_genres_with_year.groupBy('movieId').count().show()

### All Movies with Genres Count

In [None]:
all_movies = moviesdf.count()

In [None]:
all_movies

###  So % of movies with atleast 1 Genres

In [None]:
(movies_with_atleast_1_genres/all_movies)*100

In [None]:
df.filter((col("rating") == 5 )).select('movieId','rating').distinct().show()

In [None]:
moviesdf = spark.read.csv("/home/spark/Documents/spark/Dataset/ml-25m/movies.csv" , header=True)

In [None]:
moviesdf.columns

In [None]:
moviesdf.filter((col("movieId").isin([1,2,3,4]))).show()

In [None]:
moviesdf.filter((col("genres").isin(['Comedy,Romance']))).show()

In [None]:
moviesdf.filter((col("genres").isin(['Comedy,Romance']))).show()

In [None]:
moviesdf.where(filter_func(['comedy','romance'],(col('geners'))))

In [None]:
ratingsdf.groupBy('movieId').count().show(5)

In [None]:
ratingsdf.groupBy('movieId').sum().show(5)

In [None]:
moviesdf.select('title').show(5)