In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
# Unpack the downloaded Spark distribution into the current working directory.
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark

In [None]:
# List the installed JVMs to find the directory name to use for JAVA_HOME.
!ls /usr/lib/jvm

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [None]:
import os

# Tell the tooling where the Java 8 runtime and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
    }
)

In [None]:
import findspark
# Keep this call ahead of the pyspark import below: presumably findspark uses the
# SPARK_HOME set earlier to make pyspark importable — confirm against the findspark docs.
findspark.init()
from pyspark.sql import SparkSession
# Create the session (or reuse an existing one) on a local master; "local[*]"
# runs Spark in-process with as many worker threads as there are logical cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# NOTE(review): sys and tempfile are not used by the cells visible in this review —
# confirm against the rest of the notebook before removing them.
import sys, tempfile, urllib.request
# NOTE(review): this wildcard import brings every pyspark.sql.functions name into the
# notebook namespace, shadowing Python builtins such as max, min, sum, abs and round.
# Prefer `import pyspark.sql.functions as F` (already used further down) once all
# bare uses such as col(...) have been migrated.
from pyspark.sql.functions import *

In [None]:
# Scratch directory for downloads, and the local CSV paths derived from it.
BASE_DIR = '/tmp'
CORONA_DATA_FILE, TWITTER_DATA_FILE = (
    os.path.join(BASE_DIR, file_name)
    for file_name in ('corona_data.csv', 'twitter_data.csv')
)

In [None]:
# Download both CSVs to their local paths. urlretrieve returns a
# (local_filename, headers) tuple, which is what corona_data / tweet_data hold.
corona_data = urllib.request.urlretrieve(
    url='https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/corona_dataset_latest.csv',
    filename=CORONA_DATA_FILE,
)
tweet_data = urllib.request.urlretrieve(
    url='https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/tweets.csv',
    filename=TWITTER_DATA_FILE,
)

In [None]:
# Sanity check: both downloaded CSV files should now be listed in /tmp.
!ls /tmp

blockmgr-0564c31a-3dda-485b-ad3b-6497aeb1dc5e
corona_data.csv
dap_multiplexer.f4bfde816064.root.log.INFO.20210729-134839.55
dap_multiplexer.INFO
debugger_250i81c8km
hsperfdata_root
initgoogle_syslog_dir.0
spark-041fa921-463a-4a8c-ac15-773e20be038a
spark-aa1a9606-db59-41a0-9cdd-2de0a19a5525
twitter_data.csv


In [None]:
# Load the coronavirus CSV through the CORONA_DATA_FILE constant instead of a repeated
# path literal, so the reader always targets the file the download cell wrote.
# inferSchema asks Spark to derive column types from the data (at the cost of an
# extra pass over the file); header=True takes column names from the first line.
# .csv() can be swapped for another reader depending on the source format
# (e.g. .parquet() for Parquet files), and the path can also be an S3 URL or an
# HDFS location.
corona_df = spark.read.option("inferSchema", "true").csv(CORONA_DATA_FILE, header=True)
corona_df.show()

+---+----------------+--------------------+--------+---------+----------+---------+-----+---------+--------------------+----+
|_c0|           State|             Country|     Lat|     Long|      Date|Confirmed|Death|Recovered|       state_cleaned|City|
+---+----------------+--------------------+--------+---------+----------+---------+-----+---------+--------------------+----+
|  0|            null|            Thailand|    15.0|    101.0|2020-01-22|        2|    0|        0|             Bangkok|null|
|  1|            null|               Japan|    36.0|    138.0|2020-01-22|        2|    0|        0|             Hiraide|null|
|  2|            null|           Singapore|  1.2833| 103.8333|2020-01-22|        0|    0|        0|           Singapore|null|
|  3|            null|               Nepal| 28.1667|    84.25|2020-01-22|        0|    0|        0|           Kathmandu|null|
|  4|            null|            Malaysia|     2.5|    112.5|2020-01-22|        0|    0|        0|             Sarawa

In [None]:
# count() is an action: it runs the job and returns the number of rows.
corona_df.count()

28143

In [None]:
# Load the tweets CSV through the TWITTER_DATA_FILE constant.
#
# Quoted fields in this file escape an embedded quote by doubling it (""), whereas
# Spark's CSV reader expects a backslash escape by default. With the defaults, a
# tweet containing both quotes and commas is split across the wrong columns (the
# row with _c0 == 1 ends up with tweet text in `user` and the user name in
# `entities`). Setting the escape character to '"' parses such rows correctly.
# NOTE(review): if any tweet contains an embedded newline, multiLine=True is
# needed as well — confirm against the data.
#
# Without inferSchema every column is read as a string; with it, Spark derives
# the column types from the data (e.g. _c0 becomes an integer).
twitter_df = (
    spark.read
    .option("inferSchema", "true")
    .option("quote", '"')
    .option("escape", '"')
    .csv(TWITTER_DATA_FILE, header=True)
)
twitter_df.show()

+---+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|_c0| geo|                text|                user|            location|            entities|           sentiment|             country|
+---+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|null|What is God sayin...|          petodinice|               Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|             Nigeria|
|  1|null|"BREAKING: ""this...| but i took the t...|                -… "|     JerryfranksonJF|      Abuja, Nigeria|"[(""Arsenal's Mi...|
|  2|null| #Coronavirus tes...|              cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|                 USA|
|  3|null| Get ready for ma...|        InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|                 USA|
|  4|null| The #coronavirus...|          

In [None]:
# Number of rows in the tweets DataFrame (an action, so it triggers a job).
twitter_df.count()

1000

#### Below we see some transformations. A transformation produces a new DataFrame as its output, because DataFrames are immutable. Transformations are functions that take our raw data and create a new view of it (filtered, sorted, aggregated, and so on). They are evaluated lazily: a transformation only adds a step to a DAG, which tells the SparkContext the plan to execute once an actual ACTION is submitted. ####

In [None]:
# filter()/where() is a lazy transformation; show() is the action that makes it run.
twitter_df.where(col("country") == "USA").show()

+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|            location|            entities|           sentiment|country|
+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|  2|null| #Coronavirus tes...|         cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|    USA|
|  3|null| Get ready for ma...|   InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|    USA|
|  5|null| COVID-19 update ...| StewartNgilana|Durban | Port Eli...|[('Italy', 'GPE')...|{'neg': 0.178, 'n...|    USA|
|  6|null| It’s painful to ...|      BWheatnyc|             Florida|                  []|{'neg': 0.098, 'n...|    USA|
|  8|null| Questions about ...|    straightj23|        Columbus, OH|[('NAfME', 'CARDI...|{'neg': 0.0, 'neu...|    USA|
| 13|null| We’re the heck w...| harrytiffanyiv| 

In [None]:
# US tweets whose location starts with "New", written with column expressions.
twitter_df.where(
    (col("country") == "USA") & col("location").like("New%")
).show()

+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|          location|            entities|           sentiment|country|
+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
| 31|null| I ordered Alex J...|       rcgillan|     New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|    USA|
| 49|null| This week we are...|  JamesWithers3|New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|    USA|
|228|null|This is a very co...|baskingntheGlow|     New York City|[('hourly', 'TIME')]|{'neg': 0.12, 'ne...|    USA|
|238|null|I’m reposting thi...|   Veronicaromm|   New Jersey, USA|[('English', 'LAN...|{'neg': 0.0, 'neu...|    USA|
|261|null|Too early ...?  #...|      HJeppesen|      New York, NY|                ['']|{'neg': 0.0, 'neu...|    USA|
|275|null|  The Republican ...|  GenetBataiile|New Hampshire, US

In [None]:
# Keep a handle on the filtered DataFrame so that it can be reused by later cells.
is_usa = col("country") == "USA"
tw_filter_df = twitter_df.where(is_usa)
tw_filter_df.show()

+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|            location|            entities|           sentiment|country|
+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|  2|null| #Coronavirus tes...|         cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|    USA|
|  3|null| Get ready for ma...|   InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|    USA|
|  5|null| COVID-19 update ...| StewartNgilana|Durban | Port Eli...|[('Italy', 'GPE')...|{'neg': 0.178, 'n...|    USA|
|  6|null| It’s painful to ...|      BWheatnyc|             Florida|                  []|{'neg': 0.098, 'n...|    USA|
|  8|null| Questions about ...|    straightj23|        Columbus, OH|[('NAfME', 'CARDI...|{'neg': 0.0, 'neu...|    USA|
| 13|null| We’re the heck w...| harrytiffanyiv| 

In [None]:
# explain() prints the physical plan Spark would execute for this DataFrame —
# here a Filter on top of the CSV FileScan — without running it.
tw_filter_df.explain()

== Physical Plan ==
*(1) Filter (isnotnull(country#133) AND (country#133 = USA))
+- FileScan csv [_c0#126,geo#127,text#128,user#129,location#130,entities#131,sentiment#132,country#133] Batched: false, DataFilters: [isnotnull(country#133), (country#133 = USA)], Format: CSV, Location: InMemoryFileIndex[file:/tmp/twitter_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,USA)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...




In [None]:
# Only defines the transformation (US tweets whose location starts with "New");
# nothing is executed until an action is called on the result.
tw_filter_usanyc_df = twitter_df.where(
    (col("country") == "USA") & col("location").like("New%")
)

In [None]:
# The action: executes the filter defined above and prints the first rows.
tw_filter_usanyc_df.show()

+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|          location|            entities|           sentiment|country|
+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
| 31|null| I ordered Alex J...|       rcgillan|     New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|    USA|
| 49|null| This week we are...|  JamesWithers3|New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|    USA|
|228|null|This is a very co...|baskingntheGlow|     New York City|[('hourly', 'TIME')]|{'neg': 0.12, 'ne...|    USA|
|238|null|I’m reposting thi...|   Veronicaromm|   New Jersey, USA|[('English', 'LAN...|{'neg': 0.0, 'neu...|    USA|
|261|null|Too early ...?  #...|      HJeppesen|      New York, NY|                ['']|{'neg': 0.0, 'neu...|    USA|
|275|null|  The Republican ...|  GenetBataiile|New Hampshire, US

In [None]:
# Same kind of filter written as a Column predicate: locations beginning with "N".
starts_with_n = twitter_df["location"].startswith("N")
tw_filter_locN_df = twitter_df.where(starts_with_n)
tw_filter_locN_df.show(5)  # limit the printout to 5 rows

+---+----+--------------------+-------------+--------------------+--------------------+--------------------+---------+
|_c0| geo|                text|         user|            location|            entities|           sentiment|  country|
+---+----+--------------------+-------------+--------------------+--------------------+--------------------+---------+
|  3|null| Get ready for ma...| InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|      USA|
|  7|null| 📽️Friends, I wi...|      LorseaR|New South Wales, ...|[('Twitter', 'GPE...|{'neg': 0.123, 'n...|Australia|
| 31|null| I ordered Alex J...|     rcgillan|       New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|      USA|
| 36|null| Before #coronavi...| Frankapistan|       Nashville, TN|[('China', 'GPE')...|{'neg': 0.144, 'n...|      USA|
| 49|null| This week we are...|JamesWithers3|  New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|      USA|
+---+----+--------------------+-------------+----

In [None]:
# Physical plan for the startswith filter; the predicate shows up as StartsWith
# and is also listed under PushedFilters of the CSV scan.
tw_filter_locN_df.explain()

== Physical Plan ==
*(1) Filter (isnotnull(location#130) AND StartsWith(location#130, N))
+- FileScan csv [_c0#126,geo#127,text#128,user#129,location#130,entities#131,sentiment#132,country#133] Batched: false, DataFilters: [isnotnull(location#130), StartsWith(location#130, N)], Format: CSV, Location: InMemoryFileIndex[file:/tmp/twitter_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(location), StringStartsWith(location,N)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...




In [None]:
# first() is an action that returns the first row as a single Row object.
twitter_df.first()

Row(_c0=0, geo=None, text='What is God saying to us about #coronavirus ?', user='petodinice', location='Lagos', entities="[('about #', 'CARDINAL')]", sentiment="{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}", country='Nigeria')

In [None]:
# take(n) is an action that returns the first n rows as a list of Row objects.
twitter_df.take(5)

[Row(_c0=0, geo=None, text='What is God saying to us about #coronavirus ?', user='petodinice', location='Lagos', entities="[('about #', 'CARDINAL')]", sentiment="{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}", country='Nigeria'),
 Row(_c0=1, geo=None, text='"BREAKING: ""this is disappointing', user=' but i took the test"". Arsenal\'s Mikel Arteta tests positive for #coronavirus', location='  -… "', entities='JerryfranksonJF', sentiment='Abuja, Nigeria', country='"[(""Arsenal\'s Mikel Arteta""'),
 Row(_c0=2, geo=None, text=' #Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…', user='cek422', location='Pennsylvania, USA', entities='[]', sentiment="{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}", country='USA'),
 Row(_c0=3, geo=None, text=' Get ready for mass event crowd cancellations across the World starting this weekend: cricket in #Australia in empty st…', user='InfectiousDz', locatio

In [None]:
# flatMap: each element yields an (n, n*n) tuple whose items are flattened into
# one RDD, so three inputs become six values.
x = spark.sparkContext.parallelize([1, 4, 8])
y = x.flatMap(lambda n: (n, n * n))
print(x.collect())
print(y.collect())

[1, 4, 8]
[1, 1, 4, 16, 8, 64]


In [None]:
# map: exactly one output per input, so each (n, n*n) tuple is kept intact.
x = spark.sparkContext.parallelize([1, 4, 8])
y = x.map(lambda n: (n, n * n))
print(x.collect())
print(y.collect())

[1, 4, 8]
[(1, 1), (4, 16), (8, 64)]


In [None]:
# Evaluating the RDD variable only displays its description; no data is collected.
x

ParallelCollectionRDD[71] at readRDDFromFile at PythonRDD.scala:274

In [None]:
twitter_df.select("text").show() # similar to a SQL SELECT of a single column

+--------------------+
|                text|
+--------------------+
|What is God sayin...|
|"BREAKING: ""this...|
| #Coronavirus tes...|
| Get ready for ma...|
| The #coronavirus...|
| COVID-19 update ...|
| It’s painful to ...|
| 📽️Friends, I wi...|
| Questions about ...|
|How they’re deali...|
| BREAKING: Democr...|
| “If we close dow...|
| I pity the poor ...|
| We’re the heck w...|
| I don't think sh...|
| Well written, so...|
| 1/2 CDC Director...|
| In all seriousne...|
| Wash your hands....|
| #CoronaVirusCana...|
+--------------------+
only showing top 20 rows



In [None]:
# select() with several column names, like listing several columns in a SQL SELECT.
twitter_df.select("text","user").show()

+--------------------+--------------------+
|                text|                user|
+--------------------+--------------------+
|What is God sayin...|          petodinice|
|"BREAKING: ""this...| but i took the t...|
| #Coronavirus tes...|              cek422|
| Get ready for ma...|        InfectiousDz|
| The #coronavirus...|          vic_gibson|
| COVID-19 update ...|      StewartNgilana|
| It’s painful to ...|           BWheatnyc|
| 📽️Friends, I wi...|             LorseaR|
| Questions about ...|         straightj23|
|How they’re deali...|       _______coolio|
| BREAKING: Democr...|      champagneaylin|
| “If we close dow...|       YorkLawLondon|
| I pity the poor ...|      BeesonMargaret|
| We’re the heck w...|      harrytiffanyiv|
| I don't think sh...|         grammyheath|
| Well written, so...|      barbara_ellena|
| 1/2 CDC Director...|               fatal|
| In all seriousne...|          pwjkmiller|
| Wash your hands....|        Mrrandy123RP|
| #CoronaVirusCana...|           

In [None]:
# .rdd exposes the underlying RDD API for work that is awkward to express on a DataFrame.
# Each row's 'text' value is split on single spaces, giving one list of words per row;
# take(5) returns those lists for the first 5 rows.
twitter_df.rdd.map(lambda row: row["text"].split(" ")).take(5)

[['What', 'is', 'God', 'saying', 'to', 'us', 'about', '#coronavirus', '?'],
 ['"BREAKING:', '""this', 'is', 'disappointing'],
 ['',
  '#Coronavirus',
  'testing',
  'must',
  'be',
  'made',
  'free',
  'to',
  'the',
  'public',
  'if',
  'we',
  'are',
  'going',
  'to',
  'understand',
  'the',
  'scope',
  'of',
  'this',
  'crisis.',
  'Anything',
  'le…'],
 ['',
  'Get',
  'ready',
  'for',
  'mass',
  'event',
  'crowd',
  'cancellations',
  'across',
  'the',
  'World',
  'starting',
  'this',
  'weekend:',
  'cricket',
  'in',
  '#Australia',
  'in',
  'empty',
  'st…'],
 ['',
  'The',
  '#coronavirus',
  'pandemic',
  'is',
  'revealing',
  'just',
  'how',
  'closely',
  'we',
  'are',
  'all',
  'bound',
  'together...[A',
  'thread]',
  '']]

In [None]:
# With flatMap the per-row word lists are flattened into a single stream of tokens,
# so there is no separate list for each row.
# take(100) therefore returns the first 100 *tokens* (which come from only the first
# few rows), not the tokens of the first 100 rows.
twitter_df.rdd.flatMap(lambda row: row["text"].split(" ")).take(100)

['What',
 'is',
 'God',
 'saying',
 'to',
 'us',
 'about',
 '#coronavirus',
 '?',
 '"BREAKING:',
 '""this',
 'is',
 'disappointing',
 '',
 '#Coronavirus',
 'testing',
 'must',
 'be',
 'made',
 'free',
 'to',
 'the',
 'public',
 'if',
 'we',
 'are',
 'going',
 'to',
 'understand',
 'the',
 'scope',
 'of',
 'this',
 'crisis.',
 'Anything',
 'le…',
 '',
 'Get',
 'ready',
 'for',
 'mass',
 'event',
 'crowd',
 'cancellations',
 'across',
 'the',
 'World',
 'starting',
 'this',
 'weekend:',
 'cricket',
 'in',
 '#Australia',
 'in',
 'empty',
 'st…',
 '',
 'The',
 '#coronavirus',
 'pandemic',
 'is',
 'revealing',
 'just',
 'how',
 'closely',
 'we',
 'are',
 'all',
 'bound',
 'together...[A',
 'thread]',
 '',
 '',
 'COVID-19',
 'update',
 'as',
 'of',
 'this',
 'morning:1.',
 'Death',
 'toll',
 'in',
 'Italy',
 'passes',
 '1,0002.',
 "Arsenal's",
 'head',
 'coach',
 'Arteta',
 'tests',
 'positive3.',
 'US…',
 '',
 'It’s',
 'painful',
 'to',
 'say,',
 'but',
 'as',
 'an']

In [None]:
# No action is called here, so the cell only displays the resulting DataFrame's
# schema; no rows are shown.
corona_df.filter("Country='US'").sort(col("Date"),ascending=False)

DataFrame[_c0: int, State: string, Country: string, Lat: double, Long: double, Date: string, Confirmed: int, Death: int, Recovered: int, state_cleaned: string, City: string]

In [None]:
# US rows, newest date first; show() is the action that executes the plan.
corona_df.where(col("Country") == "US").sort(col("Date").desc()).show()

+-----+----------------+-------+------------------+---------+----------+---------+-----+---------+----------------+----------------+
|  _c0|           State|Country|               Lat|     Long|      Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+-----+----------------+-------+------------------+---------+----------+---------+-----+---------+----------------+----------------+
|27764|      Washington|     US|           47.4009|-121.4905|2020-03-20|     1524|   83|        0|      Washington|      Washington|
|27784|         Arizona|     US|           33.7298|-111.4312|2020-03-20|       78|    0|        0|         Arizona|         Arizona|
|27765|        New York|     US|           42.1657| -74.9481|2020-03-20|     8310|   42|        0|        New York|        New York|
|27766|      California|     US|           36.1162|-119.6816|2020-03-20|     1177|   23|        0|      California|      California|
|27767|   Massachusetts|     US|           42.2302| -71.5301|2020-03-

In [None]:
# orderBy() used in place of sort(): US rows, newest date first.
us_rows = corona_df.where(col("Country") == "US")
us_rows.orderBy(col("Date").desc()).show()

+-----+----------------+-------+------------------+---------+----------+---------+-----+---------+----------------+----------------+
|  _c0|           State|Country|               Lat|     Long|      Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+-----+----------------+-------+------------------+---------+----------+---------+-----+---------+----------------+----------------+
|27764|      Washington|     US|           47.4009|-121.4905|2020-03-20|     1524|   83|        0|      Washington|      Washington|
|27784|         Arizona|     US|           33.7298|-111.4312|2020-03-20|       78|    0|        0|         Arizona|         Arizona|
|27765|        New York|     US|           42.1657| -74.9481|2020-03-20|     8310|   42|        0|        New York|        New York|
|27766|      California|     US|           36.1162|-119.6816|2020-03-20|     1177|   23|        0|      California|      California|
|27767|   Massachusetts|     US|           42.2302| -71.5301|2020-03-

In [None]:
# sortWithinPartitions() orders rows inside each partition only (no global sort):
# here by Date, then Confirmed, both descending.
corona_df.where(col("Country") == "US").sortWithinPartitions(
    col("Date").desc(), col("Confirmed").desc()
).show()

+-----+--------------+-------+-------+---------+----------+---------+-----+---------+--------------+--------------+
|  _c0|         State|Country|    Lat|     Long|      Date|Confirmed|Death|Recovered| state_cleaned|          City|
+-----+--------------+-------+-------+---------+----------+---------+-----+---------+--------------+--------------+
|27765|      New York|     US|42.1657| -74.9481|2020-03-20|     8310|   42|        0|      New York|      New York|
|27764|    Washington|     US|47.4009|-121.4905|2020-03-20|     1524|   83|        0|    Washington|    Washington|
|27766|    California|     US|36.1162|-119.6816|2020-03-20|     1177|   23|        0|    California|    California|
|27773|    New Jersey|     US|40.2989|  -74.521|2020-03-20|      890|   11|        0|    New Jersey|    New Jersey|
|27776|      Illinois|     US|40.3495| -88.9861|2020-03-20|      585|    5|        0|      Illinois|      Illinois|
|27772|       Florida|     US|27.7663| -81.6868|2020-03-20|      563|   

In [None]:
# Summary statistics (count, mean, stddev, ...) for every column.
corona_df.describe().show()

+-------+-----------------+---------+-----------+------------------+------------------+----------+------------------+------------------+------------------+-------------+-----------+
|summary|              _c0|    State|    Country|               Lat|              Long|      Date|         Confirmed|             Death|         Recovered|state_cleaned|       City|
+-------+-----------------+---------+-----------+------------------+------------------+----------+------------------+------------------+------------------+-------------+-----------+
|  count|            28143|    19116|      28143|             28143|             28143|     28143|             28143|             28143|             28143|        28143|      14573|
|   mean|          14071.0|     null|       null|30.965553459118834|-34.57031257861667|      null|161.88245744945456| 5.494368048893153| 60.17290267562094|         null|       null|
| stddev|8124.328649186959|     null|       null|19.365472826597646| 80.78375872452575|   

In [None]:
# Print the inferred schema. Note that Date comes out as a string, so sorting or
# taking max() on it relies on the yyyy-MM-dd format ordering lexicographically.
corona_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Death: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- state_cleaned: string (nullable = true)
 |-- City: string (nullable = true)



In [None]:
# Rows with more than 10,000 confirmed cases, largest count first.
corona_df.where(col("Confirmed") > 10000).orderBy(col("Confirmed").desc()).show()

+-----+-----+-------+-------+--------+----------+---------+-----+---------+-------------+----+
|  _c0|State|Country|    Lat|    Long|      Date|Confirmed|Death|Recovered|state_cleaned|City|
+-----+-----+-------+-------+--------+----------+---------+-----+---------+-------------+----+
|27820|Hubei|  China|30.9756|112.2707|2020-03-20|    67800| 3133|    58382|        Hubei|null|
|27343|Hubei|  China|30.9756|112.2707|2020-03-19|    67800| 3130|    57682|        Hubei|null|
|26866|Hubei|  China|30.9756|112.2707|2020-03-18|    67800| 3122|    56927|        Hubei|null|
|26389|Hubei|  China|30.9756|112.2707|2020-03-17|    67799| 3111|    56003|        Hubei|null|
|25912|Hubei|  China|30.9756|112.2707|2020-03-16|    67798| 3099|    55142|        Hubei|null|
|25435|Hubei|  China|30.9756|112.2707|2020-03-15|    67794| 3085|    54288|        Hubei|null|
|24958|Hubei|  China|30.9756|112.2707|2020-03-14|    67790| 3075|    52960|        Hubei|null|
|24481|Hubei|  China|30.9756|112.2707|2020-03-13| 

In [None]:
# approxQuantile(column, probabilities, relativeError): the last argument is the
# relative target precision (0 gives exact quantiles at a higher cost). The
# previous value of 0.9 allowed the answer to be off by up to 90% of the rank
# range, which is why four of the five requested quantiles collapsed onto the
# same value. 0.01 keeps the computation cheap while giving meaningful quantiles.
corona_df.filter('Confirmed>10000').approxQuantile(
    'Confirmed', [0.25, 0.5, 0.75, 0.9, 0.95], 0.01
)

[10075.0, 10075.0, 10075.0, 10075.0, 67800.0]

In [None]:
# agg() with a {column: function} dict computes the aggregate over the whole DataFrame.
# collect() returns the result to the driver as a list of Row objects, whereas
# show() prints it as a formatted table.
corona_df.agg({"Date":"max"}).collect()

[Row(max(Date)='2020-03-20')]

In [None]:
# Same aggregate as above, printed as a table instead of returned as Row objects.
corona_df.agg({"Date":"max"}).show()

+----------+
| max(Date)|
+----------+
|2020-03-20|
+----------+



In [None]:
# Aggregating several columns in one call. The key "confirmed" is lower-case while
# the schema column is "Confirmed"; the output shows that it still resolves.
corona_df.agg({"Date":"max","confirmed":"max"}).collect()

[Row(max(confirmed)=67800, max(Date)='2020-03-20')]

In [None]:
# The aggregate is itself a DataFrame (one row, one column) and can be stored and reused.
max_df = corona_df.agg({"Date":"max"})
max_df.show()

+----------+
| max(Date)|
+----------+
|2020-03-20|
+----------+



In [None]:
# Latest date for each (Country, state) combination. Unlike the earlier
# agg({"Date": "max"}) calls, which gave the maximum across the whole DataFrame,
# F.max() here is applied per group. alias("Date") names the aggregated column
# "Date" instead of the default "max(Date)".
import pyspark.sql.functions as F
corona_df.groupBy("Country","State_cleaned").agg(F.max("Date").alias("Date")).show()

+--------------+--------------------+----------+
|       Country|       State_cleaned|      Date|
+--------------+--------------------+----------+
|      Cameroon|             Yaounde|2020-03-20|
|         China|             Qinghai|2020-03-20|
|        Cyprus|             Nicosia|2020-03-20|
|            US|            Michigan|2020-03-20|
|      Portugal|              Lisbon|2020-03-20|
|            US|            Colorado|2020-03-20|
|United Kingdom|      Cayman Islands|2020-03-20|
|         China|              Hainan|2020-03-20|
|            US|            Missouri|2020-03-20|
|     Australia|Australian Capita...|2020-03-20|
|            US|                Guam|2020-03-20|
|        France|             Reunion|2020-03-20|
|      Colombia|        Cundinamarca|2020-03-20|
|          Cuba|              Havana|2020-03-20|
|     Mauritius|          Port Louis|2020-03-20|
|       Ukraine|                Kiev|2020-03-20|
|         Benin|          Porto-Novo|2020-03-20|
|   Switzerland|    

In [None]:
# Step 1: latest date per (Country, state). alias("Date") names the aggregated column
# "Date" so that it can serve as a join key against the primary DataFrame.
latest_dates_df = corona_df.groupBy("Country", "State_cleaned").agg(
    F.max("Date").alias("Date")
)
# Step 2: the inner join keeps all columns of corona_df, but only for the rows whose
# (Country, state, Date) matches that latest date.
corona_df.join(
    latest_dates_df,
    on=["Country", "State_cleaned", "Date"],
    how="inner",
).show()

+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|             Country|       state_cleaned|      Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|            Thailand|             Bangkok|2020-03-20|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|               Japan|             Hiraide|2020-03-20|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|           Singapore|           Singapore|2020-03-20|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|               Nepal|           Kathmandu|2020-03-20|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
|            Malaysia|             Sarawak|2020-03-20|27670|            null|     2.5|    112.5|     103

In [None]:
# Latest row per (Country, state), ranked by confirmed cases; print the top 10.
(
    corona_df
    .join(
        corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date").alias("Date")),
        on=["Country", "State_cleaned", "Date"],
        how="inner",
    )
    .sort(col("Confirmed").desc())
    .show(10)
)

+--------------+----------------+----------+-----+--------------+-------+-------------------+---------+-----+---------+--------+
|       Country|   state_cleaned|      Date|  _c0|         State|    Lat|               Long|Confirmed|Death|Recovered|    City|
+--------------+----------------+----------+-----+--------------+-------+-------------------+---------+-----+---------+--------+
|         China|           Hubei|2020-03-20|27820|         Hubei|30.9756|           112.2707|    67800| 3133|    58382|    null|
|         Italy|            Rome|2020-03-20|27682|          null|   43.0|               12.0|    47021| 4032|     4440|    null|
|         Spain|          Toledo|2020-03-20|27684|          null|   40.0|               -4.0|    20410| 1043|     1588|    null|
|       Germany|          Berlin|2020-03-20|27677|          null|   51.0|                9.0|    19848|   67|      180|    null|
|          Iran|          Tehran|2020-03-20|27821|          null|   32.0|               53.0|    

In [None]:
# Same join with how="full_outer". Every (Country, state, Date) key on the right-hand
# side is derived from corona_df itself, so the outer join keeps the corona_df rows
# for earlier dates too — which is why dates other than the latest appear in the output.
corona_df.join(corona_df.groupBy("Country","State_cleaned").agg(F.max("Date").alias("Date")),
               on=["Country","State_cleaned","Date"],how="full_outer").sort(col('Confirmed'),ascending=False).show(10)

+-------+-------------+----------+-----+-----+-------+--------+---------+-----+---------+----+
|Country|state_cleaned|      Date|  _c0|State|    Lat|    Long|Confirmed|Death|Recovered|City|
+-------+-------------+----------+-----+-----+-------+--------+---------+-----+---------+----+
|  China|        Hubei|2020-03-20|27820|Hubei|30.9756|112.2707|    67800| 3133|    58382|null|
|  China|        Hubei|2020-03-18|26866|Hubei|30.9756|112.2707|    67800| 3122|    56927|null|
|  China|        Hubei|2020-03-19|27343|Hubei|30.9756|112.2707|    67800| 3130|    57682|null|
|  China|        Hubei|2020-03-17|26389|Hubei|30.9756|112.2707|    67799| 3111|    56003|null|
|  China|        Hubei|2020-03-16|25912|Hubei|30.9756|112.2707|    67798| 3099|    55142|null|
|  China|        Hubei|2020-03-15|25435|Hubei|30.9756|112.2707|    67794| 3085|    54288|null|
|  China|        Hubei|2020-03-14|24958|Hubei|30.9756|112.2707|    67790| 3075|    52960|null|
|  China|        Hubei|2020-03-13|24481|Hubei|30.9

In [None]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Window spec: one partition per (Country, state), ordered inside the partition by
# Date with the latest date first (that is what desc() does).
ws = (
    Window
    .partitionBy("Country", "State_cleaned")
    .orderBy(col("Date").desc())
)

# withColumn() adds a new column to the DataFrame. row_number() numbers the rows of
# each partition following the window ordering: the row with the latest date for a
# (Country, state) pair gets 1, the next most recent date gets 2, and so on.
corona_df.withColumn("row_num", row_number().over(ws)).show()

+-----+-----+--------+------------------+-------+----------+---------+-----+---------+-------------+----+-------+
|  _c0|State| Country|               Lat|   Long|      Date|Confirmed|Death|Recovered|state_cleaned|City|row_num|
+-----+-----+--------+------------------+-------+----------+---------+-----+---------+-------------+----+-------+
|27745| null|Cameroon|3.8480000000000003|11.5021|2020-03-20|       20|    0|        0|      Yaounde|null|      1|
|27268| null|Cameroon|3.8480000000000003|11.5021|2020-03-19|       13|    0|        0|      Yaounde|null|      2|
|26791| null|Cameroon|3.8480000000000003|11.5021|2020-03-18|       10|    0|        0|      Yaounde|null|      3|
|26314| null|Cameroon|3.8480000000000003|11.5021|2020-03-17|       10|    0|        0|      Yaounde|null|      4|
|25837| null|Cameroon|3.8480000000000003|11.5021|2020-03-16|        4|    0|        0|      Yaounde|null|      5|
|25360| null|Cameroon|3.8480000000000003|11.5021|2020-03-15|        2|    0|        0|  

In [None]:
# Number the rows of each window partition defined by `ws`, then keep only row number 1 of
# every partition.
(
    corona_df
    .withColumn("row_num", row_number().over(ws))
    .filter(col("row_num") == 1)
    .show()
)

+-----+--------------------+--------------+------------------+---------+----------+---------+-----+---------+--------------------+--------+-------+
|  _c0|               State|       Country|               Lat|     Long|      Date|Confirmed|Death|Recovered|       state_cleaned|    City|row_num|
+-----+--------------------+--------------+------------------+---------+----------+---------+-----+---------+--------------------+--------+-------+
|27745|                null|      Cameroon|3.8480000000000003|  11.5021|2020-03-20|       20|    0|        0|             Yaounde|    null|      1|
|27859|             Qinghai|         China|           35.7452|  95.9956|2020-03-20|       18|    0|       18|             Qinghai|    null|      1|
|27762|                null|        Cyprus|           35.1264|  33.4299|2020-03-20|       67|    0|        0|             Nicosia|    null|      1|
|27812|            Michigan|            US|           43.3266| -84.5361|2020-03-20|      552|    3|        0|   

In [None]:
# corona_max_df: the records of corona_df whose Date equals the maximum Date of their
# (Country, State_cleaned) group. The inner join on Country, State_cleaned and Date drops
# every record that does not match a group's latest date.
corona_max_df = corona_df.join(
    corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date").alias("Date")),
    on=["Country", "State_cleaned", "Date"],
    how="inner",
)
corona_max_df.show()

+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|             Country|       state_cleaned|      Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|            Thailand|             Bangkok|2020-03-20|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|               Japan|             Hiraide|2020-03-20|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|           Singapore|           Singapore|2020-03-20|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|               Nepal|           Kathmandu|2020-03-20|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
|            Malaysia|             Sarawak|2020-03-20|27670|            null|     2.5|    112.5|     103

In [None]:
# pivot() turns rows into columns: the result has one row per Country and one column per
# distinct Date. Each cell is the sum of Confirmed over that country's records for that date
# (agg() with sum()), which gives a day-by-day view of the case counts across countries.
(
    corona_df
    .groupBy("Country")
    .pivot("Date")
    .agg(F.sum("Confirmed"))
    .show()
)

+-----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|    Country|2020-01-22|2020-01-23|2020-01-24|2020-01-25|2020-01-26|2020-01-27|2020-01-28|2020-01-29|2020-01-30|2020-01-31|2020-02-01|2020-02-02|2020-02-03|2020-02-04|2020-02-05|2020-02-06|2020-02-07|2020-02-08|2020-02-09|2020-02-10|2020-02-11|2020-02-12|2020-02-13|2020-02-14|2020-02-15|2020-02-16|2020-02-17|2020-02-18|2020-02-19|2020-

In [None]:
# crosstab() is very useful for categorical columns. For the US records it builds a
# State-vs-Date frequency table: each cell holds the number of records for that
# (State, Date) combination, so a combination with a single record shows 1 and a
# combination with no record shows 0.
(
    corona_df
    .where("Country=='US'")
    .stat.crosstab("State", "Date")
    .show()
)

+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|         State_Date|2020-01-22|2020-01-23|2020-01-24|2020-01-25|2020-01-26|2020-01-27|2020-01-28|2020-01-29|2020-01-30|2020-01-31|2020-02-01|2020-02-02|2020-02-03|2020-02-04|2020-02-05|2020-02-06|2020-02-07|2020-02-08|2020-02-09|2020-02-10|2020-02-11|2020-02-12|2020-02-13|2020-02-14|2020-02-15|2020-02-16|2020-02-17|2020-02-18|

In [None]:
# Total confirmed cases per country, computed from the latest-date records in corona_max_df.
# Grouping is by Country only, so a country with several states is collapsed into a single
# row holding the sum over all of its states.
corona_max_df.groupBy("Country").sum("Confirmed").show()

+-----------+--------------+
|    Country|sum(Confirmed)|
+-----------+--------------+
|       Chad|             1|
|   Paraguay|            13|
|     Russia|           253|
|    Senegal|            38|
|     Sweden|          1639|
| Cabo Verde|             1|
|     Guyana|             7|
|Philippines|           230|
|   Djibouti|             1|
|  Singapore|           385|
|   Malaysia|          1030|
|       Fiji|             1|
|     Turkey|           359|
|       Iraq|           208|
|    Germany|         19848|
|   Cambodia|            51|
|Afghanistan|            24|
|     Jordan|            85|
|   Maldives|            13|
|     Rwanda|            17|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Per-country totals of Confirmed, Recovered and Death over the latest-date records.
# The aggregated columns are named "sum(<column>)"; rows are ordered by total confirmed
# cases, largest first.
(
    corona_max_df
    .groupBy("Country")
    .agg({"Confirmed": "sum", "Recovered": "sum", "Death": 'sum'})
    .orderBy(col("sum(Confirmed)").desc())
    .show()
)

+--------------+--------------+----------+--------------+
|       Country|sum(Recovered)|sum(Death)|sum(Confirmed)|
+--------------+--------------+----------+--------------+
|         China|         71266|      3253|         81250|
|         Italy|          4440|      4032|         47021|
|         Spain|          1588|      1043|         20410|
|       Germany|           180|        67|         19848|
|          Iran|          6745|      1433|         19644|
|            US|             0|       244|         19100|
|        France|            12|       450|         12726|
|  Korea, South|          1540|        94|          8652|
|   Switzerland|            15|        54|          5294|
|United Kingdom|            67|       178|          4014|
|   Netherlands|             2|       107|          3003|
|       Austria|             9|         6|          2388|
|       Belgium|             1|        37|          2257|
|        Norway|             1|         7|          1914|
|        Swede

In [None]:
# All records for Italy, most recent Date first.
(
    corona_df
    .where("Country=='Italy'")
    .orderBy(col("Date").desc())
    .show()
)

+-----+-----+-------+----+----+----------+---------+-----+---------+-------------+----+
|  _c0|State|Country| Lat|Long|      Date|Confirmed|Death|Recovered|state_cleaned|City|
+-----+-----+-------+----+----+----------+---------+-----+---------+-------------+----+
|27682| null|  Italy|43.0|12.0|2020-03-20|    47021| 4032|     4440|         Rome|null|
|27205| null|  Italy|43.0|12.0|2020-03-19|    41035| 3405|     4440|         Rome|null|
|26728| null|  Italy|43.0|12.0|2020-03-18|    35713| 2978|     4025|         Rome|null|
|26251| null|  Italy|43.0|12.0|2020-03-17|    31506| 2503|     2941|         Rome|null|
|25774| null|  Italy|43.0|12.0|2020-03-16|    27980| 2158|     2749|         Rome|null|
|25297| null|  Italy|43.0|12.0|2020-03-15|    24747| 1809|     2335|         Rome|null|
|24820| null|  Italy|43.0|12.0|2020-03-14|    21157| 1441|     1966|         Rome|null|
|24343| null|  Italy|43.0|12.0|2020-03-13|    17660| 1266|     1439|         Rome|null|
|23866| null|  Italy|43.0|12.0|2

In [None]:
# Add a new column "Active" (Confirmed - Recovered - Death) to the latest-date records and
# display them sorted on that new column in descending order.
(
    corona_max_df
    .withColumn(
        "Active",
        corona_max_df["Confirmed"] - corona_max_df["Recovered"] - corona_max_df["Death"],
    )
    .orderBy(col("Active").desc())
    .show()
)

+--------------+----------------+----------+-----+--------------+-------+-------------------+---------+-----+---------+----------+------+
|       Country|   state_cleaned|      Date|  _c0|         State|    Lat|               Long|Confirmed|Death|Recovered|      City|Active|
+--------------+----------------+----------+-----+--------------+-------+-------------------+---------+-----+---------+----------+------+
|         Italy|            Rome|2020-03-20|27682|          null|   43.0|               12.0|    47021| 4032|     4440|      null| 38549|
|       Germany|          Berlin|2020-03-20|27677|          null|   51.0|                9.0|    19848|   67|      180|      null| 19601|
|         Spain|          Toledo|2020-03-20|27684|          null|   40.0|               -4.0|    20410| 1043|     1588|      null| 17779|
|        France|          France|2020-03-20|27823|        France|46.2276|             2.2137|    12612|  450|       12|      null| 12150|
|          Iran|          Tehran|2

In [None]:
# corona_actv_df: corona_max_df plus an "Active" column (Confirmed - Recovered - Death),
# kept in a variable so it can be reused. The first ten rows are displayed.
corona_actv_df = corona_max_df.withColumn(
    "Active",
    corona_max_df["Confirmed"] - corona_max_df["Recovered"] - corona_max_df["Death"],
)
corona_actv_df.show(10)

+---------+----------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+------+
|  Country|   state_cleaned|      Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|Active|
+---------+----------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+------+
| Thailand|         Bangkok|2020-03-20|27666|            null|    15.0|    101.0|      322|    1|       42|null|   279|
|    Japan|         Hiraide|2020-03-20|27667|            null|    36.0|    138.0|      963|   33|      191|null|   739|
|Singapore|       Singapore|2020-03-20|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|   261|
|    Nepal|       Kathmandu|2020-03-20|27669|            null| 28.1667|    84.25|        1|    0|        1|null|     0|
| Malaysia|         Sarawak|2020-03-20|27670|            null|     2.5|    112.5|     1030|    3|       87|null|   940|
|   Canada|British Columbia|2020-03-20|2

In [None]:
# The ten countries with the largest number of active cases: Active is summed per Country
# and the result is ordered by that sum, largest first.
(
    corona_actv_df
    .groupBy("Country")
    .sum("Active")
    .orderBy(col("sum(Active)").desc())
    .show(10)
)

+--------------+-----------+
|       Country|sum(Active)|
+--------------+-----------+
|         Italy|      38549|
|       Germany|      19601|
|            US|      18856|
|         Spain|      17779|
|        France|      12264|
|          Iran|      11466|
|  Korea, South|       7018|
|         China|       6731|
|   Switzerland|       5225|
|United Kingdom|       3769|
+--------------+-----------+
only showing top 10 rows

