### `Initializing Spark session` 

In [1]:
import findspark

findspark.init("/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark")

import pyspark 

sc = pyspark.SparkContext()

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

The data can be loaded into a PySpark SQL DataFrames creating as many DataFrames as required by the available data.

In [2]:
cases = spark.read.csv('/user/tswdsml6user0131/Capstone_Bidisha/us_cases_over_time.csv',header=True,inferSchema=True)
census = spark.read.csv("/user/tswdsml6user0131/Capstone_Bidisha/us-census-data.csv",header=True,inferSchema=True)
states = spark.read.csv("/user/tswdsml6user0131/Capstone_Bidisha/usstatenamescodes.csv",header=True,inferSchema=True)
janssen = spark.read.csv("/user/tswdsml6user0131/Capstone_Bidisha/vaccines_janssen.csv",header=True,inferSchema=True)
moderna = spark.read.csv("/user/tswdsml6user0131/Capstone_Bidisha/vaccines_moderna.csv",header=True,inferSchema=True)
pfizer = spark.read.csv("/user/tswdsml6user0131/Capstone_Bidisha/vaccines_pfizer.csv",header=True,inferSchema=True)

### `EDA`

In [3]:
cases.count()

28740

In [4]:
cases.printSchema()

root
 |-- submission_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- tot_cases: integer (nullable = true)
 |-- conf_cases: integer (nullable = true)
 |-- prob_cases: integer (nullable = true)
 |-- new_case: integer (nullable = true)
 |-- pnew_case: integer (nullable = true)
 |-- tot_death: integer (nullable = true)
 |-- conf_death: integer (nullable = true)
 |-- prob_death: integer (nullable = true)
 |-- new_death: integer (nullable = true)
 |-- pnew_death: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- consent_cases: string (nullable = true)
 |-- consent_deaths: string (nullable = true)



In [5]:
states.show(2)

+-------+------+
| STNAME|STCODE|
+-------+------+
|ALABAMA|    AL|
| ALASKA|    AK|
+-------+------+
only showing top 2 rows



In [6]:
janssen.show(2)

+------------+-------------------+--------------------+
|Jurisdiction|Week of Allocations|1st Dose Allocations|
+------------+-------------------+--------------------+
| Connecticut|         05/10/2021|                6400|
|       Maine|         05/10/2021|                2500|
+------------+-------------------+--------------------+
only showing top 2 rows



In [7]:
moderna.show(2)

+------------+-------------------+--------------------+--------------------+
|Jurisdiction|Week of Allocations|1st Dose Allocations|2nd Dose Allocations|
+------------+-------------------+--------------------+--------------------+
| Connecticut|         05/17/2021|               41220|               41220|
|       Maine|         05/17/2021|               15800|               15800|
+------------+-------------------+--------------------+--------------------+
only showing top 2 rows



In [8]:
pfizer.show(2)

+------------+-------------------+--------------------+--------------------+
|Jurisdiction|Week of Allocations|1st Dose Allocations|2nd Dose Allocations|
+------------+-------------------+--------------------+--------------------+
| Connecticut|         05/17/2021|               54990|               54990|
|       Maine|         05/17/2021|               21060|               21060|
+------------+-------------------+--------------------+--------------------+
only showing top 2 rows



In [9]:
cases.count(), len(cases.columns)

(28740, 15)

In [10]:
census.count(), len(census.columns)

(3193, 164)

In [11]:
states.count(), len(states.columns)

(56, 2)

In [12]:
janssen.count(), len(janssen.columns)

(504, 3)

In [13]:
moderna.count(), len(moderna.columns)

(1386, 4)

In [14]:
pfizer.count(), len(pfizer.columns)

(1449, 4)

### `Problem Statements`

From the census data only the required columns need to be taken and loaded into a look up table to get the population of any state based on the state code.


In [15]:
from pyspark.sql.functions import *

In [16]:
population_by_state = census.select("STNAME",'CENSUS2010POP', "POPESTIMATE2019").groupBy('STNAME')\
                      .agg(sum('CENSUS2010POP').alias('Actualpop2010'),\
                           sum('POPESTIMATE2019').alias('Estimatedpop2019'))

In [17]:
population_by_state.show(2)

+------+-------------+----------------+
|STNAME|Actualpop2010|Estimatedpop2019|
+------+-------------+----------------+
|  Utah|      5527770|         6411916|
|Hawaii|      2720602|         2831744|
+------+-------------+----------------+
only showing top 2 rows



In [18]:
states.show(2)

+-------+------+
| STNAME|STCODE|
+-------+------+
|ALABAMA|    AL|
| ALASKA|    AK|
+-------+------+
only showing top 2 rows



A delimited file of the names of states in the US and their standard code also needs to be used as it gives a standardize way to refer to the states in all the datasets.


In [19]:
population_by_stcode = states.join(population_by_state,\
                       lower(states.STNAME) == lower(population_by_state.STNAME))\
                       .drop(states.STNAME)

In [20]:
population_by_stcode.show(2)

+------+------+-------------+----------------+
|STCODE|STNAME|Actualpop2010|Estimatedpop2019|
+------+------+-------------+----------------+
|    UT|  Utah|      5527770|         6411916|
|    HI|Hawaii|      2720602|         2831744|
+------+------+-------------+----------------+
only showing top 2 rows



`Sanity Checks`
- For data cleansing we need to check and remove the records where The sum of confirmed cases and probable cases is not equal to the total cases.
-  Similarly records when the sum of confirmed deaths and probable deaths is not equal to the total deaths the records are rejected. 


In [21]:
cases_authentic = cases.filter((col('conf_cases')+col("prob_cases")== col("tot_cases"))\
                    & (col('conf_death')+col("prob_death")== col("tot_death")))

In [22]:
cases_authentic.show(2)

+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|submission_date|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|          created_at|consent_cases|consent_deaths|
+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|     07/23/2020|   KY|    25147|     23882|      1265|     607|       61|      684|       680|         4|        7|         0|07/24/2020 02:18:...|        Agree|         Agree|
|     04/11/2021|   NJ|   953490|    837052|    116438|    3387|      517|    24870|     22297|      2573|       11|         0|04/12/2021 01:39:...|        Agree|         Agree|
+---------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+----

In [23]:
cases_authentic.count()

11051

In [24]:
cases.count()

28740

The dates given in the first data set are not in the default format of PySpark SQL/Hive. 
So we need to format the dates to make use of the timestamp/date calculations

In [25]:
cases_authentic_timestamp = cases_authentic.withColumn('submission_date',\
                           to_timestamp(col('submission_date'), 'mm/dd/yyyy'))

In [26]:
cases_authentic_timestamp.show(1)

+-------------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|    submission_date|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|          created_at|consent_cases|consent_deaths|
+-------------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
|2020-01-23 00:07:00|   KY|    25147|     23882|      1265|     607|       61|      684|       680|         4|        7|         0|07/24/2020 02:18:...|        Agree|         Agree|
+-------------------+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+--------------------+-------------+--------------+
only showing top 1 row



In [27]:
cases_authentic_timestamp.printSchema()

root
 |-- submission_date: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- tot_cases: integer (nullable = true)
 |-- conf_cases: integer (nullable = true)
 |-- prob_cases: integer (nullable = true)
 |-- new_case: integer (nullable = true)
 |-- pnew_case: integer (nullable = true)
 |-- tot_death: integer (nullable = true)
 |-- conf_death: integer (nullable = true)
 |-- prob_death: integer (nullable = true)
 |-- new_death: integer (nullable = true)
 |-- pnew_death: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- consent_cases: string (nullable = true)
 |-- consent_deaths: string (nullable = true)



In [28]:
cases_authentic_by_state = cases_authentic.groupby("state")\
                          .agg(sum('tot_cases').alias('tot_cases'),\
                               sum('conf_cases').alias('conf_cases'),\
                               sum('prob_cases').alias('prob_cases'),\
                               sum('new_case').alias('new_case'),\
                               sum('pnew_case').alias('pnew_case'),\
                               sum('tot_death').alias('tot_death'),\
                               sum('conf_death').alias('conf_death'),\
                               sum('prob_death').alias('prob_death'),\
                               sum('new_death').alias('new_death'),\
                               sum('pnew_death').alias('pnew_death'),\
                               max('submission_date').alias('submission_date'))

In [29]:
cases_authentic_by_state.show(4)

+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+---------------+
|state|tot_cases|conf_cases|prob_cases|new_case|pnew_case|tot_death|conf_death|prob_death|new_death|pnew_death|submission_date|
+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----------+---------------+
|   AZ|153156910| 144930924|   8225986|  866678|    64475|  3074608|   2786532|    288076|    17270|      1943|     12/31/2020|
|  FSM|      127|       127|         0|       1|        0|        0|         0|         0|        0|         0|     12/31/2020|
|   NJ|108059772|  95724885|  12334887|  502510|    75871|  3084866|   2774020|    310846|     8662|       627|     05/14/2021|
|   OR| 29408702|  29061876|    346826|  192698|        0|   420822|    417846|      2976|     2510|         0|     12/31/2020|
+-----+---------+----------+----------+--------+---------+---------+----------+----------+---------+----

Next the data needs to be augmented by adding a few derived fields as explained below.
- Take the census data and extract the estimated population of 2019. – Selection of relevant columns
- Look up the standard state names and code table and get the state-wise census into a DataFrame and/or a Hive table. – Aggregation & Joining with state names dataset


The above step has already been done when we have extracted the population data by statecode.
So, the same look up table `population_by_stcode` can be reused here

In [30]:
df_EstimatedPop2019 = population_by_stcode.select('STCODE','STNAME','Estimatedpop2019')

In [31]:
df_EstimatedPop2019.show()

+------+--------------------+----------------+
|STCODE|              STNAME|Estimatedpop2019|
+------+--------------------+----------------+
|    UT|                Utah|         6411916|
|    HI|              Hawaii|         2831744|
|    MN|           Minnesota|        11279264|
|    OH|                Ohio|        23378200|
|    AR|            Arkansas|         6035608|
|    OR|              Oregon|         8435474|
|    TX|               Texas|        57991762|
|    ND|        North Dakota|         1524124|
|    PA|        Pennsylvania|        25603978|
|    CT|         Connecticut|         7130574|
|    NE|            Nebraska|         3868816|
|    VT|             Vermont|         1247978|
|    NV|              Nevada|         6160312|
|    WA|          Washington|        15229786|
|    IL|            Illinois|        25343642|
|    OK|            Oklahoma|         7913942|
|    DC|District of Columbia|         1411498|
|    DE|            Delaware|         1947528|
|    AK|     

Now joining the resultant dataframe with our Cases dataset.

Once we are equipped with the above processed data we can write the data into `Parquet` format. 


In [32]:
df_cases_with_estimatedpop2019 = df_EstimatedPop2019.join(cases_authentic_by_state, \
                                 col('STCODE') == col('state'))

In [33]:
df_cases_with_estimatedpop2019.select("STCODE","STNAME",'Estimatedpop2019','tot_cases',\
                                      'conf_cases',"conf_death").show(2)

+------+------+----------------+---------+----------+----------+
|STCODE|STNAME|Estimatedpop2019|tot_cases|conf_cases|conf_death|
+------+------+----------------+---------+----------+----------+
|    UT|  Utah|         6411916| 68682628|  68653597|    356085|
|    OH|  Ohio|        23378200|173559217| 152098392|   3014436|
+------+------+----------------+---------+----------+----------+
only showing top 2 rows



In [34]:
df_cases_with_estimatedpop2019.write.mode('overwrite').\
parquet('/user/tswdsml6user0131/Capstone_Bidisha/cases_with_estimatedpop2019.parquet')

From this data the following listing or reports need to be generated and displayed.

- Compute and list the Positive Case Rate per 100000  population for each state. This is calculated as:
 `(Total positive cases in the state / Total estimated Population of the state)*100000`

In [35]:
df_cases_with_estimatedpop2019.withColumn('positiveCaseRate(Per 100000)',\
                                           col("conf_cases") / col("Estimatedpop2019") * 100000)\
                        .select("STCODE","STNAME",round('positiveCaseRate(Per 100000)',2)\
                        .alias('positiveCaseRate(Per 100000)'))\
                        .show(4)

+------+-----------+----------------------------+
|STCODE|     STNAME|positiveCaseRate(Per 100000)|
+------+-----------+----------------------------+
|    UT|       Utah|                  1070718.91|
|    OH|       Ohio|                   650599.24|
|    OR|     Oregon|                   344519.77|
|    CT|Connecticut|                   723854.66|
+------+-----------+----------------------------+
only showing top 4 rows



Get the number of identified positive cases in each state in the last 7 days of the reporting period. 

In [36]:
df_confCasesbyStatebyDate = \
             cases_authentic_timestamp.select('state', 'submission_date','conf_cases')\
                                      .withColumn('Date', to_date('submission_date'))\
                                      .drop('submission_date')\
                                      .groupBy('state','Date').agg(sum('conf_cases')\
                                      .alias('IDPositiveCases')) #to_date only acts on timestamp col


In [37]:
df_confCasesbyStatebyDate.orderBy("state").show(20)

+-----+----------+---------------+
|state|      Date|IDPositiveCases|
+-----+----------+---------------+
|   AL|2020-01-14|         945694|
|   AL|2020-01-07|         876173|
|   AL|2021-01-20|        1545112|
|   AL|2020-01-26|        1062544|
|   AL|2021-01-21|        1548195|
|   AL|2021-01-31|         771530|
|   AL|2021-01-15|        1529947|
|   AL|2020-01-27|        1073240|
|   AL|2021-01-22|        1551375|
|   AL|2021-01-13|        1934624|
|   AL|2021-01-27|        1566081|
|   AL|2020-01-16|         967020|
|   AL|2021-01-10|        1921655|
|   AL|2020-01-29|        1098446|
|   AL|2021-01-28|        1568374|
|   AL|2020-01-01|         823776|
|   AL|2020-01-13|         934577|
|   AL|2020-01-25|        1054366|
|   AL|2021-01-18|        1538558|
|   AL|2021-01-17|        1534891|
+-----+----------+---------------+
only showing top 20 rows



In [38]:
from pyspark.sql.window import Window
window = Window.partitionBy(df_confCasesbyStatebyDate['state'])\
         .orderBy(df_confCasesbyStatebyDate.Date.desc())

df_confCasesbyStatebyDate.select('*', rank().over(window).alias('rank'))\
          .filter(col('rank') <= 7)\
          .groupBy('state').agg(sum('IDPositiveCases')\
          .alias('IDPositiveCasesPerStateInLast7Days'))\
          .orderBy('state')\
          .show()

+-----+----------------------------------+
|state|IDPositiveCasesPerStateInLast7Days|
+-----+----------------------------------+
|   AL|                           9388302|
|   AZ|                          18168895|
|   CO|                          10072363|
|   CT|                           6520187|
|   DE|                           2038094|
|  FSM|                                24|
|   GA|                          19632690|
|   ID|                           3386882|
|   IL|                          29177141|
|   KY|                           7466840|
|   MA|                          13616672|
|   ME|                            900503|
|   MI|                          15822475|
|   MP|                              3597|
|   MS|                           4404990|
|   MT|                           2321488|
|   NC|                          18093872|
|   NJ|                          17811665|
|  NYC|                          15511648|
|   OH|                          20109253|
+-----+----

Compute and list the Deaths Rate per 100000  due to COVID-19 population for each state. This is calculated as:
 (Total deaths due to COVID-19 in the state / Total estimated Population of the state)*100000


In [39]:
df_cases_with_estimatedpop2019.withColumn('deathRate(Per 100000)',\
                               col("conf_death") / col("Estimatedpop2019") * 100000)\
                               .select("STCODE","STNAME",round('deathRate(Per 100000)', 2)\
                               .alias('deathRate(Per 100000)'))\
                               .show(4)

+------+-----------+---------------------+
|STCODE|     STNAME|deathRate(Per 100000)|
+------+-----------+---------------------+
|    UT|       Utah|              5553.49|
|    OH|       Ohio|             12894.22|
|    OR|     Oregon|              4953.44|
|    CT|Connecticut|             24085.13|
+------+-----------+---------------------+
only showing top 4 rows



Get the number of deaths due to COVID-19 as identified in each state in the last 7 days of the reporting period.

In [40]:
df_deathCasesbyStatebyDate = \
             cases_authentic_timestamp.select('state', 'submission_date','conf_death')\
                                      .withColumn('Date', to_date('submission_date'))\
                                      .drop('submission_date')\
                                      .groupBy('state','Date').agg(sum('conf_death')\
                                      .alias('IDDeathCases'))

from pyspark.sql.window import Window
window = Window.partitionBy(df_deathCasesbyStatebyDate['state'])\
         .orderBy(df_deathCasesbyStatebyDate.Date.desc())

df_deathCasesbyStatebyDate.select('*', rank().over(window).alias('rank'))\
          .filter(col('rank') <= 7)\
          .groupBy('state').agg(sum('IDDeathCases')\
          .alias('IDDeathCasesPerStateInLast7Days'))\
          .orderBy('state')\
          .show()

+-----+-------------------------------+
|state|IDDeathCasesPerStateInLast7Days|
+-----+-------------------------------+
|   AL|                         195504|
|   AZ|                         332981|
|   CO|                         125476|
|   CT|                         150014|
|   DE|                          30621|
|  FSM|                              0|
|   GA|                         366146|
|   ID|                          39647|
|   IL|                         495153|
|   KY|                         113118|
|   MA|                         384722|
|   ME|                          16216|
|   MI|                         380226|
|   MP|                             48|
|   MS|                         116847|
|   MT|                          33308|
|   NC|                         238287|
|   NJ|                         507829|
|  NYC|                         597499|
|   OH|                         371928|
+-----+-------------------------------+
only showing top 20 rows



Vaccine Allocation & Distribution
State-wise breakup of allocation of Pfizer vaccine


In [41]:
pfizer.show(4)

+-------------+-------------------+--------------------+--------------------+
| Jurisdiction|Week of Allocations|1st Dose Allocations|2nd Dose Allocations|
+-------------+-------------------+--------------------+--------------------+
|  Connecticut|         05/17/2021|               54990|               54990|
|        Maine|         05/17/2021|               21060|               21060|
|Massachusetts|         05/17/2021|              105300|              105300|
|New Hampshire|         05/17/2021|               21060|               21060|
+-------------+-------------------+--------------------+--------------------+
only showing top 4 rows



In [42]:
pfizer_by_Jurisdiction = pfizer.drop("Week of Allocations").groupby("Jurisdiction")\
                            .agg(sum("1st Dose Allocations").alias('tot_first_dose_alloc'),sum("2nd Dose Allocations")\
                            .alias('tot_second_dose_alloc')).orderBy("Jurisdiction")

In [43]:
pfizer_by_Jurisdiction.show(10)

+--------------+--------------------+---------------------+
|  Jurisdiction|tot_first_dose_alloc|tot_second_dose_alloc|
+--------------+--------------------+---------------------+
|       Alabama|             1191840|              1191840|
|        Alaska|              286260|               274560|
|American Samoa|               42510|                    0|
|       Arizona|             1690260|              1690260|
|      Arkansas|              729300|               729300|
|    California|             9455160|              9455160|
|       Chicago|              673725|               673725|
|      Colorado|             1352325|              1352325|
|   Connecticut|              904800|               904800|
|      Delaware|              250770|               250770|
+--------------+--------------------+---------------------+
only showing top 10 rows



State-wise breakup of allocation of Moderna vaccine


In [44]:
moderna_by_Jurisdiction = moderna.drop("Week of Allocations").groupby("Jurisdiction")\
                        .agg(sum("1st Dose Allocations").alias('tot_first_dose_alloc'),\
                             sum("2nd Dose Allocations").alias('tot_second_dose_alloc'))\
                            .orderBy("Jurisdiction")

In [45]:
moderna_by_Jurisdiction.show(10)

+--------------+--------------------+---------------------+
|  Jurisdiction|tot_first_dose_alloc|tot_second_dose_alloc|
+--------------+--------------------+---------------------+
|       Alabama|             1021660|              1021660|
|        Alaska|              225860|               225860|
|American Samoa|               17400|                    0|
|       Arizona|             1448360|              1448360|
|      Arkansas|              619920|               619920|
|    California|             8149300|              8149300|
|       Chicago|              567920|               567920|
|      Colorado|             1158700|              1158700|
|   Connecticut|              767420|               767420|
|      Delaware|              203440|               203440|
+--------------+--------------------+---------------------+
only showing top 10 rows



In [46]:
moderna_by_Jurisdiction.printSchema()

root
 |-- Jurisdiction: string (nullable = true)
 |-- tot_first_dose_alloc: long (nullable = true)
 |-- tot_second_dose_alloc: long (nullable = true)



State-wise breakup of allocation of Jenssen vaccine


In [47]:
janssen_by_Jurisdiction = janssen.drop("Week of Allocations").groupby("Jurisdiction")\
                            .agg(sum("1st Dose Allocations").alias('tot_first_dose_alloc'))\
                            .orderBy("Jurisdiction")

In [48]:
janssen_by_Jurisdiction.show(10)

+--------------+--------------------+
|  Jurisdiction|tot_first_dose_alloc|
+--------------+--------------------+
|       Alabama|              179400|
|        Alaska|               39900|
|American Samoa|                4600|
|       Arizona|              253900|
|      Arkansas|              109000|
|    California|             1428400|
|       Chicago|               99900|
|      Colorado|              203400|
|   Connecticut|              134800|
|      Delaware|               35700|
+--------------+--------------------+
only showing top 10 rows



Total number of vaccines allocated – State-wise breakup
- in order to do that, we need to add a third column to the `janssen` dataframe and the union all of them

In [49]:
janssen_by_Jurisdiction = janssen_by_Jurisdiction.withColumn("tot_second_dose_alloc",\
                                                              lit('0').cast('long'))

In [50]:
janssen_by_Jurisdiction.show(4)

+--------------+--------------------+---------------------+
|  Jurisdiction|tot_first_dose_alloc|tot_second_dose_alloc|
+--------------+--------------------+---------------------+
|       Alabama|              179400|                    0|
|        Alaska|               39900|                    0|
|American Samoa|                4600|                    0|
|       Arizona|              253900|                    0|
+--------------+--------------------+---------------------+
only showing top 4 rows



In [53]:
df_unified_vaccine = pfizer_by_Jurisdiction\
                    .union(moderna_by_Jurisdiction)\
                    .union(janssen_by_Jurisdiction)

df_unified_vaccine = df_unified_vaccine.groupby("Jurisdiction").agg(\
                   sum('tot_first_dose_alloc').alias('tot_first_dose_alloc'),\
                   sum('tot_second_dose_alloc').alias('tot_second_dose_alloc'))\
                  .orderBy('Jurisdiction')
            

In [54]:
df_unified_vaccine.show()

+--------------------+--------------------+---------------------+
|        Jurisdiction|tot_first_dose_alloc|tot_second_dose_alloc|
+--------------------+--------------------+---------------------+
|             Alabama|             2392900|              2213500|
|              Alaska|              552020|               500420|
|      American Samoa|               64510|                    0|
|             Arizona|             3392520|              3138620|
|            Arkansas|             1458220|              1349220|
|          California|            19032860|             17604460|
|             Chicago|             1341545|              1241645|
|            Colorado|             2714425|              2511025|
|         Connecticut|             1807020|              1672220|
|            Delaware|              489910|               454210|
|District of Columbia|              373295|               346095|
|    Federal Entities|             8096105|              7418395|
|         

In [55]:
population_by_state.show(3)

+---------+-------------+----------------+
|   STNAME|Actualpop2010|Estimatedpop2019|
+---------+-------------+----------------+
|     Utah|      5527770|         6411916|
|   Hawaii|      2720602|         2831744|
|Minnesota|     10607850|        11279264|
+---------+-------------+----------------+
only showing top 3 rows



Ratio of the population covered with vaccinations in each state based on the allocation and population figures from the census data.


In [58]:
df_vaccinatedPopRatioByState = \
population_by_state.join(df_unified_vaccine, population_by_state.STNAME  == df_unified_vaccine.Jurisdiction)\
                    .drop("Jurisdiction")\
                    .withColumn("firstVaccinatedEstPopRatio", col('tot_first_dose_alloc')/col("Estimatedpop2019"))\
                    .withColumn("firstVaccinatedActPopRatio", col('tot_first_dose_alloc')/col("Actualpop2010"))\
                    .withColumn("secondVaccinatedEstPopRatio", col('tot_second_dose_alloc')/col("Estimatedpop2019"))\
                    .withColumn("secondVaccinatedActPopRatio", col('tot_second_dose_alloc')/col("Actualpop2010"))
df_vaccinatedPopRatioByState.select("STNAME","firstVaccinatedEstPopRatio","firstVaccinatedActPopRatio",\
                                    'secondVaccinatedEstPopRatio','secondVaccinatedActPopRatio').show(5)

+---------+--------------------------+--------------------------+---------------------------+---------------------------+
|   STNAME|firstVaccinatedEstPopRatio|firstVaccinatedActPopRatio|secondVaccinatedEstPopRatio|secondVaccinatedActPopRatio|
+---------+--------------------------+--------------------------+---------------------------+---------------------------+
|     Utah|       0.21244741821321428|       0.24642758291318198|        0.19652394073783874|         0.2279572051659168|
|   Hawaii|       0.25503717850201146|        0.2654559542336586|        0.23614422772680016|         0.2457911888618769|
|Minnesota|       0.23931836332583403|       0.25446579655632384|        0.22140939337885876|        0.23542329501265571|
|     Ohio|       0.24478894867868356|        0.2480268285782244|        0.22644707462507807|         0.2294423423248499|
| Arkansas|       0.24160283437890598|       0.25004475434494383|         0.2235433447632782|        0.23135424247183906|
+---------+-------------

Proportion of population that is so far not yet covered i.e. without access to vaccines as on date


In [72]:
df_NotVaccinatedTotPop = df_vaccinatedPopRatioByState.withColumn("totPopNotVaccinated",\
                        col("Estimatedpop2019")-col("tot_first_dose_alloc"))\
                            .select("STNAME","totPopNotVaccinated","Estimatedpop2019")                                                               

In [74]:
df_NotVaccinatedTotPop.show(5)

+--------------------+-------------------+----------------+
|              STNAME|totPopNotVaccinated|Estimatedpop2019|
+--------------------+-------------------+----------------+
|                Utah|            5049721|         6411916|
|              Hawaii|            2109544|         2831744|
|           Minnesota|            8579929|        11279264|
|                Ohio|           17655475|        23378200|
|            Arkansas|            4577388|         6035608|
|              Oregon|            6380724|         8435474|
|               Texas|           44955317|        57991762|
|        North Dakota|            1141579|         1524124|
|        Pennsylvania|           19969653|        25603978|
|         Connecticut|            5323554|         7130574|
|            Nebraska|            2940426|         3868816|
|             Vermont|             909583|         1247978|
|              Nevada|            4725242|         6160312|
|          Washington|           1158075

`76.78%` of US population was not vaccinated till May 2021 

In [81]:
df_NotVaccinatedTotPop.select(sum('totPopNotVaccinated')).collect()[0][0] / \
df_NotVaccinatedTotPop.select(sum('Estimatedpop2019')).collect()[0][0]

0.767822863610486