In [1]:
# Connect to AWS - Save Final Product
#
# The key pair is kept in module-level names because later cells pass it to
# boto3 and s3fs as well as to the Spark Hadoop configuration below.
# NOTE(review): only the fs.s3n.* keys are set here, while later cells use
# s3:// and s3a:// URLs - confirm those schemes pick up credentials in this
# environment.

aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_access_key_id)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)

In [2]:
from datetime import datetime, date, timedelta
import dateutil.relativedelta
import pandas

# Setting Current Date - Based on Run Date
now = datetime.now().date()

# Is Today's CSV File Available?
# If not, "today" actually needs to be yesterday

def whenistoday():
  """Return the date of the newest daily report as a "%m-%d-%Y" string.

  Probes the CSSEGISandData daily-report CSV named after the run date
  (`now`) by reading its first few rows. If the read succeeds the run date
  is returned. If it fails with any ordinary exception (missing file,
  network error, parse error) the previous day's date is returned instead.

  Returns:
    str: the run date or the day before it, formatted "%m-%d-%Y".
  """
  today = now.strftime("%m-%d-%Y")
  csv_path = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/{}.csv'.format(today)

  try:
    # Path Test - the rows themselves are not needed, only that the read works
    pandas.read_csv(csv_path, nrows = 5)
  except Exception:
    # `Exception` rather than a bare except, so KeyboardInterrupt and
    # SystemExit still stop the notebook instead of being read as "no file".
    return (now - timedelta(days = 1)).strftime("%m-%d-%Y")
  return today

today = whenistoday()

# `today` is a "%m-%d-%Y" string while `now` is a date object, so comparing
# the two directly is always unequal. Re-derive `now` from `today` instead:
# the relative dates below are then anchored on the newest available report,
# whether that is the run date or the day before it.
now = datetime.strptime(today, "%m-%d-%Y").date()

def _days_before(days):
  """Format the date `days` days before `now` as "%m-%d-%Y"."""
  return (now - timedelta(days = days)).strftime("%m-%d-%Y")

# Setting Other Relative Dates
yesterday = _days_before(1)
twoday = _days_before(2)
threeday = _days_before(3)
fourday = _days_before(4)
fiveday = _days_before(5)
sixday = _days_before(6)
oneweek = _days_before(7)
eightday = _days_before(8)
twoweek = _days_before(14)
threeweek = _days_before(21)

# Setting Relative Date - One Month Ago
# onemonth = now - dateutil.relativedelta.relativedelta(months=1)
# onemonth = datetime.strptime(str(onemonth) , '%Y-%m-%d').strftime("%m-%d-%Y")

In [3]:
# Prepare Loop for Import

from pyspark import SparkFiles, SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import s3fs
import boto3

# Report dates to fetch, newest first: eight consecutive days back from
# `today`, then the 8-, 14- and 21-day marks.
timeframes = [
  today, yesterday, twoday, threeday, fourday, fiveday, sixday,
  oneweek, eightday, twoweek, threeweek,
]

# Clear Old Files
def DeleteDailyReports():
  """Delete every object in erikatestbucket whose key starts with COVID-19/DailyReports."""
  resource = boto3.resource(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
  )
  stale_reports = resource.Bucket('erikatestbucket').objects.filter(Prefix="COVID-19/DailyReports")
  stale_reports.delete()

DeleteDailyReports()

In [4]:
# Loop to import files from Johns Hopkins CSSEGIS GitHub
#
# Each report in `timeframes` is read from GitHub with pandas and written
# unchanged (minus the pandas index) to
# s3://erikatestbucket/COVID-19/DailyReports/<date>.csv.

# The filesystem handle does not depend on the report date, so build it once
# instead of once per iteration.
fs = s3fs.S3FileSystem(key=aws_access_key_id, secret=aws_secret_access_key)

for time in timeframes:
  url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/{}.csv'.format(time)

  # Read CSV via Pandas
  pandasdf = pandas.read_csv(url)

  # Write CSV via Pandas
  bytes_to_write = pandasdf.to_csv(None, index = False).encode()
  with fs.open('s3://erikatestbucket/COVID-19/DailyReports/{}.csv'.format(time), 'wb') as f:
    f.write(bytes_to_write)

In [5]:
# Helper to Import Tables (Without using dictionary)
# (a plain Python function, not a registered Spark UDF)
def importdf(time):
  """Read one daily report from S3 into a Spark DataFrame.

  Args:
    time: report date string used as the file name (<time>.csv) under
      s3://erikatestbucket/COVID-19/DailyReports/.

  Returns:
    The DataFrame produced by spark.read.csv with the header row used for
    column names and column types inferred.
  """
  report_path = 's3://erikatestbucket/COVID-19/DailyReports/{}.csv'.format(time)
  return spark.read.csv(report_path, header = True, inferSchema = True)

# Import Tables
# One frame per report date; the names on the left line up position by
# position with the dates on the right.
(
  dftoday, dfyesterday, dftwoday, dfthreeday, dffourday, dffiveday,
  dfsixday, dfoneweek, dfeightday, dftwoweek, dfthreeweek,
) = map(importdf, (
  today, yesterday, twoday, threeday, fourday, fiveday,
  sixday, oneweek, eightday, twoweek, threeweek,
))

In [6]:
# Prepare Other DFs for Join - Rename Fields
# Loop/UDF Later

from pyspark.sql import functions as sf

def cleantable(df, num):
  """Trim a daily report to the join columns and tag its metrics with `num`.

  Confirmed, Deaths, Recovered and Active are renamed to <metric>_<num>.
  Combined_Key is always kept. When `df` is the module-level `dftoday`
  frame, Admin2, Province_State and Country_Region are kept as well.

  Args:
    df: daily report DataFrame.
    num: suffix appended to each metric column name.

  Returns:
    The trimmed DataFrame with renamed metric columns.
  """
  metrics = ['Confirmed', 'Deaths', 'Recovered', 'Active']

  if df == dftoday:
    keep = ['Combined_Key', 'Admin2', 'Province_State', 'Country_Region'] + metrics
  else:
    keep = ['Combined_Key'] + metrics

  cleaned = df.select(*keep)
  for metric in metrics:
    cleaned = cleaned.withColumnRenamed(metric, '{}_{}'.format(metric, num))
  return cleaned

# Today's report becomes the base frame `df`; it keeps the location columns.
df = cleantable(dftoday, '0')

# Every older report is trimmed and suffixed with its age in days.
(
  dfyesterday, dftwoday, dfthreeday, dffourday, dffiveday,
  dfsixday, dfoneweek, dfeightday, dftwoweek, dfthreeweek,
) = [
  cleantable(frame, suffix)
  for frame, suffix in (
    (dfyesterday, '1'),
    (dftwoday, '2'),
    (dfthreeday, '3'),
    (dffourday, '4'),
    (dffiveday, '5'),
    (dfsixday, '6'),
    (dfoneweek, '7'),
    (dfeightday, '8'),
    (dftwoweek, '14'),
    (dfthreeweek, '21'),
  )
]

In [7]:
# Join Tables Together

# Historical frames to attach to today's frame, newest first.
tables = [
  dfyesterday, dftwoday, dfthreeday, dffourday, dffiveday,
  dfsixday, dfoneweek, dfeightday, dftwoweek, dfthreeweek,
]

def join(df, table):
  """Left-outer join `table` onto `df` on Combined_Key and return the result."""
  return df.join(table, 'Combined_Key', "left_outer")

# Attach each historical snapshot to the running frame in turn.
for snapshot in tables:
  df = join(df, snapshot)

# Check that row count remains the same
# (displayed as the cell result; False means the joins changed the row count)
dftoday.count() == df.count()

In [8]:
# Aggregate States - Drop Counties and Combined Key
# Reduce Tables To Topics

from pyspark.sql import functions as F


def ReduceTables(Newdf):
  """Sum one metric's day columns of the joined frame `df` per state.

  The data always comes from the module-level joined frame `df`; `Newdf`
  only selects which metric to aggregate.

  Args:
    Newdf: metric name - 'Active', 'Deaths', 'Recovered' or 'Confirmed'.
      For backward compatibility, a DataFrame that is currently bound to
      the module-level name dfActive, dfDeaths, dfRecovered or dfConfirmed
      is also accepted and mapped to the matching metric.

  Returns:
    A DataFrame grouped by Province_State and Country_Region holding the
    summed columns <metric>_0 ... <metric>_8, <metric>_14 and <metric>_21.

  Raises:
    ValueError: if no metric can be determined from `Newdf`.
  """
  metrics = ('Active', 'Deaths', 'Recovered', 'Confirmed')

  dftype = None
  if isinstance(Newdf, str):
    if Newdf in metrics:
      dftype = Newdf
  elif Newdf is not None:
    # Legacy call style: identify the frame by identity against whichever of
    # the four module-level names exist, without requiring all of them.
    dftype = next(
      (m for m in metrics if globals().get('df{}'.format(m)) is Newdf),
      None,
    )

  if dftype is None:
    raise ValueError(
      "ReduceTables expects one of {} (or one of the matching df<Metric> frames)".format(metrics)
    )

  # Day offsets present on the joined frame.
  suffixes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '14', '21')
  sums = [
    F.sum('{}_{}'.format(dftype, suffix)).alias('{}_{}'.format(dftype, suffix))
    for suffix in suffixes
  ]
  return df.groupBy('Province_State', 'Country_Region').agg(*sums)

# Pass the metric by name: the df<Metric> frames do not exist yet at this
# point on a fresh kernel, so they cannot be used as arguments here.
dfActive = ReduceTables('Active')
dfDeaths = ReduceTables('Deaths')
dfRecovered = ReduceTables('Recovered')
dfConfirmed = ReduceTables('Confirmed')

In [9]:
# Adding Daily New Cases Columns

def AddColumns(df, dftype=None):
  """Add day-over-day difference columns for one metric.

  For each day offset i from 0 through 7, a column <metric>New_<i> is added
  holding <metric>_<i> minus <metric>_<i+1>.

  Args:
    df: frame holding the columns <metric>_0 ... <metric>_8.
    dftype: metric name. When omitted, it is the first of 'Active',
      'Deaths', 'Recovered', 'Confirmed' for which `df` has a
      '<metric>_0' column, so the result no longer depends on which
      module-level frames happen to be bound.

  Returns:
    The frame with the eight <metric>New_<i> columns added.

  Raises:
    ValueError: if `dftype` is omitted and cannot be inferred from the columns.
  """
  if dftype is None:
    known = ('Active', 'Deaths', 'Recovered', 'Confirmed')
    dftype = next((m for m in known if '{}_0'.format(m) in df.columns), None)
  if dftype is None:
    raise ValueError("AddColumns could not find a <metric>_0 column to work from")

  for day in range(0, 8):
    newer = df['{}_{}'.format(dftype, day)]
    older = df['{}_{}'.format(dftype, day + 1)]
    df = df.withColumn('{}New_{}'.format(dftype, day), newer - older)
  return df

# Each frame is extended from itself. Building dfConfirmed from dfActive
# would replace the Confirmed table with Active data.
dfConfirmed = AddColumns(dfConfirmed)
dfDeaths = AddColumns(dfDeaths)
dfRecovered = AddColumns(dfRecovered)
dfActive = AddColumns(dfActive)

In [10]:
# Clear Old Files
def DeleteOutput():
  """Delete every object in erikatestbucket whose key starts with COVID-19/Output."""
  resource = boto3.resource(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
  )
  previous_output = resource.Bucket('erikatestbucket').objects.filter(Prefix="COVID-19/Output")
  previous_output.delete()

DeleteOutput()

In [11]:
# Write each metric table to S3 as a single CSV named <Metric>.csv.
#
# Spark writes a directory (COVID-19/Output/<Metric>/) containing one
# part-*.csv file, so after each write the part file is copied to
# COVID-19/Output/<Metric>.csv and the directory is removed.

Output = [dfConfirmed, dfDeaths, dfRecovered, dfActive]
# Metric names, paired position by position with `Output`.
OutputNames = ['Confirmed', 'Deaths', 'Recovered', 'Active']

# No visible cell binds `s3` at module level (the Delete* helpers only create
# it as a local), so create the resource here.
s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
bucket = s3.Bucket('erikatestbucket')

# The loop variable is deliberately not called `df`, so the joined frame
# `df` used by earlier cells is not overwritten when this cell runs.
for dftype, outdf in zip(OutputNames, Output):
  url = 's3a://erikatestbucket/COVID-19/Output/{}'.format(dftype)
  # S3 (CSV)
  outdf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(url)

  deletepath = 'COVID-19/Output/{}/'.format(dftype)
  newpath = 'COVID-19/Output/{}.csv'.format(dftype)

  # Get the key(s) of the part file Spark wrote. The trailing slash on the
  # prefix keeps a top-level <Metric>.csv out of the listing, and the list is
  # materialised before anything is copied or deleted.
  partkeys = [
    summary.key
    for summary in bucket.objects.filter(Prefix=deletepath)
    if summary.key.rsplit('/', 1)[-1].startswith('part') and summary.key.endswith('.csv')
  ]

  for oldkey in partkeys:
    # Copy old CSV location to new
    s3.Object('erikatestbucket', newpath).copy_from(
      CopySource={'Bucket': 'erikatestbucket', 'Key': oldkey}
    )

  if partkeys:
    # Delete old location - only after the CSV has been copied out of it
    bucket.objects.filter(Prefix=deletepath).delete()