In [14]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [16]:
import pyspark.sql.functions

In [17]:
from pyspark import SparkContext as sc

In [18]:
print(sc.version)

<property object at 0x7f5a7437d620>


In [19]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("NetflixEDA").getOrCreate()

In [20]:
spark

Not sure why the appname is set to default, that shouldn't be the case.

In [31]:
import pandas as pd
df1 = pd.read_csv("/content/netflix price in different countries.csv")

In [32]:
df1.head()

Unnamed: 0,Country,Total Library Size,No. of TV Shows,No. of Movies,Cost Per Month - Basic ($),Cost Per Month - Standard ($),Cost Per Month - Premium ($)
0,Argentina,4760,3154,1606,3.74,6.3,9.26
1,Austria,5640,3779,1861,9.03,14.67,20.32
2,Bolivia,4991,3155,1836,7.99,10.99,13.99
3,Bulgaria,6797,4819,1978,9.03,11.29,13.54
4,Chile,4994,3156,1838,7.07,9.91,12.74


In [22]:
PATH = "/content/netflix price in different countries.csv"

In [23]:
data = spark.read.csv(PATH, inferSchema = True, header = True)

In [24]:
data.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Total Library Size: integer (nullable = true)
 |-- No. of TV Shows: integer (nullable = true)
 |-- No. of Movies: integer (nullable = true)
 |-- Cost Per Month - Basic ($): double (nullable = true)
 |-- Cost Per Month - Standard ($): double (nullable = true)
 |-- Cost Per Month - Premium ($): double (nullable = true)



In [25]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

In [33]:
manualSchema = StructType([

    StructField("Country", StringType(), True),
    StructField("Total Library Size", IntegerType(), True),
    StructField("No. of TV Shows", IntegerType(), True),
    StructField("No. of Movies", IntegerType(), True),
    StructField("Cost Per Month - Basic ($)", FloatType(), True),
    StructField("Cost Per Month - Standard ($)", FloatType(), True),
    StructField("Cost Per Month - Premium ($)", FloatType(), True),
])

In [34]:
df = spark.read.csv(PATH, schema=manualSchema, header=True)

In [35]:
type(df)

pyspark.sql.dataframe.DataFrame

In [36]:
new_names = [

    'country',
    'totalLibrarySize',
    'numberTVShows',
    'numberMovies',
    'monthlyCostBasic',
    'monthlyCostStandard',
    'monthlyCostPremium'
]

df = df.toDF(*new_names)

In [37]:
df.head(5)

[Row(country='Argentina', totalLibrarySize=4760, numberTVShows=3154, numberMovies=1606, monthlyCostBasic=3.740000009536743, monthlyCostStandard=6.300000190734863, monthlyCostPremium=9.260000228881836),
 Row(country='Austria', totalLibrarySize=5640, numberTVShows=3779, numberMovies=1861, monthlyCostBasic=9.029999732971191, monthlyCostStandard=14.670000076293945, monthlyCostPremium=20.31999969482422),
 Row(country='Bolivia', totalLibrarySize=4991, numberTVShows=3155, numberMovies=1836, monthlyCostBasic=7.989999771118164, monthlyCostStandard=10.989999771118164, monthlyCostPremium=13.989999771118164),
 Row(country='Bulgaria', totalLibrarySize=6797, numberTVShows=4819, numberMovies=1978, monthlyCostBasic=9.029999732971191, monthlyCostStandard=11.289999961853027, monthlyCostPremium=13.539999961853027),
 Row(country='Chile', totalLibrarySize=4994, numberTVShows=3156, numberMovies=1838, monthlyCostBasic=7.070000171661377, monthlyCostStandard=9.90999984741211, monthlyCostPremium=12.739999771118

In [38]:
df.limit(5).toPandas()

Unnamed: 0,country,totalLibrarySize,numberTVShows,numberMovies,monthlyCostBasic,monthlyCostStandard,monthlyCostPremium
0,Argentina,4760,3154,1606,3.74,6.3,9.26
1,Austria,5640,3779,1861,9.03,14.67,20.32
2,Bolivia,4991,3155,1836,7.99,10.99,13.99
3,Bulgaria,6797,4819,1978,9.03,11.29,13.54
4,Chile,4994,3156,1838,7.07,9.91,12.74


In [39]:
df.describe().show()

+-------+---------+-----------------+------------------+------------------+------------------+-------------------+------------------+
|summary|  country| totalLibrarySize|     numberTVShows|      numberMovies|  monthlyCostBasic|monthlyCostStandard|monthlyCostPremium|
+-------+---------+-----------------+------------------+------------------+------------------+-------------------+------------------+
|  count|       65|               65|                65|                65|                65|                 65|                65|
|   mean|     null|5314.415384615385|3518.9538461538464|1795.4615384615386|  8.36846140714792|  11.98999995451707|15.612922998575064|
| stddev|     null|980.3226333124293| 723.0105555671636| 327.2797483099835|1.9378186644783795| 2.8639787613790824| 4.040672256237366|
|    min|Argentina|             2274|              1675|               373|              1.97|                3.0|              4.02|
|    max|Venezuela|             7325|              5234|      

In [40]:
df.select( "country").show(df.count())

+--------------+
|       country|
+--------------+
|     Argentina|
|       Austria|
|       Bolivia|
|      Bulgaria|
|         Chile|
|      Colombia|
|    Costa Rica|
|       Croatia|
|       Czechia|
|       Ecuador|
|       Estonia|
|        France|
|       Germany|
|     Gibraltar|
|        Greece|
|     Guatemala|
|      Honduras|
|     Hong Kong|
|       Iceland|
|         India|
|        Israel|
|         Italy|
|         Japan|
| Liechtenstein|
|      Malaysia|
|        Mexico|
|       Moldova|
|        Monaco|
|        Norway|
|      Paraguay|
|          Peru|
|   Philippines|
|        Poland|
|       Romania|
|        Russia|
|    San Marino|
|      Slovakia|
|  South Africa|
|   South Korea|
|        Taiwan|
|      Thailand|
|        Turkey|
|       Ukraine|
|       Uruguay|
|     Venezuela|
|       Belgium|
|     Singapore|
|       Finland|
|        Latvia|
|   New Zealand|
|       Hungary|
|      Portugal|
|   Netherlands|
|        Sweden|
|        Canada|
|     Lithuani

In [41]:
df.count()

65

In [42]:
import geopandas as gpd
import matplotlib.pyplot as plt
import pycountry

ModuleNotFoundError: ignored

In [43]:
!pip install pycountry

Collecting pycountry
  Downloading pycountry-22.3.5.tar.gz (10.1 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/10.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/10.1 MB[0m [31m35.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━[0m [32m8.3/10.1 MB[0m [31m120.9 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m10.1/10.1 MB[0m [31m131.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m88.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: pycountry
  Building wheel for pycountry (pyproject.toml) ... [?25l[?2

In [44]:
import pycountry

In [45]:
countries = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
countries.head()

  countries = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))


Unnamed: 0,pop_est,continent,name,iso_a3,gdp_md_est,geometry
0,889953.0,Oceania,Fiji,FJI,5496,"MULTIPOLYGON (((180.00000 -16.06713, 180.00000..."
1,58005463.0,Africa,Tanzania,TZA,63177,"POLYGON ((33.90371 -0.95000, 34.07262 -1.05982..."
2,603253.0,Africa,W. Sahara,ESH,907,"POLYGON ((-8.66559 27.65643, -8.66512 27.58948..."
3,37589262.0,North America,Canada,CAN,1736425,"MULTIPOLYGON (((-122.84000 49.00000, -122.9742..."
4,328239523.0,North America,United States of America,USA,21433226,"MULTIPOLYGON (((-122.84000 49.00000, -120.0000..."


In [46]:
countries_data = df.select( "country").toPandas()
countries_data.head()

Unnamed: 0,country
0,Argentina
1,Austria
2,Bolivia
3,Bulgaria
4,Chile


In [47]:
print(countries_data.shape)
countries_data.head()

(65, 1)


Unnamed: 0,country
0,Argentina
1,Austria
2,Bolivia
3,Bulgaria
4,Chile


In [51]:
print(df.toPandas())

          country  totalLibrarySize  numberTVShows  numberMovies  \
0       Argentina              4760           3154          1606   
1         Austria              5640           3779          1861   
2         Bolivia              4991           3155          1836   
3        Bulgaria              6797           4819          1978   
4           Chile              4994           3156          1838   
..            ...               ...            ...           ...   
60        Ireland              6486           4515          1971   
61    Switzerland              5506           3654          1852   
62      Australia              6114           4050          2064   
63        Denmark              4558           2978          1580   
64  United States              5818           3826          1992   

    monthlyCostBasic  monthlyCostStandard  monthlyCostPremium  
0               3.74             6.300000            9.260000  
1               9.03            14.670000           20.

In [53]:
1from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-------+----------------+-------------+------------+----------------+-------------------+------------------+
|country|totalLibrarySize|numberTVShows|numberMovies|monthlyCostBasic|monthlyCostStandard|monthlyCostPremium|
+-------+----------------+-------------+------------+----------------+-------------------+------------------+
|      0|               0|            0|           0|               0|                  0|                 0|
+-------+----------------+-------------+------------+----------------+-------------------+------------------+



In [54]:
no_str_df = df.drop( "country")

In [57]:
!pip install pandas_profiling

Collecting pandas_profiling
  Downloading pandas_profiling-3.6.6-py2.py3-none-any.whl (324 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/324.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m324.4/324.4 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ydata-profiling (from pandas_profiling)
  Downloading ydata_profiling-4.3.1-py2.py3-none-any.whl (352 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m353.0/353.0 kB[0m [31m32.1 MB/s[0m eta [36m0:00:00[0m
Collecting visions[type_image_path]==0.7.5 (from ydata-profiling->pandas_profiling)
  Downloading visions-0.7.5-py3-none-any.whl (102 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m102.7/102.7 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
Collecting htmlmin==0.1.12 (from ydata-profiling->pandas_profiling)
  Downloading htmlmin-0.1.12.tar.gz (19 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
C

In [None]:
from pandas_profiling import ProfileReport

profile = ProfileReport(no_str_df.toPandas(), title="Report", explorative=True)
profile.to_widgets()

  from pandas_profiling import ProfileReport


Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render widgets:   0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
from ydata_profiling import ProfileReport

profile = ProfileReport(no_str_df.toPandas(), title="Report", explorative=True)
profile.to_widgets()



Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render widgets:   0%|          | 0/1 [00:00<?, ?it/s]

#Pearson's Correlation

The Pearson correlation coefficient is calculated as follows:

rp=COV(X,Y)/(std(X)∗std(Y))

#where COV is the covariance between the 2 variables and std the standard deviation.

#PySpark ML

In [61]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

vector_col = "corr_vars"
assembler = VectorAssembler(inputCols=no_str_df.columns, outputCol=vector_col)
df_vector = assembler.transform(no_str_df).select(vector_col)
corr_matrix = Correlation.corr(df_vector, vector_col).collect()[0][0].toArray().tolist()

In [62]:
corr_matrix_df = spark.createDataFrame(corr_matrix, no_str_df.columns)
corr_matrix_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    totalLibrarySize|       numberTVShows|        numberMovies|    monthlyCostBasic| monthlyCostStandard|  monthlyCostPremium|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 1.0|  0.9711459692193751|   0.849957408017941|-0.06287684999485418|-0.05519986931782358|-0.07152105693862007|
|  0.9711459692193751|                 1.0|  0.6997860988691629|-0.00381400892363...|0.002284882447594...|-0.01109980833468...|
|   0.849957408017941|  0.6997860988691629|                 1.0|-0.17991345555167165|-0.17039146376748404|-0.18971058430966609|
|-0.06287684999485418|-0.00381400892363...|-0.17991345555167165|                 1.0|  0.8985348251609334|  0.8209140310212802|
|-0.05519986931782358|0.002284882447594...|-0.17039146376748404|  0.8985348251609334|                 1.

In [63]:
plot_corr = corr_matrix_df.toPandas()
plot_corr.index = corr_matrix_df.columns
plot_corr.style.background_gradient(cmap='Blues')

Unnamed: 0,totalLibrarySize,numberTVShows,numberMovies,monthlyCostBasic,monthlyCostStandard,monthlyCostPremium
totalLibrarySize,1.0,0.971146,0.849957,-0.062877,-0.0552,-0.071521
numberTVShows,0.971146,1.0,0.699786,-0.003814,0.002285,-0.0111
numberMovies,0.849957,0.699786,1.0,-0.179913,-0.170391,-0.189711
monthlyCostBasic,-0.062877,-0.003814,-0.179913,1.0,0.898535,0.820914
monthlyCostStandard,-0.0552,0.002285,-0.170391,0.898535,1.0,0.978768
monthlyCostPremium,-0.071521,-0.0111,-0.189711,0.820914,0.978768,1.0


 If we needed to train a statiscal model using this features, this would be a problem as some algorithms like linear regression suffer from multicolinearity. We could either drop the totalLibrarySize variable or, alternatively, drop numberTVShows and numberMovies

In [64]:
from pyspark.sql.functions import avg

In [65]:
cols = [col('monthlyCostBasic'), col('monthlyCostStandard'), col('monthlyCostPremium')]
avgColsFunc = sum(x for x in cols) / len(cols)

In [66]:
avgColsFunc

Column<'((((monthlyCostBasic + 0) + monthlyCostStandard) + monthlyCostPremium) / 3)'>

In [67]:
dfWithAvg = df.withColumn('AverageSubsCost', avgColsFunc)
dfWithAvg.show(5)

+---------+----------------+-------------+------------+----------------+-------------------+------------------+------------------+
|  country|totalLibrarySize|numberTVShows|numberMovies|monthlyCostBasic|monthlyCostStandard|monthlyCostPremium|   AverageSubsCost|
+---------+----------------+-------------+------------+----------------+-------------------+------------------+------------------+
|Argentina|            4760|         3154|        1606|            3.74|                6.3|              9.26| 6.433333079020183|
|  Austria|            5640|         3779|        1861|            9.03|              14.67|             20.32|14.673333485921225|
|  Bolivia|            4991|         3155|        1836|            7.99|              10.99|             13.99|10.990000406901041|
| Bulgaria|            6797|         4819|        1978|            9.03|              11.29|             13.54|11.286666870117188|
|    Chile|            4994|         3156|        1838|            7.07|           

In [68]:
dfWithAvg.select("country", "AverageSubsCost").orderBy(col("AverageSubsCost").desc()).limit(10).show()

+-------------+------------------+
|      country|   AverageSubsCost|
+-------------+------------------+
|Liechtenstein|20.099999745686848|
|  Switzerland|20.099999745686848|
|      Denmark|15.546666463216146|
|       France|15.240000406901041|
|      Belgium|15.240000406901041|
|       Israel|15.050000508626303|
|       Sweden|14.933333079020182|
|      Austria|14.673333485921225|
|   San Marino|14.673333485921225|
|    Gibraltar|14.673333485921225|
+-------------+------------------+



In [69]:
dfWithAvgRel = dfWithAvg.withColumn('AverageSubsCostPerTitle', dfWithAvg.AverageSubsCost / dfWithAvg.totalLibrarySize)
dfWithAvgRel.show(5)

+---------+----------------+-------------+------------+----------------+-------------------+------------------+------------------+-----------------------+
|  country|totalLibrarySize|numberTVShows|numberMovies|monthlyCostBasic|monthlyCostStandard|monthlyCostPremium|   AverageSubsCost|AverageSubsCostPerTitle|
+---------+----------------+-------------+------------+----------------+-------------------+------------------+------------------+-----------------------+
|Argentina|            4760|         3154|        1606|            3.74|                6.3|              9.26| 6.433333079020183|   0.001351540562819366|
|  Austria|            5640|         3779|        1861|            9.03|              14.67|             20.32|14.673333485921225|   0.002601654873390288|
|  Bolivia|            4991|         3155|        1836|            7.99|              10.99|             13.99|10.990000406901041|   0.002201963615888808|
| Bulgaria|            6797|         4819|        1978|            9.0

In [70]:
# Top 10 countries with most expensive subscriptions in relative terms
dfWithAvgRel.select("country", "AverageSubsCostPerTitle") \
    .orderBy(col("AverageSubsCostPerTitle").desc()).limit(10).show()

+-------------+-----------------------+
|      country|AverageSubsCostPerTitle|
+-------------+-----------------------+
|Liechtenstein|   0.006594488105540305|
|   San Marino|   0.006352092418147716|
|      Croatia|   0.004963353944642562|
|  Switzerland|   0.003650562975969...|
|       Sweden|   0.003424291006425...|
|      Denmark|   0.003410852668542...|
|      Finland|   0.003348166234315...|
|      Belgium|   0.003054108297976...|
|       Norway|   0.002934334556121287|
|      Moldova|   0.002866819118648003|
+-------------+-----------------------+



In [71]:
# Top 10 countries with less expensive subscriptions in absolute terms
dfWithAvg.select("country", "AverageSubsCost") \
    .orderBy(col("AverageSubsCost").asc()).limit(10).show()

+------------+-----------------+
|     country|  AverageSubsCost|
+------------+-----------------+
|      Turkey|2.996666590372721|
|       India|5.950000127156575|
|   Argentina|6.433333079020183|
|    Colombia|7.033333460489909|
|      Brazil|7.226666768391927|
|     Ukraine|8.463333129882812|
|        Peru| 8.56000010172526|
| Philippines|9.140000025431315|
|South Africa|9.630000432332357|
|       Chile| 9.90666643778483|
+------------+-----------------+



In [72]:
# Top 10 countries with most expensive subscriptions in relative terms
dfWithAvgRel.select("country", "AverageSubsCostPerTitle") \
    .orderBy(col("AverageSubsCostPerTitle").asc()).limit(10).show()

+-----------+-----------------------+
|    country|AverageSubsCostPerTitle|
+-----------+-----------------------+
|     Turkey|   6.459725351094463E-4|
|      India|   0.001018312532458767|
|  Argentina|   0.001351540562819366|
|   Colombia|   0.001409203257962314|
|Philippines|   0.001436655143890...|
|     Brazil|   0.001453472801366035|
|    Hungary|   0.001555781574090...|
|    Czechia|   0.001568600738143487|
|    Ukraine|   0.001586081920892581|
|   Slovakia|   0.001604359185517...|
+-----------+-----------------------+

