In [1]:
import findspark
import pandas as pd
import glob, os    
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql.types import *

In [2]:
k = glob.glob(os.path.join('C:\Study\spark\COVID-19\csse_covid_19_data\csse_covid_19_daily_reports', "*.csv"))
dictFiles = {Path(i).resolve().stem: i for i in k}
dictPD = {k: pd.read_csv(v) for k, v in dictFiles.items()}
dictPDDate = {k: v.assign(repDate=k) for k, v in dictPD.items()}
resultingFrame = pd.concat([v for _, v in dictPDDate.items()])
resultingFrame['ProvinceStateCombined'] = resultingFrame['Province/State'].combine_first(resultingFrame['Province_State'])
resultingFrame['CountryRegionCombined'] = resultingFrame['Country/Region'].combine_first(resultingFrame['Country_Region'])
resultingFrame['LatitudeCombined'] = resultingFrame['Latitude'].combine_first(resultingFrame['Lat'])
resultingFrame['LongitudeCombined'] = resultingFrame['Longitude'].combine_first(resultingFrame['Long_'])
resultingFrame['LastUpdateCombined'] = resultingFrame['Last Update'].combine_first(resultingFrame['Last_Update'])
resultingFrame = resultingFrame.drop(['Province/State', 'Country/Region', 'Last Update', 'Last_Update', 'Lat', 'Long_', 'Province_State', 'Country_Region', 'Latitude', 'Longitude'], axis=1)
resultingFrame['ReportingDate'] = pd.to_datetime(resultingFrame['repDate'], format='%m-%d-%Y')
resultingFrame['ReportingYear'] = resultingFrame['ReportingDate'].dt.year
resultingFrame['ReportingMonth'] = resultingFrame['ReportingDate'].dt.month
resultingFrame['ReportingDay'] = resultingFrame['ReportingDate'].dt.day
resultingFrame['ReportingMonthName'] = resultingFrame['ReportingDate'].dt.month_name()

In [3]:
resultingFrame = resultingFrame.apply(lambda x : x.fillna(0) if x.dtype.kind in 'biufc' else x.fillna(''))

In [4]:
schema = StructType([StructField("Confirmed", FloatType(), True)
                   ,StructField("Deaths", FloatType(), True)
                   ,StructField("Recovered", FloatType(), True)
                   ,StructField("repDate", StringType(), True)
                   ,StructField("FIPS", FloatType(), True)
                   ,StructField("Admin2", StringType(), True)
                   ,StructField("Active", FloatType(), True)
                   ,StructField("Combined_Key", StringType(), True)
                   ,StructField("Incidence_Rate", FloatType(), True)
                   ,StructField("Case-Fatality_Ratio", StringType(), True)
                   ,StructField("ProvinceStateCombined", StringType(), True)
                   ,StructField("CountryRegionCombined", StringType(), True)
                   ,StructField("LatitudeCombined", FloatType(), True)
                   ,StructField("LongitudeCombined", FloatType(), True)
                   ,StructField("LastUpdateCombined", StringType(), True)
                   ,StructField("ReportingDate", DateType(), True)
                   ,StructField("ReportingYear", IntegerType(), True)
                   ,StructField("ReportingMonth", IntegerType(), True)
                   ,StructField("ReportingDay", IntegerType(), True)
                   ,StructField("ReportingMonthName", StringType(), True)
                    ])

In [5]:
resultingFrame.head()

Unnamed: 0,Confirmed,Deaths,Recovered,repDate,FIPS,Admin2,Active,Combined_Key,Incidence_Rate,Case-Fatality_Ratio,ProvinceStateCombined,CountryRegionCombined,LatitudeCombined,LongitudeCombined,LastUpdateCombined,ReportingDate,ReportingYear,ReportingMonth,ReportingDay,ReportingMonthName
0,1.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Anhui,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
1,14.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Beijing,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
2,6.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Chongqing,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
3,1.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Fujian,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
4,0.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Gansu,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January


In [6]:
findspark.init()
#pyspark.SparkConf.set('spark.sql.execution.arrow.enabled', 'True')
sc = pyspark.SparkContext(appName="Covid")
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
sqlDF = sqlContext.createDataFrame(resultingFrame, schema=schema)
sqlDF.show()
sqlDF.createOrReplaceTempView("covid_data")
repDF = spark.sql("SELECT repDate, sum(Confirmed) as confirmed, sum(Deaths) as deaths, sum(Active) as active, sum(Recovered) as recovered from covid_data group by repDate order by repDate")
pdRepDF = repDF.toPandas()
sc.stop()

+---------+------+---------+----------+----+------+------+------------+--------------+-------------------+---------------------+---------------------+----------------+-----------------+------------------+-------------+-------------+--------------+------------+------------------+
|Confirmed|Deaths|Recovered|   repDate|FIPS|Admin2|Active|Combined_Key|Incidence_Rate|Case-Fatality_Ratio|ProvinceStateCombined|CountryRegionCombined|LatitudeCombined|LongitudeCombined|LastUpdateCombined|ReportingDate|ReportingYear|ReportingMonth|ReportingDay|ReportingMonthName|
+---------+------+---------+----------+----+------+------+------------+--------------+-------------------+---------------------+---------------------+----------------+-----------------+------------------+-------------+-------------+--------------+------------+------------------+
|      1.0|   0.0|      0.0|01-22-2020| 0.0|      |   0.0|            |           0.0|                0.0|                Anhui|       Mainland China|          

In [7]:
sc.stop()

In [8]:
pdRepDF.to_csv('output/daySpark.csv')

In [9]:
resultingFrame.dtypes

Confirmed                       float64
Deaths                          float64
Recovered                       float64
repDate                          object
FIPS                            float64
Admin2                           object
Active                          float64
Combined_Key                     object
Incidence_Rate                  float64
Case-Fatality_Ratio             float64
ProvinceStateCombined            object
CountryRegionCombined            object
LatitudeCombined                float64
LongitudeCombined               float64
LastUpdateCombined               object
ReportingDate            datetime64[ns]
ReportingYear                     int64
ReportingMonth                    int64
ReportingDay                      int64
ReportingMonthName               object
dtype: object

In [10]:
resultingFrame.head()

Unnamed: 0,Confirmed,Deaths,Recovered,repDate,FIPS,Admin2,Active,Combined_Key,Incidence_Rate,Case-Fatality_Ratio,ProvinceStateCombined,CountryRegionCombined,LatitudeCombined,LongitudeCombined,LastUpdateCombined,ReportingDate,ReportingYear,ReportingMonth,ReportingDay,ReportingMonthName
0,1.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Anhui,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
1,14.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Beijing,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
2,6.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Chongqing,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
3,1.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Fujian,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January
4,0.0,0.0,0.0,01-22-2020,0.0,,0.0,,0.0,0.0,Gansu,Mainland China,0.0,0.0,1/22/2020 17:00,2020-01-22,2020,1,22,January


In [11]:
resultingFrame[resultingFrame['repDate'] == '07-07-2020']

Unnamed: 0,Confirmed,Deaths,Recovered,repDate,FIPS,Admin2,Active,Combined_Key,Incidence_Rate,Case-Fatality_Ratio,ProvinceStateCombined,CountryRegionCombined,LatitudeCombined,LongitudeCombined,LastUpdateCombined,ReportingDate,ReportingYear,ReportingMonth,ReportingDay,ReportingMonthName
0,134.0,1.0,0.0,07-07-2020,45001.0,Abbeville,133.0,"Abbeville, South Carolina, US",546.336690,0.746269,South Carolina,US,34.223334,-82.461707,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
1,1068.0,43.0,0.0,07-07-2020,22001.0,Acadia,1025.0,"Acadia, Louisiana, US",1721.331292,4.026217,Louisiana,US,30.295065,-92.414197,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
2,1042.0,14.0,0.0,07-07-2020,51001.0,Accomack,1028.0,"Accomack, Virginia, US",3224.408962,1.343570,Virginia,US,37.767072,-75.632346,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
3,3252.0,23.0,0.0,07-07-2020,16001.0,Ada,3229.0,"Ada, Idaho, US",675.267397,0.707257,Idaho,US,43.452658,-116.241552,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
4,16.0,0.0,0.0,07-07-2020,19001.0,Adair,16.0,"Adair, Iowa, US",223.713647,0.000000,Iowa,US,41.330756,-94.471059,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3798,1297.0,348.0,591.0,07-07-2020,0.0,,358.0,Yemen,4.348560,26.831149,,Yemen,15.552727,48.516388,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
3799,1895.0,42.0,1348.0,07-07-2020,0.0,,505.0,Zambia,10.307901,2.216359,,Zambia,-13.133897,27.849332,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
3800,787.0,9.0,201.0,07-07-2020,0.0,,577.0,Zimbabwe,5.295054,1.143583,,Zimbabwe,-19.015438,29.154857,2020-07-08 05:33:48,2020-07-07,2020,7,7,July
3801,0.0,0.0,0.0,07-07-2020,41069.0,Wheeler,0.0,"Wheeler, Oregon, US",0.000000,0.000000,Oregon,US,44.726982,-120.028143,2020-07-06 19:33:59,2020-07-07,2020,7,7,July
