In [1]:
import pyspark
import pyspark.sql
from pyspark.sql import *
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
# NOTE(review): keep the explicit imports below AFTER the star import above;
# a same-named export from pyspark.sql.functions would otherwise shadow them
# (`reduce` is the one to watch) - confirm against the installed version.
from functools import reduce
from pyspark.sql import DataFrame
# Module names are case-sensitive: the package is `pandas`, not `Pandas`.
import pandas

In [2]:
# Get the notebook's SparkSession, creating it if none exists yet.
spark = (
    SparkSession.builder
    .appName("Vaccine Analysis")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [68]:
def test_data_frame_Creation(df):
    if df!=None:
        assert df.count() == 3
        
def test_Data_frame_data(df):
    if df!=None:
        assert len(df.columns) == 4
    if df.count() > df.dropDuplicates(["Name", "VaccinationType", "VaccinationDate", "Country"]).count():
        raise ValueError('Data has duplicates')
        
def test_Data_frame_Agg_logic(df1,df2,df3,df4):
    if df1!=None and df2!=None and df3!=None and df4!=None:
        assert df1.count()+df2.count()+df3.count() == df4.count()
        
def test_Data_frame_vacc_count(df,df1):
    if df!=None and df1!=None:
        assert df1.select(sum("count")).collect()[0][0] == df.count()
        
def test_Data_frame_percent_vaccinated_logic(df):
    if df!=None:
        assert df.collect()[0][1] == 30
        
def test_Data_frame_percent_contri_logic(df):
    if df!=None:
        assert int(df.collect()[0][1]) == 33

In [3]:
# Reading CSV File
# Both files are read with a header row and with column types inferred.
IND_DF = spark.read.csv(
    "hdfs://nameservice1/user/adityabend99edu/IND.csv",
    header='true',
    inferSchema='true',
)
USA_DF = spark.read.csv(
    "hdfs://nameservice1/user/adityabend99edu/USA.csv",
    header='true',
    inferSchema='true',
)

In [6]:
# Read the AUS CSV with a header row and with column types inferred.
AUS_DF = spark.read.csv(
    "hdfs://nameservice1/user/adityabend99edu/AUS.csv",
    header='true',
    inferSchema='true',
)

In [None]:
# Converting xlsx file to csv and then reading
# NOTE(review): this writes AUS.csv to the local working directory, while the
# AUS read uses an HDFS path - confirm how the file gets from one to the other.
df3 = pandas.read_excel('AUS.xlsx')
df3.to_csv(
    'AUS.csv',
    index=False,
    encoding='utf-8',
)

In [25]:
# Data Cleansing
# Add the country tag, rename the AUS-specific column names to
# Name / VaccinationType / VaccinationDate, and drop the birth-date and id columns.
AUS_DF = (
    AUS_DF
    .withColumn("Country", lit("AUS"))
    .withColumnRenamed("Vaccine Type", "VaccinationType")
    .withColumnRenamed("Patient Name", "Name")
    .withColumnRenamed("Date of Vaccination", "VaccinationDate")
    .drop("Date of Birth", "Unique ID")
)

AUS_DF.show()

#Testing
print(test_Data_frame_data(AUS_DF))

+---------+---------------+-------------------+-------+
|     Name|VaccinationType|    VaccinationDate|Country|
+---------+---------------+-------------------+-------+
|     Mike|            LMN|2022-05-11 00:00:00|    AUS|
|Jonnathan|            XYZ|         2021-13-13|    AUS|
| Cristina|            ABC|2022-03-12 00:00:00|    AUS|
+---------+---------------+-------------------+-------+

None


In [26]:
# Data Cleansing
# Add the country tag and drop the id column.
USA_DF = (
    USA_DF
    .withColumn("Country", lit("USA"))
    .drop("ID")
)

USA_DF.show()

#Testing
print(test_Data_frame_data(USA_DF))

+----+---------------+---------------+-------+
|Name|VaccinationType|VaccinationDate|Country|
+----+---------------+---------------+-------+
| Sam|            EFG|        6152022|    USA|
|John|            XYZ|        1052022|    USA|
|Mike|            ABC|       12282021|    USA|
+----+---------------+---------------+-------+

None


In [27]:
# Data Cleansing
# Drop the birth-date, payment and id columns, then add the country tag.
IND_DF = (
    IND_DF
    .drop("DOB", "Free or Paid", "ID")
    .withColumn("Country", lit("IND"))
)

IND_DF.show()

#Testing
print(test_Data_frame_data(IND_DF))

+------+---------------+-------------------+-------+
|  Name|VaccinationType|    VaccinationDate|Country|
+------+---------------+-------------------+-------+
| Vikas|            XYZ|2022-01-01 00:00:00|    IND|
| Rahul|            ABC|2022-03-05 00:00:00|    IND|
|Sameer|            ABC|2022-02-20 00:00:00|    IND|
+------+---------------+-------------------+-------+

None


In [57]:
# Data merging into single source of truth
dfs = [AUS_DF,IND_DF,USA_DF]
# unionByName matches columns by name, so a source whose columns come out in a
# different order fails loudly or is aligned correctly, instead of being
# silently stacked by position as unionAll would do.
Combine_df = reduce(DataFrame.unionByName, dfs)

Combine_df.show()

#Testing
# The merged frame must still have four columns and no duplicate rows, and its
# row count must equal the sum of the three sources.
print(test_Data_frame_data(Combine_df))
print(test_Data_frame_Agg_logic(AUS_DF,IND_DF,USA_DF,Combine_df))

+---------+---------------+-------------------+-------+
|     Name|VaccinationType|    VaccinationDate|Country|
+---------+---------------+-------------------+-------+
|     Mike|            LMN|2022-05-11 00:00:00|    AUS|
|Jonnathan|            XYZ|         2021-13-13|    AUS|
| Cristina|            ABC|2022-03-12 00:00:00|    AUS|
|    Vikas|            XYZ|2022-01-01 00:00:00|    IND|
|    Rahul|            ABC|2022-03-05 00:00:00|    IND|
|   Sameer|            ABC|2022-02-20 00:00:00|    IND|
|      Sam|            EFG|            6152022|    USA|
|     John|            XYZ|            1052022|    USA|
|     Mike|            ABC|           12282021|    USA|
+---------+---------------+-------------------+-------+

None
None


In [69]:
# Total vaccination count by country and vaccination type
# Aggregate the merged frame directly. The previous `df` was never defined in
# this notebook and only resolved through leftover kernel state.
Vaccination_Count_By_Country = (
    Combine_df
    .groupBy("Country", "VaccinationType")
    .count()
)

Vaccination_Count_By_Country.show()

#Testing
# The grouped counts must add up to the number of rows in the merged frame.
print(test_Data_frame_vacc_count(Combine_df,Vaccination_Count_By_Country))

+-------+---------------+-----+
|Country|VaccinationType|count|
+-------+---------------+-----+
|    AUS|            ABC|    1|
|    AUS|            XYZ|    1|
|    IND|            ABC|    2|
|    USA|            XYZ|    1|
|    AUS|            LMN|    1|
|    IND|            XYZ|    1|
|    USA|            EFG|    1|
|    USA|            ABC|    1|
+-------+---------------+-----+

None


In [40]:
# %vaccination in each country (You can assume values for total population)
# Let Total Population Count be 10
total_pop = 10

# Expose the per-type counts to Spark SQL, then sum them per country and
# express the sum as a percentage of the assumed population.
Vaccination_Count_By_Country.createOrReplaceTempView("QUERY_TABLE")
Percentage_Vaccinated_By_Country = spark.sql(
    "select Country, sum(count) * 100 /{0} as perecent_Vaccinated  from QUERY_TABLE  group by Country".format(total_pop)
)

Percentage_Vaccinated_By_Country.show()

#Testing
print(test_Data_frame_percent_vaccinated_logic(Percentage_Vaccinated_By_Country))

+-------+-------------------+
|Country|perecent_Vaccinated|
+-------+-------------------+
|    AUS|               30.0|
|    USA|               30.0|
|    IND|               30.0|
+-------+-------------------+

None


In [46]:
# vaccination contribution by country (Sum of percentages add up to 100)
# Collapse the per-type counts into one total per country, keeping the column
# name `count` so the SQL below can refer to it.
df1 = (
    Vaccination_Count_By_Country
    .select("Country", "count")
    .groupBy("Country")
    .sum("count")
    .withColumnRenamed("sum(count)", "count")
)

# Re-registers QUERY_TABLE, replacing any view previously stored under that name.
df1.createOrReplaceTempView("QUERY_TABLE")
Percentage_Contribution_By_Country = spark.sql(
    "select Country, count * 100/(select sum(count) from QUERY_TABLE ) as percent_contri  from QUERY_TABLE"
)

Percentage_Contribution_By_Country.show()

#Testing
print(test_Data_frame_percent_contri_logic(Percentage_Contribution_By_Country))

+-------+------------------+
|Country|    percent_contri|
+-------+------------------+
|    AUS|33.333333333333336|
|    USA|33.333333333333336|
|    IND|33.333333333333336|
+-------+------------------+

None
