### Country Vx Throughput Analysis - Data Fixes
 
This workbook identifies the records that were manually edited throughout the cleaning process. It is intended to serve as a reference for end users.

* Source:
  - covax_supply_chain_analytics.analysis_vx_throughput_data
  - covax_supply_chain_analytics.analysis_vx_throughput_output_daily

* Target:
  - covax_supply_chain_analytics.analysis_vx_throughput_data_fixes

* Libraries: 
  - Python

* Built by: Jeremy Cooper
* Current owner: Jeremy Cooper
* Initial Build Date: 11/02/2021
* Latest Build Date: 11/02/2021

### Environment Management

In [0]:
# dbutils.widgets.removeAll()

In [0]:
# # Dataset Name, will be used for the Metastore Table, Folder Name for transformed outputs
# dbutils.widgets.text("Dataset", "dataset_name")

# # Project Name will be used for folder Name for transformed outputs
# dbutils.widgets.text("Project", "project_name")

# # Team name should be consistent with the Blob Storage Container
# dbutils.widgets.text("Partner","partner_name")

# # Source name is used as the folder between the raw/transformed root and the Dataset folder
# dbutils.widgets.text("Source","data_source")

# dbutils.widgets.text("iso_code", "")

#### Notebook Setup

##### Import any libraries or nested notebooks

In [0]:
from delta.tables import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

##### Initialize File Paths

In [0]:
# Assemble the mount-point locations from the Partner / Source / Dataset widgets:
#   /mnt/<Partner>/<raw|transformed>/<Source>/<Dataset>
storage_root = f"/mnt/{dbutils.widgets.get('Partner')}/"
storage_branch = f"/{dbutils.widgets.get('Source')}/{dbutils.widgets.get('Dataset')}"

# Raw zone, plus the same location with a /dbfs prefix
raw_storage_path = f"{storage_root}raw{storage_branch}"
dbfs_raw_storage_path = f"/dbfs{raw_storage_path}"

# Transformed zone, plus the same location with a /dbfs prefix
transformed_storage_path = f"{storage_root}transformed{storage_branch}"
dbfs_transformed_storage_path = f"/dbfs{transformed_storage_path}"

# Echo both locations so the resolved widget values can be checked at a glance
print(raw_storage_path, transformed_storage_path, sep="\n")

### Get Data

In [0]:
# ISO code of the country to spot-check; read from the "iso_code" notebook widget
# and used by the per-country display(...) filters in the Transformation cells.
iso_code = dbutils.widgets.get("iso_code")

In [0]:
# Lookup used to attach an iso_code to each country_name
iso_mapping = spark.table("country_dimension.iso_mapping")

# Throughput data before cleaning
raw = spark.table("covax_supply_chain_analytics.analysis_vx_throughput_data")

# Throughput data after cleaning (daily output)
who = spark.table("covax_supply_chain_analytics.analysis_vx_throughput_output_daily")

### Transformation

In [0]:
# Standardise the pre-cleaned feed and attach an ISO code to each record:
#   - cast the three dose metrics to double and keep rows whose total_doses is positive
#   - keep the reporting columns, parse `date` as a date, and drop exact duplicate rows
#   - look up iso_code by country_name, then override it for the
#     "Bonaire, Sint Eustatius And Saba" name variants
#   - apply fillna(0) to the result

# iso_code for the Bonaire / Sint Eustatius / Saba name variants; every other
# country keeps the iso_code that came from the iso_mapping join.
iso_code_with_overrides = (
  when(col('country_name').isin('Bonaire, Sint Eustatius And Saba/Saba',
                                'Bonaire, Sint Eustatius And Saba/Sint Eustatius'), 'BES1')
  .when(col('country_name') == 'Bonaire, Sint Eustatius And Saba', 'BES2')
  .when(col('country_name') == 'Bonaire, Sint Eustatius And Saba/Bonaire', 'XAA')
  .otherwise(col('iso_code'))
)

df1 = (
  raw
  .withColumn('total_doses', col('total_doses').cast(DoubleType()))
  .withColumn('at_least_one_dose', col('at_least_one_dose').cast(DoubleType()))
  .withColumn('fully_vaccinated', col('fully_vaccinated').cast(DoubleType()))
  .where(col('total_doses').isNotNull() & (col('total_doses') > 0))
  .select('country_name', 'date', 'total_doses', 'at_least_one_dose', 'fully_vaccinated', 'date_accessed')
  .withColumn('date', to_date(col('date')))
  .dropDuplicates()
  .join(iso_mapping, 'country_name', how='left')
  .withColumn('iso_code', iso_code_with_overrides)
  .fillna(0)
)

# Records whose country_name did not resolve to an ISO code
display(df1.where(col('iso_code').isNull()))
# Records for the country selected in the iso_code widget
display(df1.where(col('iso_code') == iso_code))
# All records, ordered by country and date
display(df1.orderBy('iso_code', 'date'))

country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed,iso_code


country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed,iso_code
Afghanistan,2022-04-18,5948889.0,5253929.0,4603187.0,2022-05-09,AFG
Afghanistan,2021-08-09,1767239.0,769869.0,997370.0,2022-05-09,AFG
Afghanistan,2021-07-05,915671.0,726349.0,189322.0,2022-05-09,AFG
Afghanistan,2022-04-05,5874881.0,5190149.0,4534761.0,2022-05-09,AFG
Afghanistan,2021-04-07,103402.0,0.0,0.0,2022-05-09,AFG
Afghanistan,2021-10-31,3398410.0,848598.0,2549812.0,2022-05-09,AFG
Afghanistan,2022-03-14,5672576.0,5016607.0,4349252.0,2022-05-09,AFG
Afghanistan,2021-04-12,145291.0,0.0,0.0,2022-05-09,AFG
Afghanistan,2022-04-04,5872684.0,5188057.0,4532577.0,2022-05-09,AFG
Afghanistan,2021-04-22,250169.0,0.0,29903.0,2022-05-09,AFG


country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed,iso_code
Aruba,2021-02-19,12.0,12.0,0.0,2022-05-09,ABW
Aruba,2021-02-26,2896.0,2896.0,0.0,2022-05-09,ABW
Aruba,2021-03-05,8882.0,8882.0,0.0,2022-05-09,ABW
Aruba,2021-03-12,10679.0,10679.0,0.0,2022-05-09,ABW
Aruba,2021-03-19,14628.0,11772.0,2856.0,2022-05-09,ABW
Aruba,2021-03-26,21602.0,12835.0,8767.0,2022-05-09,ABW
Aruba,2021-04-01,27904.0,17554.0,10350.0,2022-05-09,ABW
Aruba,2021-04-09,35430.0,24102.0,11328.0,2022-05-09,ABW
Aruba,2021-04-16,48254.0,36112.0,12142.0,2022-05-09,ABW
Aruba,2021-04-23,62343.0,44421.0,17922.0,2022-05-09,ABW


In [0]:
# Columns that must match exactly for a pre-cleaned record to count as unchanged
match_cols = ['iso_code', 'date', 'total_doses', 'at_least_one_dose', 'fully_vaccinated']

# Cleaned records, flagged with check=1 so a successful match is detectable after the join
who1 = (
  who
  .select(*match_cols)
  .withColumn('check', lit(1))
  .fillna(0)
)

# Pre-cleaned records with no identical counterpart in the cleaned output
# (check stays null when the left join finds no match on all of match_cols)
df2 = (
  df1
  .join(who1, on=match_cols, how='left')
  .where(col('check').isNull())
  .select('iso_code', 'country_name', 'date', 'total_doses', 'at_least_one_dose', 'fully_vaccinated', 'date_accessed')
)

# Spot check both inputs for the country selected in the iso_code widget
display(df1.where(col('iso_code') == iso_code))
display(who1.where(col('iso_code') == iso_code))

country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed,iso_code
Afghanistan,2022-04-18,5948889.0,5253929.0,4603187.0,2022-05-09,AFG
Afghanistan,2021-08-09,1767239.0,769869.0,997370.0,2022-05-09,AFG
Afghanistan,2021-07-05,915671.0,726349.0,189322.0,2022-05-09,AFG
Afghanistan,2022-04-05,5874881.0,5190149.0,4534761.0,2022-05-09,AFG
Afghanistan,2021-04-07,103402.0,0.0,0.0,2022-05-09,AFG
Afghanistan,2021-10-31,3398410.0,848598.0,2549812.0,2022-05-09,AFG
Afghanistan,2022-03-14,5672576.0,5016607.0,4349252.0,2022-05-09,AFG
Afghanistan,2021-04-12,145291.0,0.0,0.0,2022-05-09,AFG
Afghanistan,2022-04-04,5872684.0,5188057.0,4532577.0,2022-05-09,AFG
Afghanistan,2021-04-22,250169.0,0.0,29903.0,2022-05-09,AFG


iso_code,date,total_doses,at_least_one_dose,fully_vaccinated,check
AFG,2021-02-22,0,0,0,1
AFG,2021-02-23,2396,0,0,1
AFG,2021-02-24,4792,0,0,1
AFG,2021-02-25,7188,0,0,1
AFG,2021-02-26,9584,0,0,1
AFG,2021-02-27,11980,0,0,1
AFG,2021-02-28,14376,0,0,1
AFG,2021-03-01,16773,0,0,1
AFG,2021-03-02,19169,0,0,1
AFG,2021-03-03,21565,0,0,1


In [0]:
# For the selected country, show in order: pre-cleaned records, cleaned records,
# and the pre-cleaned records that have no exact match in the cleaned output.
for frame in (df1, who1, df2):
  display(frame.where(col('iso_code') == iso_code))

country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed,iso_code
Afghanistan,2022-04-18,5948889.0,5253929.0,4603187.0,2022-05-09,AFG
Afghanistan,2021-08-09,1767239.0,769869.0,997370.0,2022-05-09,AFG
Afghanistan,2021-07-05,915671.0,726349.0,189322.0,2022-05-09,AFG
Afghanistan,2022-04-05,5874881.0,5190149.0,4534761.0,2022-05-09,AFG
Afghanistan,2021-04-07,103402.0,0.0,0.0,2022-05-09,AFG
Afghanistan,2021-10-31,3398410.0,848598.0,2549812.0,2022-05-09,AFG
Afghanistan,2022-03-14,5672576.0,5016607.0,4349252.0,2022-05-09,AFG
Afghanistan,2021-04-12,145291.0,0.0,0.0,2022-05-09,AFG
Afghanistan,2022-04-04,5872684.0,5188057.0,4532577.0,2022-05-09,AFG
Afghanistan,2021-04-22,250169.0,0.0,29903.0,2022-05-09,AFG


iso_code,date,total_doses,at_least_one_dose,fully_vaccinated,check
AFG,2021-02-22,0,0,0,1
AFG,2021-02-23,2396,0,0,1
AFG,2021-02-24,4792,0,0,1
AFG,2021-02-25,7188,0,0,1
AFG,2021-02-26,9584,0,0,1
AFG,2021-02-27,11980,0,0,1
AFG,2021-02-28,14376,0,0,1
AFG,2021-03-01,16773,0,0,1
AFG,2021-03-02,19169,0,0,1
AFG,2021-03-03,21565,0,0,1


iso_code,country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed
AFG,Afghanistan,2021-10-31,3398410.0,848598.0,2549812.0,2022-05-09
AFG,Afghanistan,2021-07-14,1024168.0,770229.0,253939.0,2022-05-09
AFG,Afghanistan,2021-07-24,1171064.0,861336.0,309728.0,2022-05-09
AFG,Afghanistan,2021-08-20,1201286.0,770542.0,430744.0,2022-05-09
AFG,Afghanistan,2021-08-30,1979652.0,773002.0,441371.0,2022-05-09
AFG,Afghanistan,2022-04-03,5873352.0,5187976.0,4532249.0,2022-05-09
AFG,Afghanistan,2021-07-27,1277085.0,939835.0,337250.0,2022-05-09
AFG,Afghanistan,2021-07-18,1094257.0,811119.0,283138.0,2022-05-09


In [0]:
# Show the full set of pre-cleaned records that have no exact match in the cleaned output (all countries).
display(df2)

iso_code,country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed
BES1,"Bonaire, Sint Eustatius And Saba/Saba",2021-05-21,1300.0,0.0,1300.0,2022-05-09
CHN,China,2021-11-26,2483096365.0,5196118.0,4935101.0,2022-05-09
BRA,Brazil,2021-01-29,1656851.0,0.0,0.0,2022-05-09
ROU,Romania,2021-07-25,9322767.0,4946730.0,4376037.0,2022-05-09
COG,Congo,2021-07-24,125044.0,67651.0,57393.0,2022-05-09
COD,Democratic Republic Of The Congo,2022-04-24,1143186.0,1771324.0,1064851.0,2022-05-09
GUM,Guam,2021-05-14,129675.0,74213.0,56182.0,2022-05-09
MRT,Mauritania,2022-03-21,2650797.0,1553213.0,1057009.0,2022-05-09
SVN,Slovenia,2022-04-24,2955677.0,1238921.0,1207664.0,2022-05-09
PRY,Paraguay,2022-02-25,7638311.0,3569130.0,3017339.0,2022-05-09


### Save to Azure Storage / Register in Databricks metastore

In [0]:
# Delta location for the output: the transformed path with a ".delta" suffix
delta_path = f"{transformed_storage_path}.delta"

# To wipe the existing Delta files before writing, uncomment:
# dbutils.fs.rm(delta_path, True)

# Replace the contents at delta_path with the current set of unmatched records
(
  df2.write
  .format("delta")
  .mode("overwrite")
  .save(delta_path)
)

In [0]:
# Location of the Delta output
print(f"{transformed_storage_path}.delta")

In [0]:
# Register the Delta output in the metastore.
#
# The LOCATION is taken from `delta_path` (set in the write cell above) instead of a
# hard-coded literal, so the table always points at the files this run actually wrote.
# With the intended widget values (Partner=covax-supply-chain-analytics, Source=who,
# Dataset=analysis_vx_throughput_data_fixes) this resolves to the same path as before:
#   /mnt/covax-supply-chain-analytics/transformed/who/analysis_vx_throughput_data_fixes.delta
fixes_table = "covax_supply_chain_analytics.analysis_vx_throughput_data_fixes"

# Drop any previous registration, then re-create the table over the Delta location
spark.sql(f"DROP TABLE IF EXISTS {fixes_table}")
spark.sql(f"CREATE TABLE {fixes_table} USING DELTA LOCATION '{delta_path}'")

In [0]:
# Read the registered table back, ordered by country and date, to confirm it is queryable.
# (The dangling line continuation and the commented-out select/drop_duplicates
# experiment that followed it have been removed.)
display(
  spark.sql("SELECT * FROM covax_supply_chain_analytics.analysis_vx_throughput_data_fixes")
  .orderBy(['iso_code', 'date'])
)

iso_code,country_name,date,total_doses,at_least_one_dose,fully_vaccinated,date_accessed
AFG,Afghanistan,2021-07-14,1024168.0,770229.0,253939.0,2022-05-09
AFG,Afghanistan,2021-07-18,1094257.0,811119.0,283138.0,2022-05-09
AFG,Afghanistan,2021-07-24,1171064.0,861336.0,309728.0,2022-05-09
AFG,Afghanistan,2021-07-27,1277085.0,939835.0,337250.0,2022-05-09
AFG,Afghanistan,2021-08-20,1201286.0,770542.0,430744.0,2022-05-09
AFG,Afghanistan,2021-08-30,1979652.0,773002.0,441371.0,2022-05-09
AFG,Afghanistan,2021-10-31,3398410.0,848598.0,2549812.0,2022-05-09
AFG,Afghanistan,2022-04-03,5873352.0,5187976.0,4532249.0,2022-05-09
AGO,Angola,2021-08-23,1869933.0,1025212.0,844721.0,2022-05-09
AGO,Angola,2021-10-26,6164219.0,4498272.0,1665947.0,2022-05-09


##### Query Delta Log

In [0]:
# Show the Delta transaction log for the output location. The path comes from
# `delta_path` (set in the write cell above), so the history shown is always that
# of the files this notebook writes, rather than a separately hard-coded path.
display(
  spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
34,2022-05-09T21:04:26.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,33,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1058, numOutputBytes -> 27778)",
33,2022-05-03T17:35:49.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,32,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1022, numOutputBytes -> 27049)",
32,2022-04-26T03:23:56.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,31,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 925, numOutputBytes -> 25156)",
31,2022-04-18T22:56:37.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,30,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 921, numOutputBytes -> 25080)",
30,2022-04-11T22:22:43.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,29,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 904, numOutputBytes -> 24704)",
29,2022-04-04T20:02:42.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,28,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 24168, numOutputRows -> 882)",
28,2022-03-28T22:48:13.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,27,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 24160, numOutputRows -> 882)",
27,2022-03-21T18:25:07.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,26,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 23475, numOutputRows -> 849)",
26,2022-03-15T01:35:24.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,25,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 22763, numOutputRows -> 818)",
25,2022-03-07T14:34:37.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1884626790114274),1112-212424-shuwbub0,24,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 22698, numOutputRows -> 816)",


### Appendix