In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *

import pandas as pd 
import matplotlib.pyplot as plt 
import seaborn as sns 

In [29]:
spark = SparkSession.builder.appName("Chapter_3").getOrCreate()

df = spark.read.format('csv').option('inferSchema', False).option('header', True).option('sep', '|').load('movie_data.csv')
select_columns = ['id','budget','popularity','release_date','revenue','title']
df = df.select(*select_columns)
df.show(10, False)

+-----+------+----------+------------+-------+---------------------------------------+
|id   |budget|popularity|release_date|revenue|title                                  |
+-----+------+----------+------------+-------+---------------------------------------+
|43000|0     |2.503     |1962-05-23  |0      |The Elusive Corporal                   |
|43001|0     |5.51      |1962-11-12  |0      |Sundays and Cybele                     |
|43002|0     |5.62      |1962-05-24  |0      |Lonely Are the Brave                   |
|43003|0     |7.159     |1975-03-12  |0      |F for Fake                             |
|43004|500000|3.988     |1962-10-09  |0      |Long Day's Journey Into Night          |
|43006|0     |3.194     |1962-03-09  |0      |My Geisha                              |
|43007|0     |2.689     |1962-10-31  |0      |Period of Adjustment                   |
|43008|0     |6.537     |1959-03-13  |0      |The Hanging Tree                       |
|43010|0     |4.297     |1962-01-01  |0    

In [3]:
df_with_newcols = df.select('id', 'budget', 'popularity').withColumn('budget_cat', when(df['budget']<10000000,'small').when(df['budget']<100000000,'medium').otherwise('big')).withColumn('ratings',when(df['popularity']<3, 'low').when(df['popularity']<5, 'mid').otherwise('High'))

df_with_newcols.show(5, False)

+-----+------+----------+----------+-------+
|id   |budget|popularity|budget_cat|ratings|
+-----+------+----------+----------+-------+
|43000|0     |2.503     |small     |low    |
|43001|0     |5.51      |small     |High   |
|43002|0     |5.62      |small     |High   |
|43003|0     |7.159     |small     |High   |
|43004|500000|3.988     |small     |mid    |
+-----+------+----------+----------+-------+
only showing top 5 rows



In [4]:
# concat values together
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category', concat(df_with_newcols.budget_cat, df_with_newcols.ratings))
df_with_newcols.show(3, False)

+-----+------+----------+----------+-------+---------------------+
|id   |budget|popularity|budget_cat|ratings|BudgetRating_Category|
+-----+------+----------+----------+-------+---------------------+
|43000|0     |2.503     |small     |low    |smalllow             |
|43001|0     |5.51      |small     |High   |smallHigh            |
|43002|0     |5.62      |small     |High   |smallHigh            |
+-----+------+----------+----------+-------+---------------------+
only showing top 3 rows



In [5]:
# registering a Temporary table
df_with_newcols.registerTempTable('temp_data')

spark.sql('select ratings, count(ratings) from temp_data group by ratings').show(10, False)

+-------+--------------+
|ratings|count(ratings)|
+-------+--------------+
|High   |16856         |
|low    |14865         |
|mid    |12277         |
+-------+--------------+



In [6]:
# using Window functions
df_with_newcols = df_with_newcols.filter((df_with_newcols['popularity'].isNotNull()) & (~isnan(df['popularity'])))

df_with_newcols = df_with_newcols.select('id', 'budget', 'popularity', ntile(10).over(Window.partitionBy().orderBy(df_with_newcols['popularity'].desc())).alias('decile_rank'))

df_with_newcols.groupBy('decile_rank').agg(min('popularity').alias('min_popularity'), max('popularity').alias('max_popularity'), count('popularity')).show()



+-----------+------------------+--------------+-----------------+
|decile_rank|    min_popularity|max_popularity|count(popularity)|
+-----------+------------------+--------------+-----------------+
|          1|             7.402|            99|             4379|
|          2|             5.792|         7.401|             4379|
|          3|             4.792|         5.792|             4379|
|          4|             4.024|         4.792|             4378|
|          5|             3.371|         4.024|             4378|
|          6|             2.779|          3.37|             4378|
|          7|             2.108|         2.779|             4378|
|          8|            10.422|         2.108|             4378|
|          9|             1.389|         10.42|             4378|
|         10|0.6000000000000001|         1.389|             4378|
+-----------+------------------+--------------+-----------------+



In [31]:
df_second_best = df.select('id', 'popularity', 'release_date')
df_second_best = df_second_best.withColumn('release_year', year('release_date')).drop('release_date')
year_window = Window.partitionBy(df_second_best['release_year']).orderBy(df_second_best['popularity'].desc())
df_second_best = df_second_best.select('id', 'popularity', 'release_year', rank().over(year_window).alias('rank'))
df_second_best.filter((df_second_best['release_year']==1970) & (df_second_best['rank']==2)).show()

+---+----------+------------+----+
| id|popularity|release_year|rank|
+---+----------+------------+----+
|651|     9.588|        1970|   2|
+---+----------+------------+----+



In [8]:
# difference btw highest grossing movie of the  year and other movies of same year

# select clumns we need
df_revenue = df.select('id', 'revenue', 'release_date')
# extract YEAR from the date columnn
df_revenue = df_revenue.withColumn('release_year', year('release_date')).drop('release_date')
# define the parition function along the range
windowRev = Window.partitionBy(df_revenue['release_year']).orderBy(df_revenue['revenue'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)
# apply the partition function for the revenue differences
revenue_difference = (max(df_revenue['revenue']).over(windowRev) - df_revenue['revenue'])

df_revenue.select('id', 'revenue', 'release_year', revenue_difference.alias('revenue_difference')).show(10, False)

+-----+---------+------------+------------------+
|id   |revenue  |release_year|revenue_difference|
+-----+---------+------------+------------------+
|9660 |6800000  |1959        |0.0               |
|301  |5800000  |1959        |1000000.0         |
|10882|51000000 |1959        |-4.42E7           |
|76863|3745000  |1959        |3055000.0         |
|5544 |3193     |1959        |6796807.0         |
|239  |25000000 |1959        |-1.82E7           |
|4952 |18750000 |1959        |-1.195E7          |
|15944|17658000 |1959        |-1.0858E7         |
|665  |164000000|1959        |-1.572E8          |
|896  |134241   |1959        |6665759.0         |
+-----+---------+------------+------------------+
only showing top 10 rows



In [9]:
# using COLLECT_LIST function
df = df.withColumn('release_year', year('release_date'))
df.filter("title=='The Lost World'").groupBy('title').agg(collect_list('release_year')).show(1, False)


+--------------+------------------------------------+
|title         |collect_list(release_year)          |
+--------------+------------------------------------+
|The Lost World|[1999, 2001, 1925, 1960, 1992, 1998]|
+--------------+------------------------------------+



In [10]:
# sampling in PYSPARK
# with/without replacement, percentage to sample, seed value
df_sample = df.sample(False, 0.4, 11)
df_sample.count()


17729

In [11]:
# stratified sampling
df_strat = df.sampleBy('release_year', fractions={1959:0.2, 1960:0.4, 1961:0.4}, seed=11)
df_strat.count()

260

In [None]:
# save DataFrame 
# df.write.format('csv').option('delimiter', '|').save('output_df')

In [13]:
# PySpark to Pandas
df_pandas = df.toPandas()

df_pandas.head()

Unnamed: 0,id,budget,popularity,release_date,revenue,title,release_year
0,43000,0,2.503,1962-05-23,0,The Elusive Corporal,1962.0
1,43001,0,5.51,1962-11-12,0,Sundays and Cybele,1962.0
2,43002,0,5.62,1962-05-24,0,Lonely Are the Brave,1962.0
3,43003,0,7.159,1975-03-12,0,F for Fake,1975.0
4,43004,500000,3.988,1962-10-09,0,Long Day's Journey Into Night,1962.0


In [14]:
# Pandas to PySpark
df_py = spark.createDataFrame(df_pandas)


In [None]:
# JOIN operation, this is for demonstration purposes, DF_1 doesn't exist
df.join('df_1', df.id == df_1.name, 'inner').printSchema()

In [16]:
# dropping Duplicates
df.dropDuplicates(['title', 'release_year']).count()

43643

In [None]:
histogram_data = df.select('popularity').rdd.flatMap(lambda x: x).histogram(25)
hist_df = pd.DataFrame(list(zip(*histogram_data)), columns=['bin', 'frequency'])
sns.set(rc={'figure.figsize': (12, 8)})
sns.barplot(hist_df['bin'], hist_df['fequency'])
plt.xticks(rotation=45)
plt.show()

# Excercise Questions

In [47]:
df = df.withColumn('release_year', year('release_date')).drop('release_date')

In [51]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- title: string (nullable = true)
 |-- release_year: integer (nullable = true)



In [105]:
# Question 1: Identify the second most popular movie in 2010 based on popularity.

# the second ROW is the answer
df.select('title').filter(df['release_year'] == 2010).orderBy(df['popularity'].desc()).show(2, False)

+------------------------+
|title                   |
+------------------------+
|Cave of Forgotten Dreams|
|Firebreather            |
+------------------------+
only showing top 2 rows



In [None]:
# Question 2: Identify all title names that are repeated and show the years in which they were repeated.




In [None]:
# Question 3: Identify the top movies by popularity across all years.
df.groupBy('release_year').agg(max(df['popularity'])).show(10, False)

