# Phase 3

### Importing the required libraries

In [1]:
!pip install pyspark



In [2]:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
import time

## Loading with PySpark

In [3]:
start_time_pyspark = time.time()

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("netflix_titles.csv", header=True)

end_time_pyspark = time.time()

time_difference_pyspark = end_time_pyspark - start_time_pyspark

pandas_df = df.toPandas()

spark.stop()

In [4]:
pandas_df.head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."
1,s2,TV Show,Blood & Water,,"Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...",South Africa,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, TV Dramas, TV Mysteries","After crossing paths at a party, a Cape Town t..."
2,s3,TV Show,Ganglands,Julien Leclercq,"Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...",,"September 24, 2021",2021,TV-MA,1 Season,"Crime TV Shows, International TV Shows, TV Act...",To protect his family from a powerful drug lor...
3,s4,TV Show,Jailbirds New Orleans,,,,"September 24, 2021",2021,TV-MA,1 Season,"Docuseries, Reality TV","Feuds, flirtations and toilet talk go down amo..."
4,s5,TV Show,Kota Factory,,"Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...",India,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, Romantic TV Shows, TV ...",In a city of coaching centers known to train I...


## Loading data using Pandas


In [12]:
start_time = time.time()
test_df_load = pd.read_csv('netflix_titles.csv')
end_time = time.time()
time_difference_pandas = end_time - start_time

## Time difference of loading our dataset with PySpark and Pandas

In [13]:
print("Time for data loading using pandas: ", time_difference_pandas)
print("Time for data loading using pyspark: ", time_difference_pyspark)
print( "Time difference between pyspark and pandas dataframe: ", time_difference_pyspark - time_difference_pandas)

Time for data loading using pandas:  0.13768243789672852
Time for data loading using pyspark:  29.427265644073486
Time difference between pyspark and pandas dataframe:  29.289583206176758


### **Word count using Pandas for country column**

In [7]:
start_time_wordcount_panda = time.time()

combined_text = ' '.join(test_df_load['country'].astype(str).tolist())
combined_text = combined_text.replace(",", " ")
combined_text = combined_text.replace("United States", "UnitedStates")
combined_text = combined_text.replace("United Kingdom", "UnitedKingdom")
combined_text = combined_text.replace("United Arab Emirates", "UnitedArabEmirates")
combined_text = combined_text.replace("Hong Kong", "HongKong")
combined_text = combined_text.replace("New Zealand", "NewZealand")
combined_text = combined_text.replace("Saudi Arabia", "SaudiArabia")
combined_text = combined_text.replace("Czech Republic", "CzechRepublic")
combined_text = combined_text.replace("South Korea", "SouthKorea")
combined_text = combined_text.replace("West Germany", "WestGermany")
combined_text = combined_text.replace("East Germany", "EastGermany")
combined_text = combined_text.replace("Cayman Islands", "CaymanIslands")
combined_text = combined_text.replace("South Africa", "SouthAfrica")
combined_text = combined_text.replace("Soviet Union", "SovietUnion")
combined_text = combined_text.replace("Sri Lanka", "SriLanka")
combined_text = combined_text.replace("Vatican City", "VaticanCity")
combined_text = combined_text.replace("Dominican Republic", "DominicanRepublic")

word_counts = pd.Series(combined_text.split()).value_counts()

end_time_wordcount_panda = time.time()

time_difference_wordcount_panda = end_time_wordcount_panda - start_time_wordcount_panda

In [8]:
word_counts

UnitedStates     3690
India            1046
nan               831
UnitedKingdom     806
Canada            445
                 ... 
Bermuda             1
Ecuador             1
Armenia             1
Mongolia            1
Montenegro          1
Length: 125, dtype: int64

### Word count using SparkContext for country column

In [9]:
scnew = SparkContext("local", "WordCount")

country_rdd = scnew.parallelize(pandas_df['country'].astype(str).tolist())

start_time_wordcount_sparkc = time.time()
cleaned_rdd = country_rdd.map(lambda line: line.replace(",", " ")
                                    .replace("United States", "UnitedStates")
                                    .replace("United Kingdom", "UnitedKingdom")
                                    .replace("United Arab Emirates", "UnitedArabEmirates")
                                    .replace("Hong Kong", "HongKong")
                                    .replace("New Zealand", "NewZealand")
                                    .replace("Saudi Arabia", "SaudiArabia")
                                    .replace("Czech Republic", "CzechRepublic")
                                    .replace("South Korea", "SouthKorea")
                                    .replace("West Germany", "WestGermany")
                                    .replace("East Germany", "EastGermany")
                                    .replace("Cayman Islands", "CaymanIslands")
                                    .replace("South Africa", "SouthAfrica")
                                    .replace("Soviet Union", "SovietUnion")
                                    .replace("Sri Lanka", "SriLanka")
                                    .replace("Vatican City", "VaticanCity")
                                    .replace("Dominican Republic", "DominicanRepublic"))


word_counts = cleaned_rdd.flatMap(lambda line: line.split()) \
                         .map(lambda word: (word, 1)) \
                         .reduceByKey(lambda a, b: a + b) \
                         .sortBy(lambda x: x[1], ascending=False)

word_counts = word_counts.collect()
end_time_wordcount_sparkc = time.time()

scnew.stop()

time_difference_wordcount_sparkc = end_time_wordcount_sparkc - start_time_wordcount_sparkc


In [10]:
word_counts

[('UnitedStates', 3676),
 ('India', 1046),
 ('None', 832),
 ('UnitedKingdom', 805),
 ('Canada', 445),
 ('France', 392),
 ('Japan', 318),
 ('SouthKorea', 231),
 ('Spain', 230),
 ('Germany', 224),
 ('Mexico', 169),
 ('China', 162),
 ('Australia', 160),
 ('Egypt', 117),
 ('Turkey', 113),
 ('HongKong', 105),
 ('Nigeria', 101),
 ('Italy', 100),
 ('Brazil', 97),
 ('Argentina', 91),
 ('Belgium', 90),
 ('Indonesia', 90),
 ('Taiwan', 89),
 ('Philippines', 83),
 ('Thailand', 70),
 ('SouthAfrica', 62),
 ('Colombia', 52),
 ('Netherlands', 50),
 ('Denmark', 48),
 ('Ireland', 46),
 ('Sweden', 42),
 ('Singapore', 41),
 ('Poland', 41),
 ('UnitedArabEmirates', 37),
 ('NewZealand', 33),
 ('Lebanon', 31),
 ('Israel', 30),
 ('Norway', 30),
 ('Chile', 29),
 ('Russia', 27),
 ('Malaysia', 26),
 ('Pakistan', 24),
 ('CzechRepublic', 22),
 ('Switzerland', 19),
 ('Romania', 14),
 ('Uruguay', 14),
 ('SaudiArabia', 13),
 ('Austria', 12),
 ('Luxembourg', 12),
 ('Finland', 11),
 ('Greece', 11),
 ('Hungary', 11),
 ('

### Time difference for word count using Pandas vs SparkContext for country column


In [11]:
print("Time for word count for country using pandas: ", time_difference_wordcount_panda)
print("Time for word count country using SparkContext: ", time_difference_wordcount_sparkc)
print( "Time difference between pyspark and pandas dataframe: ", time_difference_wordcount_sparkc - time_difference_wordcount_panda)

Time for word count for country using pandas:  0.011105775833129883
Time for word count country using SparkContext:  3.609203577041626
Time difference between pyspark and pandas dataframe:  3.598097801208496
