# Download and Installation

In [0]:
import pandas as pd

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are moved from downloads.apache.org to the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()   # Locates Spark via SPARK_HOME and adds PySpark to sys.path

In [0]:
# To work with Spark we need a SparkSession (it wraps the SparkContext)
from pyspark.sql import SparkSession

# master("local[*]") runs Spark locally on all available cores, since we don't have a distributed
# environment: both the driver and the executor run inside the Colab VM.
# The remaining options are the ones we would usually pass to spark-submit. They are given to the
# builder because memory settings must be in place before the session (and its JVM) is created;
# setting them with spark.conf.set() after getOrCreate() has no effect.
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.executor.memory", "4g")    # memory allocated to each executor
         .config("spark.driver.memory", "4g")      # memory allocated to the driver
         .config("spark.memory.fraction", "0.9")   # fraction of the heap used for execution and storage
         .getOrCreate())

In [0]:
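import sys, tempfile
import urllib.request   # urlretrieve lives in the urllib.request submodule
from pyspark.sql.functions import *   # note: shadows Python built-ins such as max, min and sum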

In [0]:
BASE_DIR = "/tmp"
CORONA_DATA_FILE = os.path.join(BASE_DIR, "corona_data.csv")
TWITTER_DATA_FILE = os.path.join(BASE_DIR, "twitter_data.csv")

In [0]:
corona_data = urllib.request.urlretrieve("https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/corona_dataset_latest.csv", CORONA_DATA_FILE)

In [0]:
twitter_data = urllib.request.urlretrieve("https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/tweets.csv", TWITTER_DATA_FILE)

In [12]:
! ls /tmp

blockmgr-4a9a81a3-767c-4370-a37b-9449e4963d15
corona_data.csv
hsperfdata_root
spark-31e4db0a-8c1f-4037-9827-0bc227cdef93
spark-60b0bc4b-bcd0-4ecb-83ba-93f7b7850a18
twitter_data.csv


In [0]:
corona_df = spark.read.option("inferSchema", "true").csv(CORONA_DATA_FILE, header=True)

In [14]:
corona_df.show(5)

+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|_c0|State|  Country|    Lat|    Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|  0| null| Thailand|   15.0|   101.0|2020-01-22 00:00:00|        2|    0|        0|      Bangkok|null|
|  1| null|    Japan|   36.0|   138.0|2020-01-22 00:00:00|        2|    0|        0|      Hiraide|null|
|  2| null|Singapore| 1.2833|103.8333|2020-01-22 00:00:00|        0|    0|        0|    Singapore|null|
|  3| null|    Nepal|28.1667|   84.25|2020-01-22 00:00:00|        0|    0|        0|    Kathmandu|null|
|  4| null| Malaysia|    2.5|   112.5|2020-01-22 00:00:00|        0|    0|        0|      Sarawak|null|
+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
only showing top 5 rows



In [15]:
corona_df.count()

28143

In [0]:
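# The tweet text contains quotes and commas; with the default CSV options some rows are mis-parsed
# (see row 1 below, whose columns are shifted). Adding .option("multiLine", "true").option("escape", '"')
# is likely to parse such rows correctly.
twitter_df = spark.read.option("inferSchema", "true").csv(TWITTER_DATA_FILE, header=True)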

In [17]:
twitter_df.show(5)

+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|_c0| geo|                text|                user|         location|            entities|           sentiment|             country|
+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|  0|null|What is God sayin...|          petodinice|            Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|             Nigeria|
|  1|null|"BREAKING: ""this...| but i took the t...|             -… "|     JerryfranksonJF|      Abuja, Nigeria|"[(""Arsenal's Mi...|
|  2|null| #Coronavirus tes...|              cek422|Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|                 USA|
|  3|null| Get ready for ma...|        InfectiousDz|              NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|                 USA|
|  4|null| The #coronavirus...|          vic_gibson|          

In [18]:
twitter_df.count()

1000

# Theory

**RDD and DataFrame**

Natively, Spark works with RDDs (Resilient Distributed Datasets); the RDD is the foundation of Spark.
To make Spark more user friendly, newer versions of Spark provide a simpler DataFrame interface, similar to pandas DataFrames. Under the hood, operations on this simpler interface are still executed as RDD operations.

**Transformation**

* A transformation describes how you want to turn an input DataFrame into an output DataFrame.

* A Spark transformation is a function that produces a new DataFrame from an existing one. Every transformation returns a new DataFrame and leaves its input unchanged, because DataFrames (like RDDs) are immutable.

* Applying transformations builds an RDD lineage that records all parent RDDs of the final RDD(s). The lineage, also known as the RDD operator graph or RDD dependency graph, is a logical execution plan in the form of a **Directed Acyclic Graph (DAG)** of all parent RDDs.

* Transformations are lazy: they are not executed immediately, only when an action is called. The two most basic transformations are map() and filter().
After a transformation, the resulting RDD is always a new RDD, distinct from its parent. It can be smaller (e.g. filter(), distinct(), sample()), bigger (e.g. flatMap(), union(), cartesian()) or the same size (e.g. map()).


**Actions**

* Transformations create new RDDs from existing ones, but when we want to work with the actual data, we perform an action (e.g. count(), collect(), take() or show()). An action does not produce a new RDD; it returns a non-RDD value, which is either sent to the driver or written to an external storage system. Calling an action is what sets the lazily recorded transformations in motion.

* An action is one of the ways of sending data from the executors to the driver. Executors are agents responsible for executing tasks, while the driver is the JVM process that coordinates the workers and the execution of tasks.
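The laziness described above can be illustrated with a small plain-Python model. `LazyDataset` is a hypothetical toy class, not a Spark API: its transformations only record a step in a lineage and return a new object, and nothing is computed until an action such as `collect()` walks that lineage.

```python
# A toy, single-machine model of lazy transformations and actions.
# This is NOT PySpark's implementation; it only mimics the idea that
# transformations record a lineage and actions trigger the computation.

class LazyDataset:
    def __init__(self, source, lineage=()):
        self._source = source          # the original data (a list)
        self.lineage = tuple(lineage)  # recorded (name, function) steps

    # --- transformations: return a NEW dataset and compute nothing ---
    def map(self, fn):
        return LazyDataset(self._source, self.lineage + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self.lineage + (("filter", pred),))

    # --- actions: replay the lineage and return a plain Python value ---
    def collect(self):
        data = list(self._source)
        for name, fn in self.lineage:
            if name == "map":
                data = [fn(x) for x in data]
            else:  # "filter"
                data = [x for x in data if fn(x)]
        return data

    def count(self):
        return len(self.collect())


calls = []

def square(x):
    calls.append(x)   # records when the function actually runs
    return x * x

base = LazyDataset([1, 2, 3, 4])
evens_squared = base.filter(lambda x: x % 2 == 0).map(square)

print(calls)                                         # [] -> nothing has run yet
print([name for name, _ in evens_squared.lineage])   # ['filter', 'map']
print(evens_squared.collect())                       # [4, 16] -> the action runs the lineage
print(calls)                                         # [2, 4]
print(base.collect())                                # [1, 2, 3, 4] -> the parent is unchanged
```

The parent dataset is never modified; each transformation returns a new object, mirroring the immutability of RDDs and DataFrames.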

# Hands-on Basics

In [19]:
twitter_df.show(5)

+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|_c0| geo|                text|                user|         location|            entities|           sentiment|             country|
+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|  0|null|What is God sayin...|          petodinice|            Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|             Nigeria|
|  1|null|"BREAKING: ""this...| but i took the t...|             -… "|     JerryfranksonJF|      Abuja, Nigeria|"[(""Arsenal's Mi...|
|  2|null| #Coronavirus tes...|              cek422|Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|                 USA|
|  3|null| Get ready for ma...|        InfectiousDz|              NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|                 USA|
|  4|null| The #coronavirus...|          vic_gibson|          

In [20]:
corona_df.show(5)

+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|_c0|State|  Country|    Lat|    Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|  0| null| Thailand|   15.0|   101.0|2020-01-22 00:00:00|        2|    0|        0|      Bangkok|null|
|  1| null|    Japan|   36.0|   138.0|2020-01-22 00:00:00|        2|    0|        0|      Hiraide|null|
|  2| null|Singapore| 1.2833|103.8333|2020-01-22 00:00:00|        0|    0|        0|    Singapore|null|
|  3| null|    Nepal|28.1667|   84.25|2020-01-22 00:00:00|        0|    0|        0|    Kathmandu|null|
|  4| null| Malaysia|    2.5|   112.5|2020-01-22 00:00:00|        0|    0|        0|      Sarawak|null|
+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
only showing top 5 rows



In [21]:
# filter() is a transformation: it returns a new DataFrame right away, without computing anything
twitter_df.filter("country='USA'")

DataFrame[_c0: int, geo: string, text: string, user: string, location: string, entities: string, sentiment: string, country: string]

In [22]:
# show() is an action: it triggers the filter above and prints the first 5 matching rows
twitter_df.filter("country='USA'").show(5)

+---+----+--------------------+--------------+--------------------+--------------------+--------------------+-------+
|_c0| geo|                text|          user|            location|            entities|           sentiment|country|
+---+----+--------------------+--------------+--------------------+--------------------+--------------------+-------+
|  2|null| #Coronavirus tes...|        cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|    USA|
|  3|null| Get ready for ma...|  InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|    USA|
|  5|null| COVID-19 update ...|StewartNgilana|Durban | Port Eli...|[('Italy', 'GPE')...|{'neg': 0.178, 'n...|    USA|
|  6|null| It’s painful to ...|     BWheatnyc|             Florida|                  []|{'neg': 0.098, 'n...|    USA|
|  8|null| Questions about ...|   straightj23|        Columbus, OH|[('NAfME', 'CARDI...|{'neg': 0.0, 'neu...|    USA|
+---+----+--------------------+--------------+----------

In [23]:
# Records where country is USA and location contains "New"
# (LIKE '%New%' matches "New" anywhere in the string; 'New%' would mean "starts with New")

twitter_df.filter("country = 'USA' and location like '%New%'").show(5)

+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|            location|            entities|           sentiment|country|
+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
| 31|null| I ordered Alex J...|       rcgillan|       New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|    USA|
| 49|null| This week we are...|  JamesWithers3|  New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|    USA|
| 62|null|Fear will kill #C...| RobertPPurcell|norther New Jerse...|                ['']|{'neg': 0.798, 'n...|    USA|
|228|null|This is a very co...|baskingntheGlow|       New York City|[('hourly', 'TIME')]|{'neg': 0.12, 'ne...|    USA|
|238|null|I’m reposting thi...|   Veronicaromm|     New Jersey, USA|[('English', 'LAN...|{'neg': 0.0, 'neu...|    USA|
+---+----+--------------------+---------------+-
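
As a sketch of the LIKE semantics used above, the hypothetical helper `sql_like` below translates a LIKE pattern into a regular expression (`%` matches any run of characters, `_` exactly one character; escape sequences are ignored). The sample strings are made up. In PySpark, a starts-with test can also be written with the Column API, e.g. `col("location").startswith("New")` or `col("location").like("New%")`.

```python
import re

def sql_like(value, pattern):
    """Minimal model of SQL LIKE (hypothetical helper, not a Spark API):
    '%' = any run of characters, '_' = exactly one character.
    Escape sequences are not handled."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # LIKE must match the whole string, hence fullmatch
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None

locations = ["New York, USA", "northern New Jersey", "Florida"]  # made-up samples
print([loc for loc in locations if sql_like(loc, "%New%")])  # ['New York, USA', 'northern New Jersey']
print([loc for loc in locations if sql_like(loc, "New%")])   # ['New York, USA']
```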

**HOW SPARK PLANS THE EXECUTION OF A TRANSFORMATION**

In [0]:
tw_filter_df = twitter_df.filter("country = 'USA'")

In [25]:
tw_filter_df.explain()

== Physical Plan ==
*(1) Project [_c0#103, geo#104, text#105, user#106, location#107, entities#108, sentiment#109, country#110]
+- *(1) Filter (isnotnull(country#110) && (country#110 = USA))
   +- *(1) FileScan csv [_c0#103,geo#104,text#105,user#106,location#107,entities#108,sentiment#109,country#110] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/twitter_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,USA)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...


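Read this plan from the bottom up. It shows how Spark is going to execute this particular transformation:

* First, the CSV file is scanned at the given location with the inferred schema (`ReadSchema`). `PushedFilters` lists the conditions Spark passes down to the data source.
* Second, the Filter transformation is applied (`isnotnull(country#110) && (country#110 = USA)`).
* Lastly, Project selects the output columns from the filtered rows.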

In [26]:
# first() is an action that returns the first record of the DataFrame as a Row object.
# Its values can be read by field name, e.g. row.user
tw_filter_df.first()

Row(_c0=2, geo=None, text=' #Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…', user='cek422', location='Pennsylvania, USA', entities='[]', sentiment="{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}", country='USA')

In [27]:
# take(5) is an action that returns the first 5 records to the driver as a list of Row objects
tw_filter_df.take(5)

[Row(_c0=2, geo=None, text=' #Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…', user='cek422', location='Pennsylvania, USA', entities='[]', sentiment="{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}", country='USA'),
 Row(_c0=3, geo=None, text=' Get ready for mass event crowd cancellations across the World starting this weekend: cricket in #Australia in empty st…', user='InfectiousDz', location='NYC', entities="[('World', 'ORG'), ('this weekend', 'DATE'), '']", sentiment="{'neg': 0.085, 'neu': 0.798, 'pos': 0.117, 'compound': 0.1779}", country='USA'),
 Row(_c0=5, geo=None, text=" COVID-19 update as of this morning:1. Death toll in Italy passes 1,0002. Arsenal's head coach Arteta tests positive3. US…", user='StewartNgilana', location='Durban | Port Elizabeth', entities="[('Italy', 'GPE'), ('1,0002', 'CARDINAL'), ('Arsenal', 'ORG'), ('Arteta', 'ORG'), ('US', 'GPE')]", sentiment="{'neg': 0.178, 'neu': 

# Narrow and Wide Transformations: Theory

**How RDDs are represented**

RDDs are made up of 4 parts:

* **Partitions**: Atomic pieces of the dataset. One or many per compute node.
* **Dependencies**: Model the relationship between this RDD's partitions and the partitions of the RDD(s) it was derived from. (Note that the dependencies may be modeled per partition, as shown below.)
* A **function** for computing the dataset based on its parent RDDs.
* **Metadata** about its partitioning scheme and data placement.

![alt text](https://drive.google.com/uc?id=1zT_K8iXNq2ha6o55YAS-2N2V4L8ANbfB)

[Image Source](https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies)

![alt text](https://drive.google.com/uc?id=10DpIcHssJkIMBbKK0Dn6hJsd5qkptLvo)

[Image Source](https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies)

Transformations can have 2 kinds of dependencies, and only wide dependencies cause shuffles:

**Narrow dependencies**: Each partition of the parent RDD is used by at most one partition of the child RDD.

`[parent RDD partition] ---> [child RDD partition]`

Fast! No shuffle is necessary, and optimizations like pipelining are possible. Thus transformations with narrow dependencies are fast.


**Wide dependencies**: Each partition of the parent RDD may be used by multiple child partitions.

`[parent RDD partition] ---> [child RDD partition 1], [child RDD partition 2], [child RDD partition 3]`

Slow! Some or all of the data must be shuffled over the network. Thus transformations with wide dependencies are slow.

**Narrow dependencies Vs. Wide dependencies**

![alt text](https://drive.google.com/uc?id=1PlxquvMiUIZlNtXLpDhgrZOprAyZX4HV)

[Image Source](https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies)
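To make the difference concrete, here is a plain-Python toy model, assuming an "RDD" is simply a list of partitions. The hypothetical `narrow_map` builds each output partition from exactly one input partition, while the hypothetical `wide_group_by_key` redistributes records by key, so one output partition can depend on every input partition. The key-to-partition rule is a made-up deterministic stand-in for Spark's hash partitioning, and the records are hypothetical.

```python
# Toy model: an "RDD" is a list of partitions, each partition a list of (key, value) records.
parent = [[("US", 1), ("China", 2)], [("US", 3), ("Italy", 4)]]

def narrow_map(partitions, fn):
    # Narrow: output partition i is computed only from input partition i (no data movement)
    return [[fn(rec) for rec in part] for part in partitions]

def wide_group_by_key(partitions, num_out):
    # Wide: records are redistributed by key (the "shuffle"), so an output
    # partition may receive records from every input partition
    out = [{} for _ in range(num_out)]
    for part in partitions:
        for key, value in part:
            target = sum(map(ord, key)) % num_out  # toy deterministic partitioner
            out[target].setdefault(key, []).append(value)
    return out

doubled = narrow_map(parent, lambda kv: (kv[0], kv[1] * 2))
grouped = wide_group_by_key(parent, num_out=2)
print(doubled)  # [[('US', 2), ('China', 4)], [('US', 6), ('Italy', 8)]]
print(grouped)  # [{'US': [1, 3]}, {'China': [2], 'Italy': [4]}]
```

Output partition 0 (`{'US': [1, 3]}`) needs records from both input partitions, which is exactly the data movement a shuffle performs.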

# RDD Examples

We will now look at some **transformations on RDDs**, even though in practice we will rarely use RDDs directly, since the DataFrame interface covers most needs.

In [28]:
x = spark.sparkContext.parallelize([1,4,8])         # When we do parallelize, we are telling spark to distribute the data in the cluster
y = x.flatMap(lambda x: (x, x*x))
print(x.collect())
print(y.collect())

[1, 4, 8]
[1, 1, 4, 16, 8, 64]


In [29]:
x = spark.sparkContext.parallelize([1,4,8])         # When we do parallelize, we are telling spark to distribute the data in the cluster
y = x.map(lambda x: (x, x*x))
print(x.collect())
print(y.collect())

[1, 4, 8]
[(1, 1), (4, 16), (8, 64)]


This shows the difference between map and flatMap.
map produces exactly one output element per input element (here, a tuple),
whereas flatMap expects the function to return an iterable and flattens it by one level, so each item of the returned tuple becomes a separate element.
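The same semantics can be modelled in plain Python. The helpers `rdd_map` and `rdd_flat_map` are hypothetical names for this sketch, not Spark functions; the last line shows that flatMap removes only one level of nesting.

```python
from itertools import chain

def rdd_map(records, fn):
    # exactly one output element per input element
    return [fn(r) for r in records]

def rdd_flat_map(records, fn):
    # fn must return an iterable; its items are concatenated (one level only)
    return list(chain.from_iterable(fn(r) for r in records))

data = [1, 4, 8]
print(rdd_map(data, lambda v: (v, v * v)))       # [(1, 1), (4, 16), (8, 64)]
print(rdd_flat_map(data, lambda v: (v, v * v)))  # [1, 1, 4, 16, 8, 64]
print(rdd_flat_map(data, lambda v: [[v]]))       # [[1], [4], [8]] -> inner lists survive
```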

# More hands-on

In [33]:
twitter_df.show(1)

+---+----+--------------------+----------+--------+--------------------+--------------------+-------+
|_c0| geo|                text|      user|location|            entities|           sentiment|country|
+---+----+--------------------+----------+--------+--------------------+--------------------+-------+
|  0|null|What is God sayin...|petodinice|   Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|Nigeria|
+---+----+--------------------+----------+--------+--------------------+--------------------+-------+
only showing top 1 row



In [34]:
corona_df.show(1)

+---+-----+--------+----+-----+-------------------+---------+-----+---------+-------------+----+
|_c0|State| Country| Lat| Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+---+-----+--------+----+-----+-------------------+---------+-----+---------+-------------+----+
|  0| null|Thailand|15.0|101.0|2020-01-22 00:00:00|        2|    0|        0|      Bangkok|null|
+---+-----+--------+----+-----+-------------------+---------+-----+---------+-------------+----+
only showing top 1 row



In [37]:
twitter_df.select("text").take(2)

[Row(text='What is God saying to us about #coronavirus ?'),
 Row(text='"BREAKING: ""this is disappointing')]

In [38]:
twitter_df.select("text").show(2)

+--------------------+
|                text|
+--------------------+
|What is God sayin...|
|"BREAKING: ""this...|
+--------------------+
only showing top 2 rows



In [40]:
twitter_df.select("text", "user").show(2)

+--------------------+--------------------+
|                text|                user|
+--------------------+--------------------+
|What is God sayin...|          petodinice|
|"BREAKING: ""this...| but i took the t...|
+--------------------+--------------------+
only showing top 2 rows



In [52]:
# Here we apply the same RDD map and flatMap to a DataFrame through its underlying RDD (twitter_df.rdd).
# Each element ("line") is a Row: we take its "text" field and split it on spaces,
# so map returns one list of words per tweet
twitter_df.rdd.map(lambda line: line.text.split(" ")).take(3)

[['What', 'is', 'God', 'saying', 'to', 'us', 'about', '#coronavirus', '?'],
 ['"BREAKING:', '""this', 'is', 'disappointing'],
 ['',
  '#Coronavirus',
  'testing',
  'must',
  'be',
  'made',
  'free',
  'to',
  'the',
  'public',
  'if',
  'we',
  'are',
  'going',
  'to',
  'understand',
  'the',
  'scope',
  'of',
  'this',
  'crisis.',
  'Anything',
  'le…']]

In [48]:
# flatMap flattens the per-tweet word lists into one list of words; take(100) returns the first 100
twitter_df.rdd.flatMap(lambda line: line.text.split(" ")).take(100)

['What',
 'is',
 'God',
 'saying',
 'to',
 'us',
 'about',
 '#coronavirus',
 '?',
 '"BREAKING:',
 '""this',
 'is',
 'disappointing',
 '',
 '#Coronavirus',
 'testing',
 'must',
 'be',
 'made',
 'free',
 'to',
 'the',
 'public',
 'if',
 'we',
 'are',
 'going',
 'to',
 'understand',
 'the',
 'scope',
 'of',
 'this',
 'crisis.',
 'Anything',
 'le…',
 '',
 'Get',
 'ready',
 'for',
 'mass',
 'event',
 'crowd',
 'cancellations',
 'across',
 'the',
 'World',
 'starting',
 'this',
 'weekend:',
 'cricket',
 'in',
 '#Australia',
 'in',
 'empty',
 'st…',
 '',
 'The',
 '#coronavirus',
 'pandemic',
 'is',
 'revealing',
 'just',
 'how',
 'closely',
 'we',
 'are',
 'all',
 'bound',
 'together...[A',
 'thread]',
 '',
 '',
 'COVID-19',
 'update',
 'as',
 'of',
 'this',
 'morning:1.',
 'Death',
 'toll',
 'in',
 'Italy',
 'passes',
 '1,0002.',
 "Arsenal's",
 'head',
 'coach',
 'Arteta',
 'tests',
 'positive3.',
 'US…',
 '',
 'It’s',
 'painful',
 'to',
 'say,',
 'but',
 'as',
 'an']

In [53]:
corona_df.show(2)

+---+-----+--------+----+-----+-------------------+---------+-----+---------+-------------+----+
|_c0|State| Country| Lat| Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+---+-----+--------+----+-----+-------------------+---------+-----+---------+-------------+----+
|  0| null|Thailand|15.0|101.0|2020-01-22 00:00:00|        2|    0|        0|      Bangkok|null|
|  1| null|   Japan|36.0|138.0|2020-01-22 00:00:00|        2|    0|        0|      Hiraide|null|
+---+-----+--------+----+-----+-------------------+---------+-----+---------+-------------+----+
only showing top 2 rows



In [62]:
corona_df.filter("Country='US'").sort(col("Date"), ascending=False).show()

+-----+----------------+-------+------------------+---------+-------------------+---------+-----+---------+----------------+----------------+
|  _c0|           State|Country|               Lat|     Long|               Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+-----+----------------+-------+------------------+---------+-------------------+---------+-----+---------+----------------+----------------+
|27764|      Washington|     US|           47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|      Washington|      Washington|
|27784|         Arizona|     US|           33.7298|-111.4312|2020-03-20 00:00:00|       78|    0|        0|         Arizona|         Arizona|
|27765|        New York|     US|           42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|        New York|        New York|
|27766|      California|     US|           36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|      California|      California|
|27767

In [63]:
# orderBy() is an alias of sort(). Both are wide transformations and therefore expensive:
# a global sort has to shuffle data between partitions.
corona_df.filter("Country='US'").orderBy(col("Date"), ascending=False).show()

+-----+----------------+-------+------------------+---------+-------------------+---------+-----+---------+----------------+----------------+
|  _c0|           State|Country|               Lat|     Long|               Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+-----+----------------+-------+------------------+---------+-------------------+---------+-----+---------+----------------+----------------+
|27764|      Washington|     US|           47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|      Washington|      Washington|
|27784|         Arizona|     US|           33.7298|-111.4312|2020-03-20 00:00:00|       78|    0|        0|         Arizona|         Arizona|
|27765|        New York|     US|           42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|        New York|        New York|
|27766|      California|     US|           36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|      California|      California|
|27767

In [66]:
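# Rows are now ordered by Date and, within the same Date, by Confirmed (both descending).
# Compare the Confirmed column with the previous output to see the effect.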
corona_df.filter("Country='US'").orderBy([col("Date"), col("Confirmed")], ascending=False).show()

+-----+--------------+-------+-------+---------+-------------------+---------+-----+---------+--------------+--------------+
|  _c0|         State|Country|    Lat|     Long|               Date|Confirmed|Death|Recovered| state_cleaned|          City|
+-----+--------------+-------+-------+---------+-------------------+---------+-----+---------+--------------+--------------+
|27765|      New York|     US|42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|      New York|      New York|
|27764|    Washington|     US|47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|    Washington|    Washington|
|27766|    California|     US|36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|    California|    California|
|27773|    New Jersey|     US|40.2989|  -74.521|2020-03-20 00:00:00|      890|   11|        0|    New Jersey|    New Jersey|
|27776|      Illinois|     US|40.3495| -88.9861|2020-03-20 00:00:00|      585|    5|        0|      Illinois|      Illinois|


In [65]:
# sortWithinPartitions() sorts each partition independently, so no data is shuffled and it is
# cheaper than sort()/orderBy(). The result is only correctly ordered overall if all related
# records (here, all US records) already sit in one partition, e.g. because we partitioned the data that way.
# In the output below, the rows are not in overall descending Confirmed order.
corona_df.filter("Country='US'").sortWithinPartitions([col("Date"), col("Confirmed")], ascending=False).show()

+-----+----------------+-------+------------------+---------+-------------------+---------+-----+---------+----------------+----------------+
|  _c0|           State|Country|               Lat|     Long|               Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+-----+----------------+-------+------------------+---------+-------------------+---------+-----+---------+----------------+----------------+
|27764|      Washington|     US|           47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|      Washington|      Washington|
|27765|        New York|     US|           42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|        New York|        New York|
|27766|      California|     US|           36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|      California|      California|
|27767|   Massachusetts|     US|           42.2302| -71.5301|2020-03-20 00:00:00|      413|    1|        0|   Massachusetts|   Massachusetts|
|27768
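
The difference between a global sort and sortWithinPartitions() can be sketched in plain Python. The split of these four US rows into two partitions is hypothetical; the Confirmed values come from the outputs above.

```python
# Two hypothetical partitions of (state, confirmed) records
partitions = [
    [("Washington", 1524), ("Arizona", 78)],
    [("New York", 8310), ("California", 1177)],
]

def by_confirmed_desc(rec):
    return -rec[1]

# sort()/orderBy(): one order across ALL partitions (Spark needs a shuffle for this)
global_sorted = sorted((rec for part in partitions for rec in part), key=by_confirmed_desc)

# sortWithinPartitions(): each partition is sorted on its own, no data moves
per_partition = [sorted(part, key=by_confirmed_desc) for part in partitions]
concatenated = [rec for part in per_partition for rec in part]

print([c for _, c in global_sorted])  # [8310, 1524, 1177, 78]
print([c for _, c in concatenated])   # [1524, 78, 8310, 1177] -> sorted only inside each partition
```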

In [69]:
corona_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Death: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- state_cleaned: string (nullable = true)
 |-- City: string (nullable = true)



In [67]:
corona_df.describe().show()

+-------+-----------------+---------+-----------+------------------+------------------+------------------+------------------+------------------+-------------+-----------+
|summary|              _c0|    State|    Country|               Lat|              Long|         Confirmed|             Death|         Recovered|state_cleaned|       City|
+-------+-----------------+---------+-----------+------------------+------------------+------------------+------------------+------------------+-------------+-----------+
|  count|            28143|    19116|      28143|             28143|             28143|             28143|             28143|             28143|        28143|      14573|
|   mean|          14071.0|     null|       null|30.965553459118834|-34.57031257861667|161.88245744945456| 5.494368048893153| 60.17290267562094|         null|       null|
| stddev|8124.328649186959|     null|       null|19.365472826597646| 80.78375872452575| 2519.847217725942|109.29475709869875|1346.6597829124426| 

In [71]:
corona_df.filter("Confirmed > 1000").sort(col("Confirmed"), ascending=False).show()

+-----+-----+-------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|  _c0|State|Country|    Lat|    Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+-----+-----+-------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|27820|Hubei|  China|30.9756|112.2707|2020-03-20 00:00:00|    67800| 3133|    58382|        Hubei|null|
|27343|Hubei|  China|30.9756|112.2707|2020-03-19 00:00:00|    67800| 3130|    57682|        Hubei|null|
|26866|Hubei|  China|30.9756|112.2707|2020-03-18 00:00:00|    67800| 3122|    56927|        Hubei|null|
|26389|Hubei|  China|30.9756|112.2707|2020-03-17 00:00:00|    67799| 3111|    56003|        Hubei|null|
|25912|Hubei|  China|30.9756|112.2707|2020-03-16 00:00:00|    67798| 3099|    55142|        Hubei|null|
|25435|Hubei|  China|30.9756|112.2707|2020-03-15 00:00:00|    67794| 3085|    54288|        Hubei|null|
|24958|Hubei|  China|30.9756|112.2707|2020-03-14 00:00:00|    67

In [110]:
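# approxQuantile() returns approximate quantiles. relativeError=0.3 is a very loose bound;
# relativeError=0 computes exact quantiles, which can be expensive on large data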
corona_df.filter("Confirmed > 1000").approxQuantile("Confirmed", [0.25, 0.5, 0.75, 0.9, 0.95], relativeError=0.3)

[1001.0, 1273.0, 67800.0, 67800.0, 67800.0]
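Spark documents the accuracy of approxQuantile() as a rank bound: for N rows, probability p and relative error err, the returned value x satisfies floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). The sketch below lists every value that would satisfy this bound on hypothetical data; it does not implement Spark's algorithm, and treating rank as the 1-based position in sorted order is an assumption. With err = 0.3, as in the cell above, the 0.75 quantile may be any value ranked between 45% and 100% of the data, which helps explain why 67800 can come back.

```python
import math

def acceptable_values(data, p, err):
    """Values allowed by the documented approxQuantile bound
    floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
    Assumption: rank = 1-based position in sorted order."""
    ordered = sorted(data)
    n = len(ordered)
    lo = max(1, math.floor((p - err) * n))
    hi = min(n, math.ceil((p + err) * n))
    return ordered[lo - 1:hi]

data = list(range(1, 11))                 # hypothetical values 1..10
print(acceptable_values(data, 0.5, 0.0))  # [5] -> exact median rank
print(acceptable_values(data, 0.5, 0.3))  # [2, 3, 4, 5, 6, 7, 8]
```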

In [80]:
corona_df.agg({"Date":"max"}).collect()

[Row(max(Date)=datetime.datetime(2020, 3, 20, 0, 0))]

In [81]:
# A Python dict keeps only the last value for a repeated key, so this dict is {"Date":"min"}
corona_df.agg({"Date":"max", "Date":"min"}).collect()

[Row(min(Date)=datetime.datetime(2020, 1, 22, 0, 0))]

In [84]:
# Same collapse: only {"Date":"min", "long":"min"} is aggregated
corona_df.agg({"Date":"max", "Date":"min", "long":"min"}).collect()

[Row(min(Date)=datetime.datetime(2020, 1, 22, 0, 0), min(long)=-157.8584)]

In [88]:
# Same collapse: only {"Date":"min", "long":"max"} is aggregated
corona_df.agg({"Date":"max", "Date":"min", "long":"min", "long":"max"}).collect()

[Row(min(Date)=datetime.datetime(2020, 1, 22, 0, 0), max(long)=178.065)]
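The outputs above drop some of the requested aggregations because of plain Python dict semantics, not because of Spark: a dict literal keeps only the last value for a repeated key, at the position where the key first appeared. The snippet below shows this collapse. To compute several aggregations of the same column, column expressions can be passed instead, for example `corona_df.agg(F.min("Date"), F.max("Date"))` with `import pyspark.sql.functions as F`.

```python
# The same dict literal as in the last agg() call above
spec = {"Date": "max", "Date": "min", "long": "min", "long": "max"}
print(spec)       # {'Date': 'min', 'long': 'max'} -> only 2 of the 4 requests survive
print(len(spec))  # 2
```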

In [0]:
max_date = corona_df.agg({"Date":"max"})

In [86]:
max_date.show()

+-------------------+
|          max(Date)|
+-------------------+
|2020-03-20 00:00:00|
+-------------------+



In [90]:
# Another way to aggregate: group the rows first, then apply max() from pyspark.sql.functions to each group

import pyspark.sql.functions as F
corona_df.groupBy("country", "state_cleaned").agg(F.max("Date")).show()

+--------------+--------------------+-------------------+
|       country|       state_cleaned|          max(Date)|
+--------------+--------------------+-------------------+
|      Cameroon|             Yaounde|2020-03-20 00:00:00|
|        Cyprus|             Nicosia|2020-03-20 00:00:00|
|            US|            Michigan|2020-03-20 00:00:00|
|         China|             Qinghai|2020-03-20 00:00:00|
|      Portugal|              Lisbon|2020-03-20 00:00:00|
|            US|            Colorado|2020-03-20 00:00:00|
|United Kingdom|      Cayman Islands|2020-03-20 00:00:00|
|            US|            Missouri|2020-03-20 00:00:00|
|         China|              Hainan|2020-03-20 00:00:00|
|     Australia|Australian Capita...|2020-03-20 00:00:00|
|            US|                Guam|2020-03-20 00:00:00|
|        France|             Reunion|2020-03-20 00:00:00|
|      Colombia|        Cundinamarca|2020-03-20 00:00:00|
|          Cuba|              Havana|2020-03-20 00:00:00|
|     Mauritiu

In [100]:
# First group by "Country" and "state_cleaned" and get the max "Date" of each group
# with the aggregate function max(). Then inner-join the result back to the original
# data on "Country", "state_cleaned" and "Date" to keep only the latest record per group.

# alias("Date") renames the aggregated column, which would otherwise be called "max(Date)"
corona_df.join(corona_df.groupBy("country", "state_cleaned").agg(F.max("Date").alias("Date")), on = ["Country", "State_cleaned", "Date"], how="inner").show()

+--------------------+--------------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+
|             Country|       state_cleaned|               Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+--------------------+--------------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+
|            Thailand|             Bangkok|2020-03-20 00:00:00|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|               Japan|             Hiraide|2020-03-20 00:00:00|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|           Singapore|           Singapore|2020-03-20 00:00:00|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|               Nepal|           Kathmandu|2020-03-20 00:00:00|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
|            Malaysia|             Sarawa
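
The groupBy + join pattern above can be modelled in plain Python on a few hypothetical records (placeholder country and state names, made-up numbers). Step 1 plays the role of `groupBy(...).agg(F.max("Date"))`, step 2 the role of the inner join on country, state and date. ISO date strings are used because they compare in chronological order.

```python
# Hypothetical (country, state, date, confirmed) records
rows = [
    ("A", "a1", "2020-03-19", 10),
    ("A", "a1", "2020-03-20", 12),
    ("B", "b1", "2020-03-20", 7),
    ("B", "b1", "2020-03-18", 5),
]

# Step 1 (groupBy + max): latest date per (country, state)
latest = {}
for country, state, date, _ in rows:
    key = (country, state)
    latest[key] = max(latest.get(key, date), date)

# Step 2 (inner join on country, state, date): keep rows that match the latest date
latest_rows = [r for r in rows if latest[(r[0], r[1])] == r[2]]
print(latest_rows)  # [('A', 'a1', '2020-03-20', 12), ('B', 'b1', '2020-03-20', 7)]
```

If two rows of the same group share the latest date, the join keeps both of them.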

In [113]:
# The same join as above, sorted by Confirmed in descending order:
# the (Country, state_cleaned) pairs with the most confirmed cases on their latest date come first
corona_df.join(corona_df.groupBy("country", "state_cleaned").agg(F.max("Date").alias("Date")), on = ["Country", "State_cleaned", "Date"], how="inner").sort("confirmed", ascending=False).show(10)

+--------------+----------------+-------------------+-----+--------------+-------+-------------------+---------+-----+---------+--------+
|       Country|   state_cleaned|               Date|  _c0|         State|    Lat|               Long|Confirmed|Death|Recovered|    City|
+--------------+----------------+-------------------+-----+--------------+-------+-------------------+---------+-----+---------+--------+
|         China|           Hubei|2020-03-20 00:00:00|27820|         Hubei|30.9756|           112.2707|    67800| 3133|    58382|    null|
|         Italy|            Rome|2020-03-20 00:00:00|27682|          null|   43.0|               12.0|    47021| 4032|     4440|    null|
|         Spain|          Toledo|2020-03-20 00:00:00|27684|          null|   40.0|               -4.0|    20410| 1043|     1588|    null|
|       Germany|          Berlin|2020-03-20 00:00:00|27677|          null|   51.0|                9.0|    19848|   67|      180|    null|
|          Iran|          Tehran|2
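`sort("confirmed", ascending=False).show(10)` displays the ten rows with the highest Confirmed count. In plain Python the same top-N selection can be sketched with `heapq.nlargest`, which avoids sorting the whole list. The values below are copied from the output above and deliberately shuffled:

```python
import heapq

# (country, state, confirmed) values from the output above, in arbitrary order
latest = [
    ("Spain", "Toledo", 20410),
    ("China", "Hubei", 67800),
    ("Germany", "Berlin", 19848),
    ("Italy", "Rome", 47021),
]

# Top 3 rows by confirmed count, largest first
top3 = heapq.nlargest(3, latest, key=lambda row: row[2])
print(top3)
```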

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [142]:
# Window.partitionBy() splits the data into logical partitions, one per
# ("Country", "State_cleaned") pair, and orderBy() sorts each partition
# by "Date" in descending order (latest first).
ws = Window.partitionBy("Country", "State_cleaned").orderBy(col("Date").desc())


# withColumn() adds a new column; row_number() numbers the rows 1, 2, 3, ...
# within each partition, following the window's ordering.
corona_df.withColumn("row_num", row_number().over(ws)).show()

# The result contains the same rows as corona_df plus a "row_num" column;
# rows of the same country and state are grouped together, newest date first.

# Filtering on row_num then substitutes for the traditional approach of
# grouping by country and state_cleaned and joining the result back to the
# rest of the dataset.

+-----+-----+--------+------------------+-------+-------------------+---------+-----+---------+-------------+----+-------+
|  _c0|State| Country|               Lat|   Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|row_num|
+-----+-----+--------+------------------+-------+-------------------+---------+-----+---------+-------------+----+-------+
|27745| null|Cameroon|3.8480000000000003|11.5021|2020-03-20 00:00:00|       20|    0|        0|      Yaounde|null|      1|
|27268| null|Cameroon|3.8480000000000003|11.5021|2020-03-19 00:00:00|       13|    0|        0|      Yaounde|null|      2|
|26791| null|Cameroon|3.8480000000000003|11.5021|2020-03-18 00:00:00|       10|    0|        0|      Yaounde|null|      3|
|26314| null|Cameroon|3.8480000000000003|11.5021|2020-03-17 00:00:00|       10|    0|        0|      Yaounde|null|      4|
|25837| null|Cameroon|3.8480000000000003|11.5021|2020-03-16 00:00:00|        4|    0|        0|      Yaounde|null|      5|
|25360| null|Cam
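To make the window semantics concrete, here is a plain-Python sketch of `row_number().over(ws)`: partition by (country, state), order each partition by date descending, and number the rows from 1. The helper name `with_row_number` is illustrative, and the rows are copied from outputs in this notebook:

```python
from itertools import groupby

# (country, state, date, confirmed) rows copied from outputs in this notebook
rows = [
    ("Italy", "Rome", "2020-03-19", 41035),
    ("Cameroon", "Yaounde", "2020-03-18", 10),
    ("Italy", "Rome", "2020-03-20", 47021),
    ("Cameroon", "Yaounde", "2020-03-20", 20),
    ("Cameroon", "Yaounde", "2020-03-19", 13),
]


def with_row_number(rows):
    """Append a row number per (country, state) partition, newest date first."""
    # sorted() is stable: sort by date descending first, then by partition key,
    # so rows inside each partition keep their date-descending order.
    ordered = sorted(rows, key=lambda r: r[2], reverse=True)
    ordered = sorted(ordered, key=lambda r: (r[0], r[1]))
    numbered = []
    for _, partition in groupby(ordered, key=lambda r: (r[0], r[1])):
        for row_num, row in enumerate(partition, start=1):
            numbered.append(row + (row_num,))
    return numbered


for row in with_row_number(rows):
    print(row)
```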

In [146]:
# Keep only the rows where row_num == 1. Because each partition is ordered by
# date descending, this is the latest record for each (country, state) pair.
corona_df.withColumn("row_num", row_number().over(ws)).where(col("row_num") == 1).show()

+-----+--------------------+--------------+------------------+---------+-------------------+---------+-----+---------+--------------------+--------+-------+
|  _c0|               State|       Country|               Lat|     Long|               Date|Confirmed|Death|Recovered|       state_cleaned|    City|row_num|
+-----+--------------------+--------------+------------------+---------+-------------------+---------+-----+---------+--------------------+--------+-------+
|27745|                null|      Cameroon|3.8480000000000003|  11.5021|2020-03-20 00:00:00|       20|    0|        0|             Yaounde|    null|      1|
|27859|             Qinghai|         China|           35.7452|  95.9956|2020-03-20 00:00:00|       18|    0|       18|             Qinghai|    null|      1|
|27762|                null|        Cyprus|           35.1264|  33.4299|2020-03-20 00:00:00|       67|    0|        0|             Nicosia|    null|      1|
|27812|            Michigan|            US|           43.3
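Filtering on `row_num == 1` and the earlier group-max-then-join return the same rows as long as each (country, state) pair has only one record on its latest date. They differ on ties: the join keeps every row that matches the maximum date, while `row_number()` assigns distinct numbers, so exactly one tied row survives (in Spark, which one is not guaranteed). The plain-Python sketch below shows the difference. The second 2020-03-20 row for Rome is a hypothetical duplicate added only to create a tie:

```python
def latest_via_join(rows):
    """Group max of the date, then keep every row that matches it (like the join)."""
    latest = {}
    for country, state, date, _ in rows:
        key = (country, state)
        if key not in latest or date > latest[key]:
            latest[key] = date
    return [r for r in rows if r[2] == latest[(r[0], r[1])]]


def latest_via_row_number(rows):
    """Keep only the first row per (country, state) after sorting by date descending."""
    seen = set()
    kept = []
    # Stable sort: among tied dates the input order decides which row comes first.
    for row in sorted(rows, key=lambda r: r[2], reverse=True):
        key = (row[0], row[1])
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


rows = [
    ("Italy", "Rome", "2020-03-19", 41035),
    ("Italy", "Rome", "2020-03-20", 47021),
    ("Italy", "Rome", "2020-03-20", 47000),  # hypothetical duplicate date
]

print(len(latest_via_join(rows)))        # 2: both tied rows are kept
print(len(latest_via_row_number(rows)))  # 1: only one tied row is kept
```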

In [0]:
# Keep only the latest record of each (country, state) pair
corona_max_df = corona_df.join(corona_df.groupBy("country", "state_cleaned").agg(F.max("Date").alias("Date")), on = ["Country", "state_cleaned", "date"], how="inner")

In [149]:
corona_max_df.show()

+--------------------+--------------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+
|             Country|       state_cleaned|               Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+--------------------+--------------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+
|            Thailand|             Bangkok|2020-03-20 00:00:00|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|               Japan|             Hiraide|2020-03-20 00:00:00|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|           Singapore|           Singapore|2020-03-20 00:00:00|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|               Nepal|           Kathmandu|2020-03-20 00:00:00|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
|            Malaysia|             Sarawa

In [122]:
# Total confirmed cases per country for each date: pivot() turns every distinct
# "Date" into its own column and sum() fills the cells
corona_df.groupBy("country").pivot("Date").agg(F.sum("Confirmed")).show()

+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------
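`pivot()` reshapes grouped data from long to wide: one output row per country, one column per distinct date, and each cell holding the aggregate for that (country, date) pair, or null when there is no record. A plain-Python sketch of the same reshaping, on a small hypothetical set of rows:

```python
from collections import defaultdict

# Hypothetical (country, date, confirmed) rows; CountryA has two states
rows = [
    ("CountryA", "2020-03-19", 10),
    ("CountryA", "2020-03-19", 5),   # second state of CountryA on the same date
    ("CountryA", "2020-03-20", 18),
    ("CountryB", "2020-03-20", 7),
]


def pivot_sum(rows):
    """Like groupBy(country).pivot(date).agg(sum(confirmed))."""
    dates = sorted({date for _, date, _ in rows})
    sums = defaultdict(dict)
    for country, date, confirmed in rows:
        sums[country][date] = sums[country].get(date, 0) + confirmed
    # Missing (country, date) cells become None, like null in the Spark output
    table = {country: [cells.get(d) for d in dates] for country, cells in sums.items()}
    return dates, table


dates, table = pivot_sum(rows)
print(dates)
print(table)
```

If the distinct dates are known in advance, `pivot()` also accepts them as a second argument, as in `pivot("Date", dates)` where `dates` is a Python list you build yourself; the PySpark documentation notes that this is more efficient because Spark otherwise has to compute the distinct values first.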

In [123]:
# crosstab() is generally used to see the relationship between two categorical
# variables, i.e. how the occurrence of one varies with the other.
# Each cell counts the records for a (State, Date) pair, so here the values are
# 1 or 0 depending on whether that state has a record on that date.
corona_df.filter("country='US'").crosstab("State", "Date").show()

+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+-----------
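`crosstab(col1, col2)` computes a pair-wise frequency table: one row per distinct value of the first column, one column per distinct value of the second, and each cell holding the number of records with that pair (0 when the pair never occurs). A plain-Python sketch with `collections.Counter`, on hypothetical (State, Date) pairs:

```python
from collections import Counter

# Hypothetical (state, date) pairs for one country
pairs = [
    ("New York", "2020-03-19"),
    ("New York", "2020-03-20"),
    ("Washington", "2020-03-20"),
]


def crosstab(pairs):
    """Count each (row value, column value) pair, filling absent pairs with 0."""
    counts = Counter(pairs)
    row_values = sorted({a for a, _ in pairs})
    col_values = sorted({b for _, b in pairs})
    table = {a: [counts[(a, b)] for b in col_values] for a in row_values}
    return col_values, table


columns, table = crosstab(pairs)
print(columns)
print(table)
```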

In [126]:
corona_max_df.groupBy("Country").agg({"Confirmed":"sum"}).show()

+-----------+--------------+
|    Country|sum(Confirmed)|
+-----------+--------------+
|       Chad|             1|
|   Paraguay|            13|
|     Russia|           253|
|    Senegal|            38|
|     Sweden|          1639|
| Cabo Verde|             1|
|     Guyana|             7|
|Philippines|           230|
|   Djibouti|             1|
|  Singapore|           385|
|   Malaysia|          1030|
|       Fiji|             1|
|     Turkey|           359|
|       Iraq|           208|
|    Germany|         19848|
|   Cambodia|            51|
|Afghanistan|            24|
|     Jordan|            85|
|   Maldives|            13|
|     Rwanda|            17|
+-----------+--------------+
only showing top 20 rows



In [128]:
corona_max_df.groupby("Country").agg({"Confirmed":"sum", "Recovered":"sum", "death":"sum"}).orderBy("sum(Confirmed)", ascending=False).show()

+--------------+--------------+----------+--------------+
|       Country|sum(Recovered)|sum(death)|sum(Confirmed)|
+--------------+--------------+----------+--------------+
|         China|         71266|      3253|         81250|
|         Italy|          4440|      4032|         47021|
|         Spain|          1588|      1043|         20410|
|       Germany|           180|        67|         19848|
|          Iran|          6745|      1433|         19644|
|            US|             0|       244|         19100|
|        France|            12|       450|         12726|
|  Korea, South|          1540|        94|          8652|
|   Switzerland|            15|        54|          5294|
|United Kingdom|            67|       178|          4014|
|   Netherlands|             2|       107|          3003|
|       Austria|             9|         6|          2388|
|       Belgium|             1|        37|          2257|
|        Norway|             1|         7|          1914|
|        Swede
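Passing a dict to `agg()` computes one aggregate per column and names the results `sum(Confirmed)`, `sum(Recovered)` and so on, which is why `orderBy()` has to refer to `"sum(Confirmed)"`. A plain-Python sketch of the same per-country sums and descending sort, on hypothetical state-level rows:

```python
from collections import defaultdict

# Hypothetical latest-date rows: (country, confirmed, recovered, death)
rows = [
    ("CountryA", 100, 40, 2),
    ("CountryA", 50, 10, 1),
    ("CountryB", 120, 5, 3),
]


def sum_by_country(rows):
    """Sum confirmed, recovered and death per country, largest confirmed first."""
    totals = defaultdict(lambda: [0, 0, 0])
    for country, confirmed, recovered, death in rows:
        totals[country][0] += confirmed
        totals[country][1] += recovered
        totals[country][2] += death
    result = [(country, *sums) for country, sums in totals.items()]
    # Like orderBy("sum(Confirmed)", ascending=False)
    return sorted(result, key=lambda r: r[1], reverse=True)


for row in sum_by_country(rows):
    print(row)
```

For friendlier column names, the dict form can be replaced by explicit expressions such as `agg(F.sum("Confirmed").alias("Confirmed"), ...)`.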

In [130]:
corona_df.filter("country='Italy'").sort("Date", ascending=False).show()

+-----+-----+-------+----+----+-------------------+---------+-----+---------+-------------+----+
|  _c0|State|Country| Lat|Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+-----+-----+-------+----+----+-------------------+---------+-----+---------+-------------+----+
|27682| null|  Italy|43.0|12.0|2020-03-20 00:00:00|    47021| 4032|     4440|         Rome|null|
|27205| null|  Italy|43.0|12.0|2020-03-19 00:00:00|    41035| 3405|     4440|         Rome|null|
|26728| null|  Italy|43.0|12.0|2020-03-18 00:00:00|    35713| 2978|     4025|         Rome|null|
|26251| null|  Italy|43.0|12.0|2020-03-17 00:00:00|    31506| 2503|     2941|         Rome|null|
|25774| null|  Italy|43.0|12.0|2020-03-16 00:00:00|    27980| 2158|     2749|         Rome|null|
|25297| null|  Italy|43.0|12.0|2020-03-15 00:00:00|    24747| 1809|     2335|         Rome|null|
|24820| null|  Italy|43.0|12.0|2020-03-14 00:00:00|    21157| 1441|     1966|         Rome|null|
|24343| null|  Italy|43.0|12.0

In [153]:
# withColumn() returns a new DataFrame with an extra column; here
# Active = Confirmed - Recovered - Death, sorted by Active in descending order.
corona_max_df.withColumn("Active", corona_max_df.Confirmed - corona_max_df.Recovered - corona_max_df.Death).sort("Active", ascending=False).show(40)

+--------------+----------------+-------------------+-----+--------------+--------+-------------------+---------+-----+---------+-------------+------+
|       Country|   state_cleaned|               Date|  _c0|         State|     Lat|               Long|Confirmed|Death|Recovered|         City|Active|
+--------------+----------------+-------------------+-----+--------------+--------+-------------------+---------+-----+---------+-------------+------+
|         Italy|            Rome|2020-03-20 00:00:00|27682|          null|    43.0|               12.0|    47021| 4032|     4440|         null| 38549|
|       Germany|          Berlin|2020-03-20 00:00:00|27677|          null|    51.0|                9.0|    19848|   67|      180|         null| 19601|
|         Spain|          Toledo|2020-03-20 00:00:00|27684|          null|    40.0|               -4.0|    20410| 1043|     1588|         null| 17779|
|        France|          France|2020-03-20 00:00:00|27823|        France| 46.2276|           
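As a quick check of the `Active` formula, the first three rows of the output above can be reproduced by hand; the values are copied from that output:

```python
def active_cases(confirmed, recovered, death):
    """Active = Confirmed - Recovered - Death, the same expression as in withColumn()."""
    return confirmed - recovered - death


# (country, confirmed, recovered, death) copied from the output above
latest = [
    ("Spain", 20410, 1588, 1043),
    ("Italy", 47021, 4440, 4032),
    ("Germany", 19848, 180, 67),
]

ranked = sorted(
    ((country, active_cases(c, r, d)) for country, c, r, d in latest),
    key=lambda pair: pair[1],
    reverse=True,
)
print(ranked)  # [('Italy', 38549), ('Germany', 19601), ('Spain', 17779)]
```

Spain has more confirmed cases than Germany but fewer active ones, which is why the Active ranking differs from the Confirmed ranking. In Spark, if any of the three columns is null for a row, Active is null for that row.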

In [0]:
# DataFrames are immutable, so withColumn() does not modify corona_max_df in place;
# assign the result back to keep the "Active" column
corona_max_df = corona_max_df.withColumn("Active", corona_max_df.Confirmed - corona_max_df.Recovered - corona_max_df.Death)

In [152]:
corona_max_df.show()

+--------------------+--------------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+------+
|             Country|       state_cleaned|               Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|Active|
+--------------------+--------------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+------+
|            Thailand|             Bangkok|2020-03-20 00:00:00|27666|            null|    15.0|    101.0|      322|    1|       42|null|   279|
|               Japan|             Hiraide|2020-03-20 00:00:00|27667|            null|    36.0|    138.0|      963|   33|      191|null|   739|
|           Singapore|           Singapore|2020-03-20 00:00:00|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|   261|
|               Nepal|           Kathmandu|2020-03-20 00:00:00|27669|            null| 28.1667|    84.25|        1|    0|        1|null|

In [161]:
# Countries with the most active cases
corona_max_df.groupBy("Country").agg(F.sum("Active")).orderBy("sum(Active)", ascending=False).show()

+--------------+-----------+
|       Country|sum(Active)|
+--------------+-----------+
|         Italy|      38549|
|       Germany|      19601|
|            US|      18856|
|         Spain|      17779|
|        France|      12264|
|          Iran|      11466|
|  Korea, South|       7018|
|         China|       6731|
|   Switzerland|       5225|
|United Kingdom|       3769|
|   Netherlands|       2894|
|       Austria|       2373|
|       Belgium|       2219|
|        Norway|       1906|
|        Sweden|       1607|
|       Denmark|       1327|
|      Portugal|       1009|
|      Malaysia|        940|
|        Canada|        922|
|       Czechia|        829|
+--------------+-----------+
only showing top 20 rows
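Because a sum of differences equals the difference of the sums, the per-country `sum(Active)` above should equal `sum(Confirmed) - sum(Recovered) - sum(death)` from the earlier aggregation on `corona_max_df`, provided none of those columns contain nulls. Checking a few countries with values copied from the two outputs:

```python
# (sum(Confirmed), sum(Recovered), sum(death)) per country, from the earlier aggregation
totals = {
    "China": (81250, 71266, 3253),
    "Iran": (19644, 6745, 1433),
    "US": (19100, 0, 244),
    "France": (12726, 12, 450),
}

# sum(Active) per country, from the output above
reported_active = {"China": 6731, "Iran": 11466, "US": 18856, "France": 12264}

derived_active = {
    country: confirmed - recovered - death
    for country, (confirmed, recovered, death) in totals.items()
}
print(derived_active == reported_active)  # True
```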

