### Country Vx Throughput Analysis - Supply Data
 
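**Note:** Builds the country-by-month vaccine (Vx) supply series (cumulative and monthly doses received, from the WIISE utilization export) for the WHO/BMGF/Gavi Vx Throughput task force.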

* Source:
  - supply data input files are located here: https://teams.microsoft.com/_#/files/Dashboard%20of%20Dashboards?threadId=19%3A73aa2d526027440684f61b9116291b10%40thread.tacv2&ctx=channel&context=Data%2520Inputs&rootfolder=%252Fsites%252FCOVID-19GDPTeam%252FShared%2520Documents%252FDashboard%2520of%2520Dashboards%252F04_Analysis%252FCovax%2520Throughput%2520Analysis%252FData%2520Inputs
  - Files:
    - AFR_Revue COVID-19 Immunization DataEntry Tool_11 March 2021 AFR.xlsx
    - data_export_WIISE_V_COV_PROC_LONG.xlsx
      - Grain: Country + Data_Source + Month + Manufacturer_Description
    - data_export_WIISE_V_COV_UTI_LONG.xlsx
      - Grain: Country + Data_Source + Month + Manufacturer_Description
    

* Libraries: 
  - Python
    - Office365-REST-Python-Client
    - openpyxl

* Built by: Jeremy Cooper
* Current owner: Jeremy Cooper
* Initial Build Date: 06/16/2021
* Latest Build Date: 06/16/2021

### Environment Management

In [0]:
# Commented-out helpers: remove every widget on this notebook, or only the iso_code widget.
# dbutils.widgets.removeAll()
# dbutils.widgets.remove("iso_code")

In [0]:
# Uncomment to (re)create the input widgets read later in this notebook
# (Partner, Source and Dataset build the storage paths; iso_code filters the previews).

# # Dataset Name, will be used for the Metastore Table, Folder Name for transformed outputs
# dbutils.widgets.text("Dataset", "")

# # Project Name will be used for folder Name for transformed outputs
# # NOTE(review): no visible cell reads the "Project" widget; the storage paths use Partner/Source/Dataset only
# dbutils.widgets.text("Project", "")

# # Team name should be consistent with the Blob Storage Container
# dbutils.widgets.text("Partner","")

# # Source name becomes the folder directly under raw/ and transformed/ in the Partner container
# dbutils.widgets.text("Source","")

# # ISO3 country code used only to filter the preview displays
# dbutils.widgets.text("iso_code", "")

#### Notebook Setup

##### Import any libraries or nested notebooks

In [0]:
pip install Office365-REST-Python-Client

In [0]:
pip install openpyxl

In [0]:
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File

from openpyxl import load_workbook
import pandas as pd
import datetime

from delta.tables import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

##### Initialize File Paths

In [0]:
# Storage layout: /mnt/<Partner>/<raw|transformed>/<Source>/<Dataset>
storage_root = f"/mnt/{dbutils.widgets.get('Partner')}/"
storage_branch = f"/{dbutils.widgets.get('Source')}/{dbutils.widgets.get('Dataset')}"

# The /dbfs-prefixed variants are for local-file (Python `open`) access to the same mount.
raw_storage_path = f"{storage_root}raw{storage_branch}"
dbfs_raw_storage_path = f"/dbfs{raw_storage_path}"

transformed_storage_path = f"{storage_root}transformed{storage_branch}"
dbfs_transformed_storage_path = f"/dbfs{transformed_storage_path}"

print(raw_storage_path, transformed_storage_path, sep="\n")

### Get Data

In [0]:
# Base name of the WIISE utilization export on SharePoint (".xlsx" is appended when downloading).
uti_file_name = 'data_export_WIISE_V_COV_UTI_LONG'
# Excel caps worksheet names at 31 characters, so the sheet inside the export is the
# file name cut to its first 31 characters ('..._UTI_LON').
sheet_name = uti_file_name[:31]

In [0]:
# SharePoint / Teams connection used to download the Excel inputs directly.
# The tenant and client IDs are identifiers; the client secret is read from the
# Databricks secret scope instead of being stored in the notebook.
sp_url = 'https://bmgf.sharepoint.com/sites/COVID-19GDPTeam'
Tenant_ID = '296b3838-4bd5-496c-bd4b-f456ea743b74'  # NOTE(review): not used by the auth calls below
Client_ID = 'bdd5dcc6-2d9b-40d8-b419-61633eb0e380'
Client_secret = dbutils.secrets.get("dse-datasci-keyvault", "GDP-Covax-SPSecret")

app_settings = dict(url=sp_url, client_id=Client_ID, client_secret=Client_secret)

# Acquire a token for the app registration (client id + secret), then bind a client context to the site.
ctx_auth = AuthenticationContext(url=sp_url)
ctx_auth.acquire_token_for_app(client_id=Client_ID, client_secret=Client_secret)
ctx = ClientContext(sp_url, ctx_auth)

In [0]:
# ISO3 country code from the "iso_code" widget; used below only to filter the preview displays.
iso_code = dbutils.widgets.get("iso_code")

In [0]:
# Reference tables from the covax_supply_chain_analytics metastore database.
# NOTE(review): none of these frames is referenced by the visible cells of this notebook.
country_dimension = (
    spark.table("covax_supply_chain_analytics.covax_sca_country_dimension")
    .select('iso_code', 'country_name_friendly')
)

awho = spark.table("covax_supply_chain_analytics.africa_who_vx")

vx_supply = spark.table("covax_supply_chain_analytics.vx_supply")

### Transformation

In [0]:
# NOTE(review): disabled load of the procurement export (data_export_WIISE_V_COV_PROC_LONG), kept for reference.
# If re-enabled: it uses `sys.maxsize` but `sys` is not imported in the setup cell
# (Window.unboundedPreceding is the idiomatic lower bound), and it sums a StringType
# doses_received column, relying on Spark's implicit string -> double cast.
# # define schema for dataframe
# schema1 = StructType([
#   StructField("iso_code", StringType()),
#   StructField("country_name", StringType()),
#   StructField("doses_received", StringType()),
#   StructField("procurement_type", StringType()),
#   StructField("date", StringType()),
# ])

# file_path = '/sites/COVID-19GDPTeam/Shared Documents/Dashboard of Dashboards/04_Analysis/Covax Throughput Analysis/Data Inputs/data_export_WIISE_V_COV_PROC_LONG.xlsx'
# local_save_path = '/dbfs/tmp/data_export_WIISE_V_COV_PROC_LONG.xlsx'
# sheet_name = 'v_COV_PROC_LONG'

# response = File.open_binary(ctx, file_path)
# with open(local_save_path, "wb") as local_file:
#     local_file.write(response.content)

# # read in data, convert to df, validate columns, filter out blank rows, convert to spark df
# wb = load_workbook(local_save_path, data_only=True)
# ws = wb[sheet_name]
# df = pd.DataFrame(ws.values)
# df.columns = df.iloc[0]
# df = df[1:].astype(str)
# df = df[['ISO_3_CODE', 'COUNTRYNAME', 'YEAR', 'MONTH', 'TOTAL_DOSES', 'PROCUREMENT_TYPE']]
# df = df[df['COUNTRYNAME']!='None']

# df.columns = ['iso_code', 'country_name', 'year', 'month', 'doses_received', 'procurement_type']

# # create date column
# df['day'] = '1'
# cols = ['year', 'month', 'day']
# df['date'] = df[cols].apply(lambda x: '-'.join(x.values.astype(str)), axis='columns')
# df['date'] = pd.to_datetime(df['date']).dt.date
# df = df.drop(columns=['year', 'month', 'day'])

# df['date'] = df['date'].astype(str)
# df_proc = spark.createDataFrame(df, schema=schema1)

# df_proc = df_proc \
#  .groupBy('iso_code', 'date').agg(sum('doses_received')).withColumnRenamed('sum(doses_received)', 'doses_received') \
#   .withColumn('cumulative_doses_received', sum('doses_received').over(Window.partitionBy('iso_code').orderBy('date').rowsBetween(-sys.maxsize, 0))) \
#   .select('iso_code', 'date', 'cumulative_doses_received') \
#   .toDF('iso_code', 'date', 'cumulative_doses_received_proc')

# # display(df)
# # display(df_proc)

In [0]:
# Download the WIISE utilization export from SharePoint, load it into Spark and aggregate it
# to one row per country and month of doses received.

# define schema for dataframe; every column arrives as a string from the pandas frame
# (`astype(str)` below), so doses_received is cast to a number in Spark before aggregating
schema1 = StructType([
  StructField("iso_code", StringType()),
  StructField("country_name", StringType()),
  StructField("data_source", StringType()),
  StructField("manufacturer", StringType()),
  StructField("doses_received", StringType()),
  StructField("date", StringType()),
])

file_path = '/sites/COVID-19GDPTeam/Shared Documents/Dashboard of Dashboards/04_Analysis/Covax Throughput Analysis/Data Inputs/' + uti_file_name + '.xlsx'
local_save_path = '/dbfs/tmp/' + uti_file_name + '.xlsx'

# copy the workbook from SharePoint to DBFS so openpyxl can open it as a local file
response = File.open_binary(ctx, file_path)
with open(local_save_path, "wb") as local_file:
    local_file.write(response.content)

# read in data, convert to df, validate columns, filter out blank rows, convert to spark df
wb = load_workbook(local_save_path, data_only=True)  # data_only: take cached cell values, not formulas
ws = wb[sheet_name]
df = pd.DataFrame(ws.values)
df.columns = df.iloc[0]          # first worksheet row holds the column headers
df = df[1:].astype(str)          # empty cells (None) become the string 'None'
df = df[['ISO_3_CODE', 'COUNTRYNAME', 'YEAR', 'MONTH', 'DATA_SOURCE', 'MANUFACTURER_DESCRIPTION', 'TOTAL_DOSES_REC']]
df = df[df['COUNTRYNAME']!='None']

df.columns = ['iso_code', 'country_name', 'year', 'month', 'data_source', 'manufacturer', 'doses_received']

# create date column: first day of the reporting month, as a 'YYYY-MM-DD' string
df['day'] = '1'
cols = ['year', 'month', 'day']
df['date'] = df[cols].apply(lambda x: '-'.join(x.values.astype(str)), axis='columns')
df['date'] = pd.to_datetime(df['date']).dt.date
df = df.drop(columns=['year', 'month', 'day'])

df['date'] = df['date'].astype(str)

# Cast doses to double BEFORE aggregating: max() over a StringType column compares
# lexicographically (e.g. '554000' > '1095950'), and a 'None' placeholder string would beat
# every number. With ANSI mode off, non-numeric strings cast to null, which max/sum ignore.
df_uti = spark.createDataFrame(df, schema=schema1) \
  .withColumn('doses_received', col('doses_received').cast('double'))

# logic for aggregating the Utilization data appropriately (the result is treated as a
# cumulative-to-date figure; NOTE(review): confirm that with the data owner):
# 1. group by (iso_code + data_source + manufacturer + date) and take max doses received
# 2. group by (iso_code + data_source + date) and take the sum across manufacturers
# 3. group by (iso_code + date) and take the max across data sources
# The monthly figure is the change from the previous month; a country's first month keeps
# its cumulative value. 'monthly_doses_recieved_uti' keeps its original (misspelled) name
# because the persisted table exposes it.
by_country_month = Window.partitionBy('iso_code').orderBy('date')
month_change = col('doses_received') - lag(col('doses_received')).over(by_country_month)

df_uti = df_uti \
 .groupBy('iso_code', 'data_source', 'manufacturer', 'date').agg(max('doses_received').alias('doses_received')) \
 .groupBy('iso_code', 'data_source', 'date').agg(sum('doses_received').alias('doses_received')) \
 .groupBy('iso_code', 'date').agg(max('doses_received').alias('doses_received')) \
 .withColumn('monthly_doses_recieved_uti', coalesce(month_change, col('doses_received'))) \
 .withColumnRenamed('doses_received', 'cumulative_doses_received_uti')

# datestamp dataframe
df_uti = df_uti.withColumn("date_accessed", current_date())

# previews: raw rows and aggregated series for the selected country, then any month where
# the cumulative total fell (negative monthly change)
display(df[df['iso_code']==iso_code])
display(df_uti.orderBy('iso_code', 'date').filter(col('iso_code')==iso_code))
display(df_uti.filter(col('monthly_doses_recieved_uti')<0))

iso_code,country_name,data_source,manufacturer,doses_received,date
CIV,Côte d'Ivoire,EJRF,Serum Institute of India,554000,2021-03-01
CIV,Côte d'Ivoire,EJRF,AstraZeneca,554000,2021-04-01
CIV,Côte d'Ivoire,AFR,AstraZeneca,729000,2021-05-01
CIV,Côte d'Ivoire,AFR,Serum Institute of India,50000,2021-05-01
CIV,Côte d'Ivoire,EJRF,AstraZeneca,1095950,2021-07-01
CIV,Côte d'Ivoire,EJRF,Pfizer BioNTech,100620,2021-07-01
CIV,Côte d'Ivoire,EJRF,Beijing Bio-Institute Biological Products (CNBG),100000,2021-07-01
CIV,Côte d'Ivoire,EJRF,AstraZeneca,827130,2021-06-01


iso_code,date,cumulative_doses_received_uti,monthly_doses_recieved_uti,date_accessed
CIV,2021-03-01,554000.0,554000.0,2021-09-13
CIV,2021-04-01,554000.0,0.0,2021-09-13
CIV,2021-05-01,779000.0,225000.0,2021-09-13
CIV,2021-06-01,827130.0,48130.0,2021-09-13
CIV,2021-07-01,1296570.0,469440.0,2021-09-13


iso_code,date,cumulative_doses_received_uti,monthly_doses_recieved_uti,date_accessed
CAF,2021-05-01,115000.0,-115000.0,2021-09-13
DMA,2021-07-01,20000.0,-98000.0,2021-09-13
GIN,2021-05-01,934400.0,-69000.0,2021-09-13
MRT,2021-05-01,175700.0,-27100.0,2021-09-13
NER,2021-05-01,780000.0,-200.0,2021-09-13
NER,2021-07-01,931200.0,-151200.0,2021-09-13
SLV,2021-05-01,940400.0,-793600.0,2021-09-13
SYC,2021-05-01,140000.0,-1000.0,2021-09-13


### Save to Azure Storage / Register in Databricks metastore

In [0]:
# Overwrite the Delta copy of the supply table next to the transformed storage path.
delta_path = f"{transformed_storage_path}.delta"

# Uncomment to delete the Delta folder (data and transaction history) before rewriting:
# dbutils.fs.rm(delta_path, True)

df_uti.write.save(delta_path, format="delta", mode="overwrite")

In [0]:
# path for delta
print(transformed_storage_path + '.delta')

In [0]:
# Register the Delta output as a metastore table.
# LOCATION comes from `delta_path` (built from the Partner/Source/Dataset widgets in the write
# cell) instead of a hard-coded path, so the table always points at the data this run wrote.
# With Partner=covax-supply-chain-analytics, Source=who, Dataset=analysis_vx_throughput_supply
# it resolves to the previous '/mnt/covax-supply-chain-analytics/transformed/who/analysis_vx_throughput_supply.delta'.
supply_table_name = "covax_supply_chain_analytics.analysis_vx_throughput_supply"

spark.sql(f"DROP TABLE IF EXISTS {supply_table_name}")
spark.sql(f"CREATE TABLE {supply_table_name} USING DELTA LOCATION '{delta_path}'")

In [0]:
# Sanity-check the registered table: full contents in key order, then row counts per country.
supply_table = spark.table("covax_supply_chain_analytics.analysis_vx_throughput_supply")
display(supply_table.orderBy('iso_code', 'date'))
display(supply_table.groupBy('iso_code').agg(count('*')))

iso_code,date,cumulative_doses_received_uti,monthly_doses_recieved_uti,date_accessed
AFG,2021-03-01,968000.0,968000.0,2021-09-13
AGO,2021-04-01,1359000.0,1359000.0,2021-09-13
AGO,2021-05-01,1509620.0,150620.0,2021-09-13
AGO,2021-06-01,1610930.0,101310.0,2021-09-13
AGO,2021-07-01,1835695.0,224765.0,2021-09-13
BEN,2021-04-01,347000.0,347000.0,2021-09-13
BEN,2021-05-01,347000.0,0.0,2021-09-13
BEN,2021-06-01,347000.0,0.0,2021-09-13
BEN,2021-07-01,649400.0,302400.0,2021-09-13
BFA,2021-05-01,115000.0,115000.0,2021-09-13


iso_code,count(1)
ZMB,4
MOZ,5
SOM,1
COD,4
ETH,5
GNQ,4
UKR,4
CMR,4
GHA,4
NER,5


##### Query Delta Log

In [0]:
# Show the Delta transaction log (one row per write) for the supply table's storage location.
# Uses `delta_path` from the write cell rather than a hard-coded path, so the history shown
# is for the data this run actually wrote.
display(
  spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
1,2021-09-13T23:59:10.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(2262750785590075),0210-222124-taper262,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 4430, numOutputRows -> 223)",
0,2021-09-13T23:35:54.000+0000,6136552160696939,jeremy.cooper@gatesfoundation.org,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(2262750785590075),0716-171126-shelf806,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 4430, numOutputRows -> 223)",


## Appendix