# DATA ETL PROCESS AND DATA MODELING

# Creating Spark Session in spark invironment

In [171]:
## Creating Spark Session 
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

sc = SparkContext.getOrCreate()

In [172]:
## Loading the Json file into the spark 
from pyspark.sql import functions as F
## Spark Dataframe
df = spark.read.json('C:\\Users\\Santhosh\\Desktop\\imdb.json')

In [173]:
## Selecting array column from the given dataset
from pyspark.sql import functions as F
df1 = df.select(
    F.array(F.expr("title.*")).alias("title"))

In [174]:
## To get the schemas of a dataframe
df1.printSchema()

root
 |-- title: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [177]:
## Read the Json file in pandas in pyspark
df = spark.read.json('C:\\Users\\Santhosh\\Desktop\\imdb.json')

## Problems and Challenges while reading json file in pyspark

## Data Reading or Importing Json file Data in the Pandas 

In [178]:
## Importing the pandas library in pyspark
import pandas as pd
## Read the Json file in the pyspark
df = pd.read_json('C:\\Users\\Santhosh\\Desktop\\imdb.json')

In [179]:
## Read the IMDB Dataset in the pyspark
df = pd.read_json('C:\\Users\\Santhosh\\Desktop\\imdb_processed.json')

In [180]:
## To see the dataset
df

Unnamed: 0,title,year,kind,genre,rating,vote,country,language,cast,director,composer,writer,runtime
0,Dinosaur Planet,2003,tv mini series,"[Documentary, Animation, Family]",7.7,474.0,[United States],[English],"[Christian Slater, Scott Sampson]",,,"[Mike Carrol, Mike Carroll, Georgann Kane]",50.0
1,Get Up and Dance!,1994,video movie,[Family],8.1,18.0,[United States],[English],"[Paula Abdul, Aurorah Allain, Bill Bohl, Bob G...",[Steve Purcell],,,83.0
2,8 Man,1992,movie,"[Action, Sci-Fi]",5.5,93.0,[Japan],[Japanese],"[Kai Shishido, Etsushi Takahashi, Sachiko Ayas...",[Yasuhiro Horiuchi],[Carole King],"[Kazumasa Hirai, Jirô Kuwata, Junko Suzuki]",
3,What the #$*! Do We (K)now!?,2004,movie,"[Documentary, Comedy, Drama, Fantasy, Mystery,...",5.3,13432.0,[United States],"[English, German, Spanish]","[Marlee Matlin, Elaine Hendrix, John Ross Bowi...","[William Arntz, Betsy Chasse, Mark Vicente]",[Christopher Franke],"[William Arntz, Betsy Chasse, Matthew Hoffman]",60.0
4,Class of Nuke 'Em High Part II: Subhumanoid Me...,1991,movie,"[Comedy, Horror, Sci-Fi]",4.5,2177.0,[United States],[English],"[Brick Bronsky, Lisa Gaye, Leesa Rowland, Mich...",[Eric Louzil],[Bob Mithoff],"[Lloyd Kaufman, Carl Morano, Matt Unger]",96.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
8446,All Monsters Attack,1969,movie,"[Adventure, Family, Fantasy]",3.9,4384.0,[Japan],[Japanese],"[Tomonori Yazaki, Hideyo Amamoto, Sachio Sakai...","[Ishirô Honda, Jun Fukuda, Kengo Furusawa]",[Kunio Miyauchi],[Shin'ichi Sekizawa],
8447,Fidel Castro,2005,episode,"[Documentary, Biography, History]",6.4,44.0,,"[English, Spanish]","[Rena Baskin, Fulgencio Batista, James Blight,...","[Adriana Bosch, M Pink Christofalo]",[Mason Daring],[Adriana Bosch],
8448,Epoch,2001,tv movie,"[Sci-Fi, Thriller]",4.9,2376.0,[United States],[English],"[David Keith, Stephanie Niznik, Ryan O'Neal, B...",[Matt Codd],[Richard McHugh],"[Jonathan Raymond, Jonathan Raymond, Phillip J...",
8449,The Company,2003,movie,"[Drama, Music, Romance]",6.3,6501.0,"[United States, Germany, United Kingdom]",[English],"[Neve Campbell, Malcolm McDowell, James Franco...",[Robert Altman],[Van Dyke Parks],"[Neve Campbell, Barbara Turner, Barbara Turner]",


In [181]:
## To get the type of dataframe
type(df)

pandas.core.frame.DataFrame

In [182]:
## To get the shape of the dataframe
df.shape

(8451, 13)

In [183]:
## To assign the ID with index number column to the existing dataframe
df['ID'] = df.index

In [185]:
# Other CSV options
## Exporting the required datasets from pyspark to local folder
df.to_csv('C:/Users/Santhosh/Desktop/Rawdata_Movies.csv', header = True)

## Creating Explode Dataframes

In [186]:
## To explode the title column to transform the array column to multiple rows in pyspark
df1 = df.explode('title')

In [187]:
## To get the shape of the dataframe
df.shape

(8451, 14)

In [188]:
## To explode the year column to transform the array column to multiple rows in pyspark
df2 = df.explode('year')

In [189]:
## To get the shape of the dataframe
df2.shape

(8451, 14)

In [190]:
## To explode the kind column to transform the array column to multiple rows in pyspark
df3 = df.explode('kind')

In [191]:
## To get the shape of the data
df3.shape

(8451, 14)

In [192]:
## To explode the genre column to transform the array column to multiple rows in pyspark
df4 = df.explode('genre')

In [193]:
## To get the shape of the dataframe
df4.shape

(22093, 14)

In [194]:
## To explode the country column to transform the array column to multiple rows in pyspark
df5 = df.explode('country')

In [195]:
## To get the shape of the dataframe
df5.shape

(10785, 14)

In [196]:
## To explode the language column to transform the array column to multiple rows in pyspark
df6 = df.explode('language')

In [197]:
## To get the shape of the dataframe
df6.shape

(11199, 14)

In [198]:
## To explode the cast column to transform the array column to multiple rows in pyspark
df7 = df.explode('cast')

In [199]:
## To get the shape of the data
df7.shape

(264345, 14)

In [200]:
## To explode the director column to transform the array column to multiple rows in pyspark
df8 = df.explode('director')

In [201]:
## To get the shape of the dataframe
df8.shape

(9428, 14)

In [202]:
## To explode the composer column to transform the array column to multiple rows in pyspark
df9 = df.explode('composer')

In [203]:
## To get the shape of the dataframe
df9.shape

(9938, 14)

In [204]:
## To explode the writer column to transform the array column to multiple rows in pyspark
df10 = df.explode('writer')

In [205]:
## To get the shape of the dataframe
df10.shape

(16628, 14)

## Appending Multiple Dataframes
## To get the number of records for each of the dataframe

In [206]:
## Getting the No of records and no of columns for all the dataframes
dfs = (df1, df2, df3, df4, df5,df6,df7,df8,df9,df10)
for df in (dfs):
    print(df.shape)

(8451, 14)
(8451, 14)
(8451, 14)
(22093, 14)
(10785, 14)
(11199, 14)
(264345, 14)
(9428, 14)
(9938, 14)
(16628, 14)


In [None]:
## Now appending the all the dataframes 

df_final = df1.append([df2, df3,df4,df5,df6,df7,df8,df9,df10])

In [208]:
## Get the dimension of the dataframe
df_final.shape

(369769, 14)

## Merging Multiple Dataframes to Make Final Dataset

In [211]:
## To get the selected columns from the given dataframe
df1_map = df1[['title', 'year', 'kind','rating', 'vote','runtime', 'ID']]

In [214]:
# inner join
df1_map_genre = pd.merge(df1_map, df4[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','genre']], on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [215]:
## Get the Shape of the Dataframe
df1_map_genre.shape

(22093, 10)

In [217]:
# inner join
df1_map_country = pd.merge(df1_map_genre, df5[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','country']], on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [218]:
## Get the shape of the Dataframe
df1_map_country.shape

(28527, 13)

In [None]:
# inner join
df1_map_language = pd.merge(df1_map_country, df6[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','language']], on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [221]:
## Get the shape of the Dataframe
df1_map_language.shape

(41168, 16)

In [223]:
# inner join
df1_map_cast = pd.merge(df1_map_language, df7[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','cast']], on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [224]:
## Get the Shape of the Dataframe
df1_map_cast.shape

(1589822, 19)

In [None]:
# inner join
df1_map_director = pd.merge(df1_map_cast, df8[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','director']],  on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [226]:
## Get the Shape of the Dataframe
df1_map_director.shape

(1734773, 22)

In [228]:
# inner join
df1_map_composer = pd.merge(df1_map_director, df9[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','composer']], on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [229]:
## Get the shape of the Data
df1_map_composer.shape

(2092388, 25)

In [None]:
# inner join
df1_map_writer = pd.merge(df1_map_composer, df10[['title', 'year', 'kind', 'rating', 'vote', 'runtime', 'ID','writer']], on=['title', 'year', 'kind', 'rating', 'ID'], how='inner')

In [231]:
## Get the shape of the data
df1_map_writer.shape

(4830458, 28)

In [233]:
Maindata = df1_map_writer

In [235]:
Maindata1 = Maindata[['title', 'year', 'kind', 'rating', 'vote_x', 'runtime_x', 'ID','genre','country','language','cast','director','composer','writer']]

In [238]:
## Selecting the required columns in a given dataframe
Maindata2 = Maindata1[['title', 'year', 'kind', 'rating', 'vote_x', 'runtime_x', 'ID',
       'genre', 'country', 'language', 'cast', 'director', 'composer',
       'writer']]

## Removing the Duplicate Records from a given Dataframe

In [242]:
newdf = Maindata2.drop_duplicates()

In [243]:
newdf.shape

(4370062, 20)

In [246]:
new_df = new_df[~new_df.duplicated(subset=['title', 'year', 'kind', 'ID','genre', 'country', 'language', 'cast', 'director', 'composer','writer'], keep=False)].copy()

## Exporting required Dataframe in pandas

In [None]:
final_Data.to_csv('C:\\Users\\Santhosh\\Desktop\\Final_Data_Model.csv', index = True)

## Perform Group of Aggregations to reduce the duplicate records

In [255]:
Data_Agg = new_df.groupby(['ID','title', 'year','kind','genre','country','language','cast','director','composer','writer'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [258]:
## Calculating the average rating with the selected columns 
Data_Agg1 = new_df.groupby(['ID','title', 'year','genre']).agg({'rating': ['mean']}).reset_index()

In [259]:
## Converting the grouped data into dataframe
Data_Agg2 = pd.DataFrame(Data_Agg1)

In [363]:
## Exporting the output file 
import csv
Data_Agg2.to_csv('C:\\Users\\Santhosh\\Desktop\\Movies.csv',index=False,quoting=csv.QUOTE_ALL)

## Perform Group of aggregations for multiple columns

In [263]:
Maindata1 = new_df.groupby(['ID','title', 'year','kind','genre','country'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [264]:
Maindata1.shape

(26352, 8)

In [265]:
Maindata2 = new_df.groupby(['ID','title', 'year','kind','genre','language'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [266]:
Maindata2.shape

(27450, 8)

In [267]:
Maindata3 = new_df.groupby(['ID','title', 'year','kind','genre','cast'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [268]:
Maindata3.shape

(704761, 8)

In [269]:
Maindata4 = new_df.groupby(['ID','title', 'year','kind','genre','director'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [270]:
Maindata4.shape

(20309, 8)

In [271]:
Maindata5 = new_df.groupby(['ID','title', 'year','kind','genre','composer'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [272]:
Maindata5.shape

(19286, 8)

In [273]:
Maindata6 = new_df.groupby(['ID','title', 'year','kind','genre','writer'],as_index = False).agg({'rating': ['mean']}).reset_index()

In [274]:
Maindata6.shape

(36094, 8)

## Data Extraction and Remove Duplicate part is done

## Now it's time to explore more things in Pyspark

## Perform Data Pre Processing and Data ETL and Reporting And Analysis in Pyspark

In [275]:
from pyspark.sql.types import StructType
from pyspark.sql.types import *

## Preparing the Schemas for optimizing the dataframes in Pyspark

In [277]:
schema = StructType(
    [
        StructField("Ser_No", IntegerType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("kind", StringType(), True),
        StructField("rating", StringType(), True),
        StructField("vote", IntegerType(), True),
        StructField("runtime", IntegerType(), True),
        StructField("ID", IntegerType(), True),
        StructField("genre", StringType(), True),
        StructField("country", StringType(), True),
        StructField("language", StringType(), True),
        StructField("cast", StringType(), True),
        StructField("director", StringType(), True),
        StructField("composer", StringType(), True),
        StructField("writer", StringType(), True),
        
    ]
)


## Data Extraction Part

## Read the data into the Spark Environment

In [278]:
## Read the file in the Spark environment
Movie_df = spark.read.csv('C:\\Users\\Santhosh\\Desktop\\Rawdata_Movies_Final_Data2.csv',schema = schema , header = True)

In [279]:
## Get the print Schema of the dataframe
Movie_df.printSchema()

root
 |-- Ser_No: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- kind: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- vote: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- director: string (nullable = true)
 |-- composer: string (nullable = true)
 |-- writer: string (nullable = true)



In [280]:
## To get the content of the dataframe
Movie_df.show()

+------+--------------------+----+--------------+------+-----+-------+---+-----+-------+--------+----+--------+--------+------+
|Ser_No|               title|year|          kind|rating| vote|runtime| ID|genre|country|language|cast|director|composer|writer|
+------+--------------------+----+--------------+------+-----+-------+---+-----+-------+--------+----+--------+--------+------+
|     0|     Dinosaur Planet|2003|tv mini series|   7.7|  474|     50|  0|    0|      0|       0|   0|       0|       0|     0|
|     1|   Get Up and Dance!|1994|   video movie|   8.1|   18|     83|  1|    0|      0|       0|   0|       0|       0|     0|
|     2|               8 Man|1992|         movie|   5.5|   93|   null|  2|    0|      0|       0|   0|       0|       0|     0|
|     3|What the #$*! Do ...|2004|         movie|   5.3|13432|     60|  3|    0|      0|       0|   0|       0|       0|     0|
|     4|Class of Nuke 'Em...|1991|         movie|   4.5| 2177|     96|  4|    0|      0|       0|   0|  

In [282]:
## Get the columns with datatypes in a datagrame
Movie_df.dtypes

[('Ser_No', 'int'),
 ('title', 'string'),
 ('year', 'int'),
 ('kind', 'string'),
 ('rating', 'string'),
 ('vote', 'int'),
 ('runtime', 'int'),
 ('ID', 'int'),
 ('genre', 'string'),
 ('country', 'string'),
 ('language', 'string'),
 ('cast', 'string'),
 ('director', 'string'),
 ('composer', 'string'),
 ('writer', 'string')]

In [283]:
## To get the content of the dataframe
Movie_df.show(2)

+------+-----------------+----+--------------+------+----+-------+---+-----+-------+--------+----+--------+--------+------+
|Ser_No|            title|year|          kind|rating|vote|runtime| ID|genre|country|language|cast|director|composer|writer|
+------+-----------------+----+--------------+------+----+-------+---+-----+-------+--------+----+--------+--------+------+
|     0|  Dinosaur Planet|2003|tv mini series|   7.7| 474|     50|  0|    0|      0|       0|   0|       0|       0|     0|
|     1|Get Up and Dance!|1994|   video movie|   8.1|  18|     83|  1|    0|      0|       0|   0|       0|       0|     0|
+------+-----------------+----+--------------+------+----+-------+---+-----+-------+--------+----+--------+--------+------+
only showing top 2 rows



In [284]:
# extracting number of rows from the Dataframe
row = Movie_df.count()
#extracting number of columns from the Dataframe
col = len(Movie_df.columns)
#printing
print(f'Dimension of the Dataframe is: {(row,col)}')
print(f'Number of Rows are: {row}')
print(f'Number of Columns are: {col}')

Dimension of the Dataframe is: (352867, 15)
Number of Rows are: 352867
Number of Columns are: 15


In [285]:
## To get the print schema of the dataframe
Movie_df.printSchema()

root
 |-- Ser_No: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- kind: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- vote: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- director: string (nullable = true)
 |-- composer: string (nullable = true)
 |-- writer: string (nullable = true)



In [286]:
## Replacing the null values with zeros in a dataframes
Movie_df1 = Movie_df.fillna(value = 0, subset=["genre","country","language","cast","director","composer","writer"]) \
   .show()

+------+--------------------+----+--------------+------+-----+-------+---+-----+-------+--------+----+--------+--------+------+
|Ser_No|               title|year|          kind|rating| vote|runtime| ID|genre|country|language|cast|director|composer|writer|
+------+--------------------+----+--------------+------+-----+-------+---+-----+-------+--------+----+--------+--------+------+
|     0|     Dinosaur Planet|2003|tv mini series|   7.7|  474|     50|  0|    0|      0|       0|   0|       0|       0|     0|
|     1|   Get Up and Dance!|1994|   video movie|   8.1|   18|     83|  1|    0|      0|       0|   0|       0|       0|     0|
|     2|               8 Man|1992|         movie|   5.5|   93|   null|  2|    0|      0|       0|   0|       0|       0|     0|
|     3|What the #$*! Do ...|2004|         movie|   5.3|13432|     60|  3|    0|      0|       0|   0|       0|       0|     0|
|     4|Class of Nuke 'Em...|1991|         movie|   4.5| 2177|     96|  4|    0|      0|       0|   0|  

## Check the Meta Data Properties

In [287]:
# Retrieving data from 0th row 
Info = Movie_df.collect()[0][0:]

In [288]:
# first row - second column
print("first row - second column  :",
      Movie_df.collect()[0][1])
  
# Third  row - Third column
print("Third  row - Third column  :",
      Movie_df.collect()[2][1])
  
# Third  row - Third column
print("Third  row - Third column  :",
      Movie_df.collect()[2][2])

first row - second column  : Dinosaur Planet
Third  row - Third column  : 8 Man
Third  row - Third column  : 1992


## Calculating Basic Descriptive Statistics to understand data

In [289]:
## Getting the count of the dataframe

In [290]:
## Get the count
from pyspark.sql.functions import count
Movie_df.select(count(Movie_df.title)).show()

+------------+
|count(title)|
+------------+
|      352867|
+------------+



In [291]:
## Get the total Number of records
Movie_df.select(count(Movie_df.title),count(Movie_df.year)).show()

+------------+-----------+
|count(title)|count(year)|
+------------+-----------+
|      352867|     352867|
+------------+-----------+



In [292]:
## Using aggregations
Movie_df.agg({'title':'count','year':'count'}).show()

+-----------+------------+
|count(year)|count(title)|
+-----------+------------+
|     352867|      352867|
+-----------+------------+



In [293]:
## Getting the count of the year in a dataframes
Movie_df.groupBy('year').count().show()

+----+-----+
|year|count|
+----+-----+
|1959| 4037|
|1990| 5444|
|1975| 1887|
|1977| 2644|
|1924|   59|
|2003|26160|
|1974| 3366|
|1927|  400|
|1955|  948|
|1978| 2903|
|1925|  111|
|1961|  804|
|1942|  535|
|1944|  428|
|1939|  522|
|1922|   65|
|1952|  991|
|1956|  944|
|1934|  217|
|1988| 6225|
+----+-----+
only showing top 20 rows



## Preparing  reports to get insights from the data

In [294]:
## find and count the null, None, NaN of all the Dataframe colunmns
from pyspark.sql.functions import col,isnan,when,count
Movie_df.select([count(when(isnan(composer) | col(composer).isNull(),composer)).alias(composer) for composer in Movie_df.columns]).show()

+------+-----+----+----+------+----+-------+---+-----+-------+--------+----+--------+--------+------+
|Ser_No|title|year|kind|rating|vote|runtime| ID|genre|country|language|cast|director|composer|writer|
+------+-----+----+----+------+----+-------+---+-----+-------+--------+----+--------+--------+------+
|     0|    0|   0|   0|  4162|4162|  76041|  0|    0|      0|       0|   0|       0|       0|     0|
+------+-----+----+----+------+----+-------+---+-----+-------+--------+----+--------+--------+------+



In [295]:
## Count the no.of zeros in all columns of dataframe
import pyspark.sql.functions as F
df_zero = Movie_df.select([F.count(F.when(Movie_df[c] == 0,c)).alias(c) for c in Movie_df.columns])
df_zero.limit(2).toPandas().head()

Unnamed: 0,Ser_No,title,year,kind,rating,vote,runtime,ID,genre,country,language,cast,director,composer,writer
0,8,0,0,0,0,0,0,13,330841,342381,342067,88693,344750,345658,337486


In [296]:
Movie_df.filter(col("runtime") == 0).count()

0

In [297]:
## Count the no.of Non- zeros in all columns of dataframe
import pyspark.sql.functions as F
df_zero = Movie_df.select([F.count(F.when(Movie_df[c] != 0,c)).alias(c) for c in Movie_df.columns])
df_zero.limit(2).toPandas().head()

Unnamed: 0,Ser_No,title,year,kind,rating,vote,runtime,ID,genre,country,language,cast,director,composer,writer
0,352859,266,352867,0,348705,348705,276826,352854,0,0,0,0,0,0,0


In [298]:
Movie_df.filter(col("runtime") != 0).count()

276826

In [299]:
## Claculating Percentage for misiing values in each every column in a dataframe: Generic solution for all columns
missing_df = Movie_df.select([count(when(isnan(c) | col(c).isNull(),c))/count(lit(1)).alias(c) for c in Movie_df.columns])
missing_df.limit(2).toPandas().head()

Unnamed: 0,(count(CASE WHEN (isnan(Ser_No) OR (Ser_No IS NULL)) THEN Ser_No END) / count(1) AS Ser_No),(count(CASE WHEN (isnan(title) OR (title IS NULL)) THEN title END) / count(1) AS title),(count(CASE WHEN (isnan(year) OR (year IS NULL)) THEN year END) / count(1) AS year),(count(CASE WHEN (isnan(kind) OR (kind IS NULL)) THEN kind END) / count(1) AS kind),(count(CASE WHEN (isnan(rating) OR (rating IS NULL)) THEN rating END) / count(1) AS rating),(count(CASE WHEN (isnan(vote) OR (vote IS NULL)) THEN vote END) / count(1) AS vote),(count(CASE WHEN (isnan(runtime) OR (runtime IS NULL)) THEN runtime END) / count(1) AS runtime),(count(CASE WHEN (isnan(ID) OR (ID IS NULL)) THEN ID END) / count(1) AS ID),(count(CASE WHEN (isnan(genre) OR (genre IS NULL)) THEN genre END) / count(1) AS genre),(count(CASE WHEN (isnan(country) OR (country IS NULL)) THEN country END) / count(1) AS country),(count(CASE WHEN (isnan(language) OR (language IS NULL)) THEN language END) / count(1) AS language),(count(CASE WHEN (isnan(cast) OR (cast IS NULL)) THEN cast END) / count(1) AS cast),(count(CASE WHEN (isnan(director) OR (director IS NULL)) THEN director END) / count(1) AS director),(count(CASE WHEN (isnan(composer) OR (composer IS NULL)) THEN composer END) / count(1) AS composer),(count(CASE WHEN (isnan(writer) OR (writer IS NULL)) THEN writer END) / count(1) AS writer)
0,0.0,0.0,0.0,0.0,0.011795,0.011795,0.215495,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [300]:
## Calculating the percentage for zeros for each and every column in a given dataframe
Zeros_Perc = Movie_df.select([(count(when(Movie_df[c] == 0,c))/count(lit(1))).alias(c) for c in Movie_df.columns])

Zeros_Perc.limit(2).toPandas().head()

Unnamed: 0,Ser_No,title,year,kind,rating,vote,runtime,ID,genre,country,language,cast,director,composer,writer
0,2.3e-05,0.0,0.0,0.0,0.0,0.0,0.0,3.7e-05,0.93758,0.970283,0.969394,0.25135,0.976997,0.97957,0.956411


In [301]:
## Calculating the percentage for zeros for each and every column in a given dataframe
Non_Zeros_Perc = Movie_df.select([(count(when(Movie_df[c] != 0,c))/count(lit(1))).alias(c) for c in Movie_df.columns])

Non_Zeros_Perc.limit(2).toPandas().head()

Unnamed: 0,Ser_No,title,year,kind,rating,vote,runtime,ID,genre,country,language,cast,director,composer,writer
0,0.999977,0.000754,1.0,0.0,0.988205,0.988205,0.784505,0.999963,0.0,0.0,0.0,0.0,0.0,0.0,0.0


## Removing the Tailing and Leading Spaces in a given Dataframe

In [303]:
## Removing the white spaces for required columns in a given dataframe
Movie_df.withColumn("title", trim(col("title"))).show()
Movie_df.withColumn("year", trim(col("title"))).show()
Movie_df.withColumn("kind", trim(col("kind"))).show()
Movie_df.withColumn("genre", trim(col("genre"))).show()
Movie_df.withColumn("rating", trim(col("rating"))).show()
Movie_df.withColumn("vote", trim(col("vote"))).show()
Movie_df.withColumn("country", trim(col("country"))).show()
Movie_df.withColumn("language", trim(col("language"))).show()
Movie_df.withColumn("cast", trim(col("cast"))).show()
Movie_df.withColumn("director", trim(col("director"))).show()
Movie_df.withColumn("composer", trim(col("composer"))).show()
Movie_df.withColumn("writer", trim(col("writer"))).show()
Movie_df.withColumn("runtime", trim(col("runtime"))).show()

+------+--------------------+----+--------------+------+-----+-------+---+-----+-------+--------+----+--------+--------+------+
|Ser_No|               title|year|          kind|rating| vote|runtime| ID|genre|country|language|cast|director|composer|writer|
+------+--------------------+----+--------------+------+-----+-------+---+-----+-------+--------+----+--------+--------+------+
|     0|     Dinosaur Planet|2003|tv mini series|   7.7|  474|     50|  0|    0|      0|       0|   0|       0|       0|     0|
|     1|   Get Up and Dance!|1994|   video movie|   8.1|   18|     83|  1|    0|      0|       0|   0|       0|       0|     0|
|     2|               8 Man|1992|         movie|   5.5|   93|   null|  2|    0|      0|       0|   0|       0|       0|     0|
|     3|What the #$*! Do ...|2004|         movie|   5.3|13432|     60|  3|    0|      0|       0|   0|       0|       0|     0|
|     4|Class of Nuke 'Em...|1991|         movie|   4.5| 2177|     96|  4|    0|      0|       0|   0|  

## Converting the columns into respective actual format to optimize the datatypes

In [305]:
## Converting data types functions in pyspark dataframes
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, BooleanType, DateType,IntegerType

Convert_df = Movie_df.withColumn("title",col("title").cast(StringType()))\
.withColumn("year",col("year").cast(IntegerType()))\
.withColumn("kind",col("kind").cast(StringType()))\
.withColumn("genre",col("genre").cast(StringType()))\
.withColumn("rating",col("rating").cast(IntegerType()))\
.withColumn("country",col("country").cast(StringType()))\
.withColumn("language",col("language").cast(StringType()))\
.withColumn("cast",col("cast").cast(StringType()))\
.withColumn("director",col("director").cast(StringType()))\
.withColumn("composer",col("composer").cast(StringType()))\
.withColumn("writer",col("writer").cast(StringType()))

In [306]:
## Checking the Schemas for all columns in a dataframe
Convert_df.printSchema()

root
 |-- Ser_No: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- kind: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- vote: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- director: string (nullable = true)
 |-- composer: string (nullable = true)
 |-- writer: string (nullable = true)



In [307]:
Convert_df = Convert_df

## Removing the Zero's from selected columns in given dataframe

In [308]:
## Removing the zeros(rows) for selected columns and filtering the required data
Convert_df1 = Convert_df.filter((Convert_df.genre != '0') | (Convert_df.country != '0')\
                                |(Convert_df.language != '0') | (Convert_df.cast != '0')\
                                |(Convert_df.director != '0') | (Convert_df.composer != '0')|(Convert_df.writer != '0'))

## Removing the Special Charactors for all the columns in a dataframe

In [312]:
import pyspark.sql.functions as F
import re

Convert_df_final1 = Convert_df_final.select([F.col(col).alias(re.sub("[^0-9a-zA-Z$]+","",col)) for col in Convert_df_final.columns])

In [313]:
Convert_df_final1.show()

+-----+--------------------+----+--------------+------+-----+-------+---+-----------+-------+--------+----+--------+--------+------+
|SerNo|               title|year|          kind|rating| vote|runtime| ID|      genre|country|language|cast|director|composer|writer|
+-----+--------------------+----+--------------+------+-----+-------+---+-----------+-------+--------+----+--------+--------+------+
|    0|     Dinosaur Planet|2003|tv mini series|     7|  474|     50|  0|Documentary|      0|       0|   0|       0|       0|     0|
|    1|     Dinosaur Planet|2003|tv mini series|     7|  474|     50|  0|  Animation|      0|       0|   0|       0|       0|     0|
|    2|     Dinosaur Planet|2003|tv mini series|     7|  474|     50|  0|     Family|      0|       0|   0|       0|       0|     0|
|    3|   Get Up and Dance!|1994|   video movie|     8|   18|     83|  1|     Family|      0|       0|   0|       0|       0|     0|
|    4|               8 Man|1992|         movie|     5|   93|   null|

In [314]:
import pyspark.sql.functions as F
Convert_df_final1 = Convert_df_final1.select([F.col(col).alias(col.replace('\#|\$|\*|\!', '')) for col in Convert_df_final1.columns])

In [315]:
Convert_df_final1.withColumn("title",regexp_replace(col("title"), "/^#$*!+/", ""))

DataFrame[SerNo: int, title: string, year: int, kind: string, rating: int, vote: int, runtime: int, ID: int, genre: string, country: string, language: string, cast: string, director: string, composer: string, writer: string]

In [316]:
Convert_df_final1.show()

+-----+--------------------+----+--------------+------+-----+-------+---+-----------+-------+--------+----+--------+--------+------+
|SerNo|               title|year|          kind|rating| vote|runtime| ID|      genre|country|language|cast|director|composer|writer|
+-----+--------------------+----+--------------+------+-----+-------+---+-----------+-------+--------+----+--------+--------+------+
|    0|     Dinosaur Planet|2003|tv mini series|     7|  474|     50|  0|Documentary|      0|       0|   0|       0|       0|     0|
|    1|     Dinosaur Planet|2003|tv mini series|     7|  474|     50|  0|  Animation|      0|       0|   0|       0|       0|     0|
|    2|     Dinosaur Planet|2003|tv mini series|     7|  474|     50|  0|     Family|      0|       0|   0|       0|       0|     0|
|    3|   Get Up and Dance!|1994|   video movie|     8|   18|     83|  1|     Family|      0|       0|   0|       0|       0|     0|
|    4|               8 Man|1992|         movie|     5|   93|   null|

## Removing the Duplicated Records from the Dataframe

In [318]:
## Get Distinct Rows (By Comparing All Columns)
distinctDF = Convert_df_final1.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)

Distinct count: 338193
+-----+------------------------------------------------+----+--------------+------+-----+-------+----+-----------+-------+--------+----+--------+--------+------+
|SerNo|title                                           |year|kind          |rating|vote |runtime|ID  |genre      |country|language|cast|director|composer|writer|
+-----+------------------------------------------------+----+--------------+------+-----+-------+----+-----------+-------+--------+----+--------+--------+------+
|108  |Elfen Lied                                      |2004|tv mini series|8     |31414|160    |36  |Horror     |0      |0       |0   |0       |0       |0     |
|111  |Elfen Lied                                      |2004|tv mini series|8     |31414|160    |36  |Thriller   |0      |0       |0   |0       |0       |0     |
|265  |The Song Remains the Same                       |1976|movie         |7     |8088 |94     |102 |Music      |0      |0       |0   |0       |0       |0     |
|313 

In [319]:
#Drop duplicates
df2 = Convert_df_final1.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)

Distinct count: 338193
+-----+------------------------------------------------+----+--------------+------+-----+-------+----+-----------+-------+--------+----+--------+--------+------+
|SerNo|title                                           |year|kind          |rating|vote |runtime|ID  |genre      |country|language|cast|director|composer|writer|
+-----+------------------------------------------------+----+--------------+------+-----+-------+----+-----------+-------+--------+----+--------+--------+------+
|108  |Elfen Lied                                      |2004|tv mini series|8     |31414|160    |36  |Horror     |0      |0       |0   |0       |0       |0     |
|111  |Elfen Lied                                      |2004|tv mini series|8     |31414|160    |36  |Thriller   |0      |0       |0   |0       |0       |0     |
|265  |The Song Remains the Same                       |1976|movie         |7     |8088 |94     |102 |Music      |0      |0       |0   |0       |0       |0     |
|313 

In [321]:
#Drop duplicates on selected columns
dropDisDF = Convert_df_final1.dropDuplicates(["title","year","kind","rating","genre","country","language","cast","director","composer","writer"])
print("Distinct count  : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)

Distinct count  : 320313
+-----+-------------------------------------+----+--------------+------+------+-------+----+---------+-------+--------+----+--------+--------+------+
|SerNo|title                                |year|kind          |rating|vote  |runtime|ID  |genre    |country|language|cast|director|composer|writer|
+-----+-------------------------------------+----+--------------+------+------+-------+----+---------+-------+--------+----+--------+--------+------+
|43   |Meat Loaf: Bat Out of Hell           |1999|episode       |7     |286   |55     |13  |Music    |0      |0       |0   |0       |0       |0     |
|163  |Arachnid                             |2001|movie         |3     |2853  |82     |59  |Action   |0      |0       |0   |0       |0       |0     |
|277  |Stellvia                             |2003|tv series     |6     |128   |85     |105 |Adventure|0      |0       |0   |0       |0       |0     |
|321  |Blue Seed Beyond                     |1997|movie         |6     |67 

In [322]:
dropDisDF.count()

320313

In [323]:
type(dropDisDF)

pyspark.sql.dataframe.DataFrame

In [324]:
Final_df = dropDisDF

In [325]:
Final_df.count()

320313

## Till here I have done all the Data Pre processing steps and ETL Process

## Calculating Descriptive statistics or Summary of the Dataframe

In [326]:
## Calculating the descriptive statistics 
describe_df = Final_df.select('rating','vote','runtime').describe()

In [327]:
## Getting the Output Summary of the selected columns in a dataframe
describe_df.limit(10).toPandas().head()

Unnamed: 0,summary,rating,vote,runtime
0,count,317792.0,317792.0,252007.0
1,mean,6.1699098781593,31466.856966191724,100.1195284257977
2,stddev,1.2592262614229537,119186.1387313618,72.18991281994407
3,min,1.0,5.0,2.0
4,max,9.0,2461873.0,1620.0


## Perform Group by Aggregation Operations

In [329]:
## Calculating the statistics for the required columns in a dataframes
# Mean value of each group
Final_df.groupby('title').agg({'rating': 'mean','vote':'mean','runtime':'mean','title':'count'}).show()

+--------------------+-----------+------------+------------+---------+
|               title|avg(rating)|avg(runtime)|count(title)|avg(vote)|
+--------------------+-----------+------------+------------+---------+
|         Deep Rising|        6.0|        60.0|          35|  35393.0|
|             Stardom|        5.0|        91.0|         128|   1717.0|
|  A Woman Is a Woman|        7.0|       106.0|          24|  16402.0|
|The Beatles Antho...|        9.0|        null|          27|   8567.0|
|           Cybermutt|        3.0|       111.0|          24|    197.0|
|Bulldog Drummond'...|        6.0|        95.0|          29|    534.0|
|The Real World Yo...|        4.0|        22.0|          12|     76.0|
|In Search of Dr. ...|        7.0|       100.0|          64|    503.0|
|            Dogville|        8.0|        97.0|          58| 143450.0|
|            On Guard|        7.0|        81.0|          79|   3455.0|
|        Camille 2000|        5.0|        null|          22|    697.0|
|Deton

In [330]:
# Mean value of each group
Convert_df_final1.groupby('year').agg({'rating': 'mean','vote':'mean','runtime':'mean','year':'count'}).show()

+----+-----------+------------------+------------------+------------------+
|year|count(year)|       avg(rating)|      avg(runtime)|         avg(vote)|
+----+-----------+------------------+------------------+------------------+
|1959|       3821| 8.373949579831933|  97.0478017114193| 58392.87841386555|
|1990|       5251|6.2623486450124926|114.10917144346014| 22517.33365366135|
|1975|       1814|  6.73634859349145| 94.89138576779027| 31819.39492553778|
|1977|       2565| 6.117738791423002| 102.2224987555998|16874.275633528265|
|1924|         55|6.2727272727272725|              null|2153.2727272727275|
|2003|      24852| 6.048133555384053| 97.75456814777833|31343.479467078272|
|1974|       3227| 6.621141253507951| 98.97070161912104|8180.8316183348925|
|1927|        393| 7.977099236641221| 99.30348258706468| 47964.53944020356|
|1955|        920| 7.067391304347826|105.79371316306484| 13086.15652173913|
|1978|       2800| 6.357857142857143| 91.85831533477322| 36694.08285714286|
|1925|      

In [331]:
# Mean value of each group
Final_df.groupby('country').agg({'rating': 'mean','vote':'mean','runtime':'mean','country':'count'}).show()

+--------------+--------------+------------------+------------------+------------------+
|       country|count(country)|       avg(rating)|      avg(runtime)|         avg(vote)|
+--------------+--------------+------------------+------------------+------------------+
|        Russia|            14|               6.5|              88.0| 5943.071428571428|
|        Sweden|            59| 6.620689655172414| 97.54347826086956|16884.431034482757|
|   Philippines|            17|              4.75| 95.66666666666667|         1110.3125|
|       Germany|           290| 6.166089965397924|100.66666666666667| 37413.52249134948|
|        France|           613| 6.396072013093289|105.82494969818913|23750.504091653027|
|        Greece|             6| 6.333333333333333| 78.66666666666667|3188.1666666666665|
|             0|        310213| 6.169680413944983|100.13187898963974|31820.346613959926|
|        Taiwan|            52|5.9423076923076925|100.88372093023256| 9099.942307692309|
|      Slovakia|     

In [332]:
# Mean value of each group
Final_df.groupby('language').agg({'rating': 'mean','vote':'mean','runtime':'mean','language':'count'}).show()

+---------+------------------+------------------+---------------+------------------+
| language|       avg(rating)|      avg(runtime)|count(language)|         avg(vote)|
+---------+------------------+------------------+---------------+------------------+
|   Bangla| 7.571428571428571|              89.8|              7|  9944.42857142857|
|     Urdu| 6.833333333333333|              93.0|             18|17898.277777777777|
|Aidoukrou|               5.5|             127.0|              2|             208.5|
|  Turkish| 6.647058823529412| 88.93333333333334|             17|38328.294117647056|
|     None| 6.793103448275862| 91.10714285714286|             34|14955.724137931034|
|    Latin| 6.653846153846154| 88.49090909090908|             78| 93453.03846153847|
|  Persian|              6.75|              80.8|             17|        26113.8125|
|  Aramaic|               7.5|              93.5|              2|          124840.5|
|     Thai|5.9655172413793105|112.73684210526316|             29|

In [334]:
# Mean value of each group
Final_df.groupby('cast').agg({'rating': 'mean','vote':'mean','runtime':'mean','cast':'count'}).show()

+-------------------+-----------+-----------------+------------------+------------------+
|               cast|count(cast)|      avg(rating)|      avg(runtime)|         avg(vote)|
+-------------------+-----------+-----------------+------------------+------------------+
|      Eugene Burger|          1|              8.0|              60.0|              19.0|
|   Patrick Robinson|          1|              4.0|              51.0|            6151.0|
|    Stafford Morgan|          3|              6.0|              90.5|119372.66666666667|
|    Hanja Kochansky|          1|              6.0|              null|            3188.0|
|       Oliver Platt|          7|6.428571428571429|128.57142857142858|26637.714285714286|
|   Nastassja Kinski|         12|              5.5| 81.83333333333333|            3149.5|
|      Oscar Sanchez|          1|              6.0|              60.0|           45395.0|
|   Derrick O'Connor|          6|              6.5|              62.0|20994.833333333332|
|  Richard

In [335]:
# Mean value of each group
Final_df.groupby('director').agg({'rating': 'mean','vote':'mean','runtime':'mean','director':'count'}).show()

+--------------------+---------------+-----------------+------------------+------------------+
|            director|count(director)|      avg(rating)|      avg(runtime)|         avg(vote)|
+--------------------+---------------+-----------------+------------------+------------------+
|          Rob Bowman|              1|              7.0|              40.0|          103467.0|
|         John Milius|              2|              6.0|             119.0|            5874.0|
|  Aleksandr Proshkin|              1|              7.0|             146.0|            2119.0|
|       Anne Goursaud|              2|              3.5|              48.5|            5107.0|
|   Wash Westmoreland|              1|              5.0|             111.0|            2929.0|
|      Nicholas Meyer|              4|             6.75|109.33333333333333|          26681.25|
|           Chi Chang|              2|              5.0|             120.0|             210.5|
|     Hussein Erkenov|              1|            

In [336]:
# Mean value of each group
Final_df.groupby('composer').agg({'rating': 'mean','vote':'mean','runtime':'mean','composer':'count'}).show()

+----------------+---------------+-----------------+------------------+-----------------+
|        composer|count(composer)|      avg(rating)|      avg(runtime)|        avg(vote)|
+----------------+---------------+-----------------+------------------+-----------------+
|    Chris Thomas|              1|              6.0|              null|            325.0|
|     Brian Tyler|              3|4.666666666666667|             101.5|           3332.0|
|      Romeo Díaz|              6|              6.5|              79.2|           6190.0|
|        John Nau|              2|              4.0|             105.5|           3769.0|
|   James Bernard|             14|6.142857142857143| 84.58333333333333|           6451.5|
|     Mark Thomas|              3|              5.0|              99.0|5118.666666666667|
|   Bill Weisbach|             11|7.090909090909091|              85.5|59.54545454545455|
|     Jack Beaver|              1|              7.0|              null|          54484.0|
|   Johnny

In [337]:
# Mean value of each group
Final_df.groupby('writer').agg({'rating': 'mean','vote':'mean','runtime':'mean','writer':'count'}).show()

+--------------------+-----------------+------------------+-------------+------------------+
|              writer|      avg(rating)|      avg(runtime)|count(writer)|         avg(vote)|
+--------------------+-----------------+------------------+-------------+------------------+
|      Gail Willumsen|              7.0|             105.5|            2|              96.0|
|      Bennett Tramer|4.333333333333333|             186.0|            3|            2232.0|
|      Pamela Wallace|              7.0|              81.5|            2|           47334.5|
|         Helen Craig|              5.5|             106.0|            2|             213.0|
|         Keith Lango|              6.0|              94.0|            2|             173.0|
|         John Milius|7.333333333333333|              30.0|            3|          231043.0|
|           Ken Burns|              8.5|              93.5|            2|            2603.0|
|    Joseph W. Savino|              5.0|              25.0|           

In [339]:
import pyspark.sql.functions as F
from pyspark.sql.functions import desc

Final_df.groupBy('title').count().sort(desc('count')).show()

+--------------------+-----+
|               title|count|
+--------------------+-----+
|The Ten Commandments|  384|
| A Night to Remember|  372|
|           Malcolm X|  364|
|The Greatest Show...|  270|
|    Any Given Sunday|  257|
|The Kentucky Frie...|  248|
|The Watermelon Woman|  247|
|            Zatoichi|  233|
|Star Wars: Episod...|  228|
|Mr. Smith Goes to...|  226|
|             Traffic|  219|
|    The Razor's Edge|  208|
|         Russian Ark|  207|
|            Trekkies|  204|
|             Kingpin|  196|
|Born on the Fourt...|  194|
|    Dawn of the Dead|  193|
|    The Replacements|  187|
|100 Years of the ...|  185|
|   Gods and Generals|  183|
+--------------------+-----+
only showing top 20 rows



In [340]:
Final_df.groupBy('year').count().sort(desc('count')).show()

+----+-----+
|year|count|
+----+-----+
|2003|24255|
|2002|23356|
|2001|22628|
|2004|22434|
|2000|20130|
|1999|17933|
|1998|14688|
|1997|12465|
|1996|11467|
|2005|10070|
|1995| 9173|
|1994| 8182|
|1993| 7648|
|1992| 6247|
|1988| 5513|
|1991| 5325|
|1989| 4983|
|1990| 4942|
|1987| 4709|
|1985| 4407|
+----+-----+
only showing top 20 rows



In [341]:
Final_df.groupBy('genre').count().sort(desc('count')).show()

+-----------+------+
|      genre| count|
+-----------+------+
|          0|299426|
|      Drama|  3467|
|     Comedy|  2302|
|    Romance|  1449|
|     Action|  1365|
|Documentary|  1350|
|   Thriller|  1293|
|      Crime|  1008|
|  Adventure|   994|
|      Music|   864|
|     Sci-Fi|   845|
|  Animation|   800|
|     Family|   779|
|     Horror|   760|
|    Fantasy|   721|
|    Mystery|   634|
|  Biography|   448|
|    History|   417|
|        War|   337|
|    Musical|   316|
+-----------+------+
only showing top 20 rows



In [342]:
Final_df.groupBy('country').count().sort(desc('count')).show()

+--------------+------+
|       country| count|
+--------------+------+
|             0|310213|
| United States|  4473|
|United Kingdom|  1286|
|         Japan|   659|
|        France|   613|
|        Canada|   521|
|     Hong Kong|   294|
|       Germany|   290|
|         Italy|   275|
|         India|   254|
|     Australia|   141|
|         Spain|   140|
|  West Germany|    95|
|        Mexico|    64|
|        Sweden|    59|
|         China|    58|
|   Netherlands|    56|
|       Ireland|    55|
|        Taiwan|    52|
|   South Korea|    51|
+--------------+------+
only showing top 20 rows



In [343]:
Final_df.groupBy('language').count().sort(desc('count')).show()

+----------+------+
|  language| count|
+----------+------+
|         0|309879|
|   English|  6255|
|    French|   666|
|  Japanese|   622|
|   Spanish|   375|
|    German|   331|
|   Italian|   310|
| Cantonese|   265|
|     Hindi|   248|
|  Mandarin|   181|
|   Russian|   134|
|     Latin|    78|
|    Arabic|    61|
|    Korean|    53|
|Portuguese|    53|
|    Hebrew|    52|
|   Swedish|    49|
|    Polish|    38|
|      None|    34|
|    Danish|    30|
+----------+------+
only showing top 20 rows



In [344]:
Final_df.groupBy('director').count().sort(desc('count')).show()

+--------------------+------+
|            director| count|
+--------------------+------+
|                   0|312378|
|          Kevin Dunn|    23|
|      Akira Kurosawa|    18|
|        David Mallet|    18|
|           Jing Wong|    17|
|        Jim Wynorski|    16|
|Beth McCarthy-Miller|    16|
|    Federico Fellini|    15|
|           Tom Clegg|    15|
|       Takashi Miike|    14|
|          Johnnie To|    14|
|       Werner Herzog|    14|
|    Alfred Hitchcock|    14|
|     Lawrence Jordan|    13|
|          Andrew Lau|    13|
|        Ishirô Honda|    13|
|         Dave Wilson|    13|
|   Sammo Kam-Bo Hung|    12|
|   François Truffaut|    12|
|Rainer Werner Fas...|    12|
+--------------------+------+
only showing top 20 rows



In [345]:
Final_df.groupBy('writer').count().sort(desc('count')).show()

+------------------+------+
|            writer| count|
+------------------+------+
|                 0|306863|
|         Jing Wong|    29|
|   Caroline Graham|    27|
|     Sydney Newman|    27|
|      Joel Hodgson|    25|
|      Stephen King|    23|
|    Akira Toriyama|    21|
| Michael J. Nelson|    21|
|Arthur Conan Doyle|    19|
|   Agatha Christie|    17|
|        Tony Geiss|    16|
|    Akira Kurosawa|    16|
|    Trace Beaulieu|    16|
|      Takao Koyama|    15|
|     Werner Herzog|    14|
|  Federico Fellini|    14|
|     Yôsuke Kuroda|    13|
|  Elizabeth George|    13|
|       Kazuo Koike|    13|
|  Bernard Cornwell|    13|
+------------------+------+
only showing top 20 rows



In [347]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
windowDept = Window.partitionBy("genre").orderBy(col("rating").desc())
Final_df.select('genre','rating').withColumn("row",row_number().over(windowDept)) \
  .filter(col("row") <= 2) \
  .show()

+-----------+------+---+
|      genre|rating|row|
+-----------+------+---+
|          0|     9|  1|
|          0|     9|  2|
|     Action|     9|  1|
|     Action|     9|  2|
|  Adventure|     9|  1|
|  Adventure|     9|  2|
|  Animation|     9|  1|
|  Animation|     9|  2|
|  Biography|     9|  1|
|  Biography|     8|  2|
|     Comedy|     9|  1|
|     Comedy|     9|  2|
|      Crime|     9|  1|
|      Crime|     9|  2|
|Documentary|     9|  1|
|Documentary|     9|  2|
|      Drama|     9|  1|
|      Drama|     9|  2|
|     Family|     9|  1|
|     Family|     9|  2|
+-----------+------+---+
only showing top 20 rows



In [348]:
# Max value of each group
Final_df.groupby('title').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+--------------------+---------------+-----------+------------+---------+
|               title|count(composer)|max(rating)|max(runtime)|max(vote)|
+--------------------+---------------+-----------+------------+---------+
|         Deep Rising|             35|          6|          60|    35393|
|             Stardom|            128|          5|          91|     1717|
|  A Woman Is a Woman|             24|          7|         106|    16402|
|The Beatles Antho...|             27|          9|        null|     8567|
|           Cybermutt|             24|          3|         111|      197|
|Bulldog Drummond'...|             29|          6|          95|      534|
|The Real World Yo...|             12|          4|          22|       76|
|In Search of Dr. ...|             64|          7|         100|      503|
|            Dogville|             58|          8|          97|   143450|
|            On Guard|             79|          7|          81|     3455|
|        Camille 2000|             22|

In [349]:
# Max value of each group
Final_df.groupby('year').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+----+---------------+-----------+------------+---------+
|year|count(composer)|max(rating)|max(runtime)|max(vote)|
+----+---------------+-----------+------------+---------+
|1959|           1349|          9|         246|   254334|
|1990|           4942|          9|         701|   347102|
|1975|           1803|          8|         181|   517321|
|1977|           2558|          8|         195|   191830|
|1924|             55|          7|        null|     2504|
|2003|          24255|          9|        1335|  1708569|
|1974|           2578|          8|         240|    23682|
|1927|            200|          8|         105|   166632|
|1955|            913|          8|         178|    63838|
|1978|           2748|          8|         171|   253148|
|1925|            106|          8|         241|    55411|
|1961|            748|          8|         192|   117088|
|1942|            518|          8|         128|    33657|
|1944|            418|          8|         137|    44752|
|1939|        

In [350]:
# Max value of each group
Final_df.groupby('genre').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+-----------+---------------+-----------+------------+---------+
|      genre|count(composer)|max(rating)|max(runtime)|max(vote)|
+-----------+---------------+-----------+------------+---------+
|      Crime|           1008|          9|        1380|  1907147|
|    Romance|           1449|          9|         701|  1091624|
|   Thriller|           1293|          9|         960|  1080665|
|  Adventure|            994|          9|         780|  1708569|
|          0|         299426|          9|        1620|  2461873|
|      Drama|           3467|          9|        1380|  2461873|
|        War|            337|          9|         545|  1291015|
|Documentary|           1350|          9|        1620|   142175|
| Reality-TV|             36|          9|         188|    21330|
|     Family|            779|          9|         883|   982922|
|    Fantasy|            721|          9|         883|  1708569|
|  Game-Show|              8|          7|         100|    21330|
|    History|            

In [351]:
# Max value of each group
Final_df.groupby('country').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+--------------+---------------+-----------+------------+---------+
|       country|count(composer)|max(rating)|max(runtime)|max(vote)|
+--------------+---------------+-----------+------------+---------+
|        Russia|             14|          7|         107|    19878|
|        Sweden|             59|          8|         360|   175207|
|   Philippines|             17|          7|         180|     4710|
|       Germany|            290|          9|         778|   724608|
|        France|            613|          9|        1620|  1080665|
|        Greece|              6|          7|         113|     6611|
|             0|         310213|          9|        1620|  2461873|
|        Taiwan|             52|          8|         180|   260618|
|      Slovakia|              2|          7|          94|     3484|
|     Argentina|             30|          7|         195|    99100|
|       Belgium|             41|          7|         180|    75817|
| United States|           4473|          9|    

In [352]:
# Max value of each group
Final_df.groupby('language').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+---------+---------------+-----------+------------+---------+
| language|count(composer)|max(rating)|max(runtime)|max(vote)|
+---------+---------------+-----------+------------+---------+
|   Bangla|              7|          8|         120|    28376|
|     Urdu|             18|          8|         160|   128128|
|Aidoukrou|              2|          7|         149|      341|
|  Turkish|             17|          8|         143|   279303|
|     None|             34|          8|         241|   166632|
|    Latin|             78|          9|         185|  1702698|
|  Persian|             17|          8|         118|   165876|
|  Aramaic|              2|          8|         100|   222681|
|     Thai|             29|          8|         375|   320553|
|Icelandic|              5|          7|         240|     9537|
|        0|         309879|          9|        1620|  2461873|
|   Quenya|              1|          8|         105|  1708569|
|  Klingon|              5|          8|         115|   

In [353]:
# Max value of each group
Final_df.groupby('director').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+--------------------+---------------+-----------+------------+---------+
|            director|count(composer)|max(rating)|max(runtime)|max(vote)|
+--------------------+---------------+-----------+------------+---------+
|          Rob Bowman|              1|          7|          40|   103467|
|         John Milius|              2|          7|         119|     6782|
|  Aleksandr Proshkin|              1|          7|         146|     2119|
|       Anne Goursaud|              2|          4|          91|     5772|
|   Wash Westmoreland|              1|          5|         111|     2929|
|      Nicholas Meyer|              4|          7|         145|    72988|
|           Chi Chang|              2|          5|         120|      254|
|     Hussein Erkenov|              1|          5|          91|      413|
|      Martin Howells|              1|          7|        null|       88|
|       Martyn Atkins|              6|          8|         156|      302|
|       Denis Mueller|              1|

In [354]:
# Max value of each group
Final_df.groupby('writer').agg({'rating': 'max','vote':'max','runtime':'max','composer':'count'}).show()

+--------------------+---------------+-----------+------------+---------+
|              writer|count(composer)|max(rating)|max(runtime)|max(vote)|
+--------------------+---------------+-----------+------------+---------+
|      Gail Willumsen|              2|          7|         112|      138|
|      Bennett Tramer|              3|          5|         265|     3675|
|      Pamela Wallace|              2|          7|          98|    90461|
|         Helen Craig|              2|          6|         123|      416|
|         Keith Lango|              2|          6|          94|      173|
|         John Milius|              3|          8|          30|   629404|
|           Ken Burns|              2|          9|          96|     4028|
|    Joseph W. Savino|              1|          5|          25|     1612|
|      Nicholas Meyer|              3|          7|         150|   124316|
|        Ray Santilli|              2|          7|         102|       65|
|        Charles Dyer|              1|