# DSCI 551 Project: File System & Analysis on COVID-19 Vaccination Rates
Gordon Su, Lucas Huang  
Section 32414 (Afternoon)

Date range: 2020-12-02 to 2022-03-29

### All necessary imports

In [215]:
import datetime
import firebase_admin
import json
import math
import numpy as np
import os
import pandas as pd
import requests

from firebase_admin import db
from firebase_admin import credentials

### Import and preview vaccination data

In [139]:
vacc_df = pd.read_csv('./data/country_vaccinations.csv')
vacc_df['date'] = pd.to_datetime(vacc_df['date'])
vacc_df['year'] = vacc_df.apply(lambda row: row.date.year, axis=1)
vacc_df['month'] = vacc_df.apply(lambda row: row.date.month, axis=1)
vacc_df['day'] = vacc_df.apply(lambda row: row.date.day, axis=1)

vacc_cols_to_drop = ['date', 'iso_code', 'total_vaccinations', 'people_vaccinated', 'people_fully_vaccinated', 
                'daily_vaccinations', 'total_vaccinations_per_hundred', 'people_vaccinated_per_hundred', 
                'people_fully_vaccinated_per_hundred', 'daily_vaccinations_per_million', 'vaccines', 
                'source_name', 'source_website']
vacc_df = vacc_df.drop(vacc_cols_to_drop, axis=1)
vacc_df.head(3)

Unnamed: 0,country,daily_vaccinations_raw,year,month,day
0,Afghanistan,,2021,2,22
1,Afghanistan,,2021,2,23
2,Afghanistan,,2021,2,24


### Import and preview PFE price data

In [141]:
pfe_prices_df = pd.read_csv('./data/PFE_historical_prices.csv')
pfe_prices_df['Date'] = pd.to_datetime(pfe_prices_df['Date'])
pfe_prices_df['year'] = pfe_prices_df.apply(lambda row: row.Date.year, axis=1)
pfe_prices_df['month'] = pfe_prices_df.apply(lambda row: row.Date.month, axis=1)
pfe_prices_df['day'] = pfe_prices_df.apply(lambda row: row.Date.day, axis=1)

pfe_cols_to_drop = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
pfe_prices_df = pfe_prices_df.drop(pfe_cols_to_drop, axis=1)
pfe_prices_df.head(3)

Unnamed: 0,Adj Close,year,month,day
0,38.354626,2020,12,2
1,37.68718,2020,12,3
2,37.922195,2020,12,4


### Convert vaccination dataframe into json format

In [152]:
selected_countries = ['Australia', 'Belgium', 'Canada', 'France', 'Germany', 'Italy', 'Japan', 'Switzerland', 'United Kingdom', 'United States']
selected_countries_json_files = []
all_years = [2020, 2021, 2022]
all_months = [month for month in range(1, 13)]

for country in selected_countries:
    country_df = vacc_df.loc[vacc_df['country'] == country]
    country_dict = {}
    
    for year in all_years:
        year_df = vacc_df.loc[vacc_df['year'] == year]
        year_dict = {}
        
        for month in all_months:
            month_df = vacc_df.loc[vacc_df['month'] == month]
            month_dict = dict(month_df[['day', 'daily_vaccinations_raw']].values)
            month_dict = {int(k):(0 if math.isnan(v) else int(v)) for k,v in month_dict.items()}
            
            year_dict[month] = month_dict
        country_dict[year] = year_dict
    
    file_name = './data/' + country + '_vaccinations.json'
    selected_countries_json_files.append(file_name)
    
    with open(file_name, 'w') as json_file:
        json.dump(country_dict, json_file)

### Convert PFE dataframe into json format

In [222]:
all_years = [2020, 2021, 2022]
all_months = [month for month in range(1, 13)]

pfe_dict = {}
    
for year in all_years:
    year_df = pfe_prices_df.loc[pfe_prices_df['year'] == year]
    year_dict = {}

    for month in all_months:
        month_df = pfe_prices_df.loc[pfe_prices_df['month'] == month]
        month_dict = dict(month_df[['day', 'Adj Close']].values)
        month_dict = {int(k):(0 if math.isnan(v) else float(v)) for k,v in month_dict.items()}

        year_dict[month] = month_dict
    pfe_dict[year] = year_dict

pfe_file_name = './data/PFE_historical_prices.json'

with open(pfe_file_name, 'w') as json_file:
    json.dump(pfe_dict, json_file)

### EDFS Implementation

In [None]:
cred = credentials.Certificate('./dsci-551-3cb2b-firebase-adminsdk-7ajia-c0dffd1d3c.json')
databaseURL = 'https://dsci-551-3cb2b-default-rtdb.firebaseio.com/'
default_app = firebase_admin.initialize_app(cred, {
    'databaseURL': databaseURL
})
ref = db.reference('/')

In [232]:
# help
def edfs_help():
    print('''\033[1m\033[4mFunctions:\033[0m\033[0m
    \033[1mremove_file(path)\033[0m \t\t\t Usage example: remove_file('/test/test.json')
    \033[1mremove_directory(path)\033[0m \t\t Usage example: remove_directory('/test/')
    \033[1mcat_file(path)\033[0m \t\t\t Usage example: cat_file('/test/test.json')
    \033[1mlist_contents(path)\033[0m \t\t Usage example: list_contents('/test/')
    \033[1mmake_directory(path)\033[0m \t\t Usage example: make_directory('/test/')
    \033[1mput_file(file, path)\033[0m \t\t Usage example: put_file('test.json', '/test/')
    \033[1mgetPartitionLocations(file_path)\033[0m \t\t Usage example: getPartitionLocations('/test/test.json')
    \033[1mreadPartition(file_path, partition_num)\033[0m \t\t Usage example: readPartition('/test/test.json', 1)''')

# rm
def remove_file(path):
    splitPath = os.path.splitext(path)
    
    # Remove file metadata
    metadata_ref = db.reference(splitPath[0] + splitPath[1].replace('.', ' '))
    children = metadata_ref.get()
    if children == None:
        print('File does not exist')
        return
    for key, value in children.items():
        metadata_ref.child(key).set({})
        
    # Remove file partition contents
    partition_contents_ref = db.reference('/partitions' + splitPath[0] + splitPath[1].replace('.', ' '))
    children = partition_contents_ref.get()
    if children != None:
        for key, value in children.items():
            partition_contents_ref.child(key).set({})
    
    parent_dir_path = path[:path.rfind('/')]
    
    # Parent directory is now empty
    metadata_parent_ref = db.reference(parent_dir_path)
    children = metadata_parent_ref.get()
    if children == None:
        print('Directory empty')
        
        # Set new path for metadata
        metadata_parent_ref.set({
            'New Path':
            {
                'New_Path': True
            }})

        # Set new path for partition contents
        partition_contents_parent_ref = db.reference('/partitions' + parent_dir_path)
        partition_contents_parent_ref.set({
            'New Path':
            {
                'New_Path': True
            }})
        
# rm directory
def remove_directory(path):
    # Remove metadata directory
    metadata_ref = db.reference(path)
    children = metadata_ref.get()
    if children == None:
        print('Directory does not exist')
        return
    for key, value in children.items():
        metadata_ref.child(key).set({})
        
    # Remove partition contents directory
    partition_contents_ref = db.reference('/partitions' + path)
    children = partition_contents_ref.get()
    if children != None:
        for key, value in children.items():
            partition_contents_ref.child(key).set({})

# cat
def cat_file(path):
    splitPath = os.path.splitext(path)
    metadata_ref = db.reference(splitPath[0] + splitPath[1].replace('.', ' '))
    contents = metadata_ref.get()
    if contents == None:
        print('File does not exist')
        return
    for key, value in contents.items():
        partition_ref = db.reference(value['location'])
        partition_contents = partition_ref.get()
        print(json.dumps(partition_contents, indent=1))

# ls
def list_contents(path):
    ref = db.reference(path)
    contents = ref.get()
    if contents == None:
        print('Directory does not exist')
        return
    for key, val in contents.items():
        print(key.replace(' ', '.'))   
        
# mkdir
def make_directory(path):
    # Set new path for metadata
    metadata_ref = db.reference(path)
    metadata_ref.set({
        'New Path':
        {
            'New_Path': True
        }})
    
    # Set new path for partition contents
    partition_contents_ref = db.reference('/partitions' + path)
    partition_contents_ref.set({
        'New Path':
        {
            'New_Path': True
        }})
    
# put
def put_file(file, path):
    dir_ref = db.reference(path)
    query = dir_ref.child('New Path').get()
    if query != None:
        dir_ref.child('New Path').set({})
        
    name_of_file = file.replace('.', ' ')
    file_ref = db.reference(path + name_of_file)
    
    dir_partition_contents_ref = db.reference('/partitions' + path)
    query = dir_partition_contents_ref.child('New Path').get()
    if query != None:
        dir_partition_contents_ref.child('New Path').set({})
        
    with open('./data/' + file, 'r') as f:
        contents = json.load(f)
        contents_df = pd.DataFrame(contents.items())
        partition_metadata = {}
        partition_num = 1
        
        for index, row in contents_df.iterrows():
            year = int(row[0])
            year_data = row[1]
            months = list(year_data.keys())
            months.sort()
            
            for month in months:
                partition_name = 'p' + str(partition_num)
                partition_path = '/partitions' + path + name_of_file + '/' + partition_name
                partition_ref = db.reference(partition_path)
                partition_dict = {}
                partition_num += 1
                
                # Store metadata
                metadata = {}
                metadata['location'] = partition_path
                metadata['year'] = year
                metadata['month'] = int(month)
                partition_metadata[partition_name] = metadata
                
                # Store actual contents in partition
                for day, value in year_data[month].items():
                    partition_dict[day] = value
                partition_ref.set(partition_dict)
                
        partition_metadata['metadata'] = {'num_partitions': partition_num - 1}
        file_ref.set(partition_metadata)
        
# getPartitionLocations (for PMR)
def getPartitionData(file_path):
    partition_data = []
    splitPath = os.path.splitext(file_path)
    
    metadata_ref = db.reference(splitPath[0] + splitPath[1].replace('.', ' '))
    partition_metadata = metadata_ref.get()
    num_partitions = partition_metadata['metadata']['num_partitions']
    
    for partition_num in range(1, num_partitions + 1):
        partition_name = 'p' + str(partition_num)
        location = partition_metadata[partition_name]['location']
        year = partition_metadata[partition_name]['year']
        month = partition_metadata[partition_name]['month']
        partition_data.append([location, year, month])
        
    return partition_data
    
# readPartition (for PMR)
def readPartition(partition_content_path):
    partition_content_ref = db.reference(partition_content_path)
    content = partition_content_ref.get()
    content_df = pd.DataFrame(content)
    content_df = content_df.fillna(method='ffill')
    content_df = content_df.fillna(method='bfill')
    return content_df

In [235]:
ddd = readPartition('/partitions/PFE_data/PFE_historical_prices json/p1')
print(len(ddd))
ddd.head(32)

32


Unnamed: 0,0
0,55.338352
1,55.338352
2,55.338352
3,55.338352
4,53.267437
5,54.341972
6,53.570259
7,54.429882
8,34.904591
9,34.904591


### Partition-based map and reduce (PMR) implementation

In [214]:
# Returns dataframe of all data within data range
# Returns -1 if no data within given range
def mapPartition(partition_content_path, partition_year, partition_month, start_date, end_date):
    partition_date = datetime.datetime(partition_year, partition_month, 1)
    start_year = start_date.year
    start_month = start_date.month
    start_day = start_date.day
    end_year = end_date.year
    end_month = end_date.month
    end_day = end_date.day
    
    partition_results = -1
    
    # Partition stores data for first month in date range
    if (partition_year == start_year) and (partition_month == start_month):
        partition_results = readPartition(partition_content_path)
        partition_results = partition_results.iloc[]
    
    # Partition stores data for last month in date range
    elif (partition_year == end_year) and (partition_month == end_month):
        partition_results = readPartition(partition_content_path)
        partition_results = partition_results.iloc[]
        
    # Partition stores data for middle month in date range
    elif (partition_date > start_date) and (partition_date < end_date):
        partition_results = readPartition(partition_content_path)
    
    return partition_results
    
# Joins and returns all dataframes
# Returns -1 if no valid data
def reduce(all_partition_results):
    reduced_df = -1
    
    for partition_df in all_partition_results:
        if partition_df != -1 and len(partition_df) > 0:
            if reduced_df == -1:
                reduced_df = partition_df
            else:
                reduced_df = pd.concat([reduced_df, partition_df])
    return reduced_df

# Returns dataframe of all data in data range
# Returns -1 if no results found
def pmf(file_path, start_date, end_date):
    all_partitions_data = getPartitionData(file_path)
    all_partition_results = []
    
    for partition_data in all_partitions_data:
        location = partition_data[0]
        year = partition_data[0]
        month = partition_data[0]
        partition_results = mapPartition(location, year, month, start_date, end_date)
        all_partition_results.append(partition_results)
        
    return reduce(all_partition_results)

SyntaxError: invalid syntax (679650637.py, line 2)

### Demonstrate EDFS functionality

In [None]:
edfs_help()

In [None]:
make_directory('/PFE_data/')

In [223]:
put_file('PFE_historical_prices.json', '/PFE_data/')

In [None]:
make_directory('/country_vacc_data/')

In [None]:
for selected_countries_json_files:
    put_file(country_vacc_file, '/country_vacc_data/')

In [None]:
remove_file('/PFE_data/PFE_historical_prices.json')

In [None]:
remove_directory('/PFE_data/')

In [213]:
put_file('PFE_historical_prices.json', '/PFE_data/')

In [124]:
readPartition('/test/test.json', 2)

{'Person3': {'description': 'Eligendi voluptate in nostrum odit. Et voluptas mollitia suscipit. Optio quas ut voluptas.', 'email': 'patience67@gmail.com', 'name': 'Mr. Daryl Leffler Jr.', 'phone': '+1-551-867-7600'}}


In [204]:
put_file('PFE_historical_prices.json', '/test/')

In [203]:
remove_file('/test/test json')

Directory empty


In [29]:
edfs_help()

[1m[4mFunctions:[0m[0m
    [1mremove_file(path)[0m 			 Usage example: remove_file('/test/test.json')
    [1mremove_directory(path)[0m 		 Usage example: remove_directory('/test/')
    [1mcat_file(path)[0m 			 Usage example: cat_file('/test/test.json')
    [1mlist_contents(path)[0m 		 Usage example: list_contents('/test/')
    [1mmake_directory(path)[0m 		 Usage example: make_directory('/test/')
    [1mput_file(file, path)[0m 		 Usage example: put_file('test.json', '/test/')


In [78]:
make_directory('/test/')

In [100]:
put_file('test.json', '/test/')

In [99]:
remove_file('/test/test.json')

In [77]:
remove_directory('/test/')

In [208]:
list_contents('/test/')

PFE_historical_prices.json


In [107]:
cat_file('/test/test.json')

{
 "Person1": {
  "description": "Dolor a sit illum vel corrupti est quia. Dolorem repudiandae molestiae qui ut. Repellendus in mollitia error repellat necessitatibus molestiae.",
  "email": "cale83@hotmail.com",
  "name": "Lorenza Nienow",
  "phone": "1-786-677-7753"
 },
 "Person2": {
  "description": "Amet veniam dolorum dolor sit voluptas. Voluptatem sit amet et provident. Culpa soluta nobis voluptatibus blanditiis animi veritatis officia.",
  "email": "dietrich.janet@gmail.com",
  "name": "Jack Koch PhD",
  "phone": "947.729.3179"
 }
}
{
 "Person3": {
  "description": "Eligendi voluptate in nostrum odit. Et voluptas mollitia suscipit. Optio quas ut voluptas.",
  "email": "patience67@gmail.com",
  "name": "Mr. Daryl Leffler Jr.",
  "phone": "+1-551-867-7600"
 }
}


In [206]:
list_contents('/test/')

PFE_historical_prices json


In [153]:
cat_file('/dummy/dummy.json')

File does not exist


In [154]:
make_directory('/test2/')

In [155]:
put_file('test.json', '/test2/')

In [159]:
remove_file('/test/test.json')

In [157]:
remove_file('/test2/test.json')