By Evgenii Korostelev, Student ID number: 200251127

# COVID-19 booster vaccine and COVID-19 disease dashboard

This dashboard analyses the COVID-19 booster vaccine waves against COVID-19 cases, deaths and hospital admissions.

In [5]:
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json
import os

In [6]:
# IPython magic: render matplotlib figures inline in the notebook output
%matplotlib inline
# make figures larger: set the default resolution of every figure to 100 dpi
plt.rcParams['figure.dpi'] = 100

In [7]:
# Read the raw API dumps that sit next to the notebook into module-level
# variables, one per downloaded metric.
def _load_json(path):
    """Return the decoded contents of the JSON file at *path*."""
    with open(path, "rt") as infile:
        return json.load(infile)


cases = _load_json("cases.json")
admissions = _load_json("admissions.json")
deaths = _load_json("deaths.json")
autumn22_doses = _load_json("autumn22_doses.json")
spring23_doses = _load_json("spring23_doses.json")
autumn23_doses = _load_json("autumn23_doses.json")
spring24_doses = _load_json("spring24_doses.json")
autumn24_doses = _load_json("autumn24_doses.json")

In [8]:
# Raw booster-dose files paired with the age band whose records are kept for
# that wave (together with sex == "all").
_DOSE_WAVE_AGE_BANDS = (
    ("autumn22_doses.json", "50+"),
    ("spring23_doses.json", "75+"),
    ("autumn23_doses.json", "65+"),
    ("spring24_doses.json", "75+"),
    ("autumn24_doses.json", "65+"),
)

# DataFrame column name -> metric name used in the downloaded records.
_COLUMN_METRICS = {
    'cases': 'COVID-19_cases_casesByDay',
    'admissions': 'COVID-19_healthcare_admissionByDay',
    'deaths': 'COVID-19_deaths_ONSByDay',
    'aggregated_autumn22_doses': 'COVID-19_vaccinations_autumn22_dosesByDay',
    'aggregated_spring23_doses': 'COVID-19_vaccinations_spring23_dosesByDay',
    'aggregated_autumn23_doses': 'COVID-19_vaccinations_autumn23_dosesByDay',
    'aggregated_spring24_doses': 'COVID-19_vaccinations_spring24_dosesByDay',
    'aggregated_autumn24_doses': 'COVID-19_vaccinations_autumn24_dosesByDay',
}


def _read_json_file(path):
    """Return the decoded contents of the JSON file at *path*."""
    with open(path, "rt", encoding="utf-8") as infile:
        return json.load(infile)


def _aggregate_records(records, match_keys, other_keys, sum_keys, match_constraints=None):
    """Group *records* on *match_keys* and add up their *sum_keys* values.

    Parameters:
        records (list): List of dictionaries to aggregate.
        match_keys (list): Keys whose combined values identify one group.
        other_keys (list): Extra keys copied from the first record of each group.
        sum_keys (list): Keys whose values are summed within each group.
        match_constraints (dict, optional): Key/value pairs a record must
            match exactly to be considered at all.

    Returns:
        list: One dictionary per group, in order of first appearance, holding
        the match keys, the other keys and the summed keys.
    """
    if match_constraints:
        records = [
            record for record in records
            if all(record.get(key) == value for key, value in match_constraints.items())
        ]

    groups = {}
    for record in records:
        group_id = tuple(record.get(key) for key in match_keys)
        row = groups.get(group_id)
        if row is None:
            row = {key: record.get(key) for key in match_keys}
            row.update((key, record.get(key)) for key in other_keys)
            row.update((key, 0) for key in sum_keys)
            groups[group_id] = row
        # Every record of the group contributes, including records that are
        # exact copies of another one; a missing or null value counts as 0.
        for key in sum_keys:
            row[key] += record.get(key) or 0
    return list(groups.values())


def wangle_data():
    """Build the daily time-series DataFrame from the JSON files on disk.

    Reads cases.json, admissions.json and deaths.json as they are, and reduces
    each booster-dose file to one record per date (records with sex "all" and
    the wave's age band, summed on "metric_value"). Each reduced dose data set
    is also written to "aggregated_<file name>".

    Returns:
        pandas.DataFrame: float-valued frame indexed by every calendar day
        between the earliest and latest date found, one column per entry of
        _COLUMN_METRICS, with 0.0 wherever no value is available. When the
        files contain no records at all, an empty frame with the same columns
        is returned.
    """
    # Always read from disk so that a refresh of the files is picked up.
    datasets = [_read_json_file(name)
                for name in ("admissions.json", "cases.json", "deaths.json")]

    for file_path, age_band in _DOSE_WAVE_AGE_BANDS:
        aggregated = _aggregate_records(
            _read_json_file(file_path),
            match_keys=["date"],
            other_keys=["metric"],
            sum_keys=["metric_value"],
            match_constraints={"sex": "all", "age": age_band},
        )
        with open("aggregated_" + file_path, 'w', encoding="utf-8") as aggregated_file:
            json.dump(aggregated, aggregated_file)
        datasets.append(aggregated)

    # Merge all data sets into {date string: {metric name: value}}.
    data = {}
    for dataset in datasets:
        for entry in dataset:
            data.setdefault(entry['date'], {})[entry['metric']] = entry['metric_value']

    columns = list(_COLUMN_METRICS)
    if not data:
        return pd.DataFrame(index=pd.DatetimeIndex([]), columns=columns, dtype=float)

    def parse_date(datestring):
        """ Convert a date string into a pandas datetime object """
        return pd.to_datetime(datestring, format="%Y-%m-%d")

    dates = sorted(data)
    index = pd.date_range(parse_date(dates[0]), parse_date(dates[-1]), freq='D')
    # Start from an all-zero float frame: days or metrics without data stay 0.0
    # and the columns never become object dtype.
    timeseriesdf = pd.DataFrame(0.0, index=index, columns=columns)

    for date, entry in data.items():
        pd_date = parse_date(date)
        for column, metric_name in _COLUMN_METRICS.items():
            value = entry.get(metric_name)
            if value is not None:
                timeseriesdf.at[pd_date, column] = float(value)

    return timeseriesdf

In [9]:
# Build the initial time series; df is the module-level frame that
# timeseries_graph plots and that access_api replaces after a refresh.
df = wangle_data()

## Download current data

Click the button below to update the data for plotting.
If there is an error while updating the data, you will be notified after clicking the button; the data currently stored for plotting will remain the same and will not be overwritten.

Note: After you refresh the data, the graphs will not update until you interact with a widget.

In [10]:
class APIwrapper:
    """Paginated, rate-limited reader for one metric endpoint of the API.

    An instance is bound to a single endpoint (built from the structure
    parameters given to the constructor) and hands out its data page by page.
    """
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access
    _min_interval=0.33 # seconds between requests: max 3 requests/second
    _timeout=30 # seconds to wait for the server before giving up

    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # build the path with all the required structure parameters
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # our starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        self._filters=None
        self._page_size=-1
        # URL of the next page to fetch; set on the first get_page call
        self._next_url=None
        # will contain the number of items
        self.count=None

    def get_page(self, filters=None, page_size=5):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value
        for debugging your structure and filters.

        Returns the list of results of the page, or [] once the last page has
        already been fetched. Raises ValueError if page_size exceeds 365 and
        requests.HTTPError if the server answers with an error status. """
        # Check page size is within range
        if page_size>365:
            raise ValueError("Max supported page size is 365")
        # None means "no filters"; copy so later changes to the caller's dict
        # cannot alter the stored query
        filters={} if filters is None else dict(filters)
        # restart from first page if page or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url is None:
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        wait=APIwrapper._min_interval-(time.time()-APIwrapper._last_access)
        if wait>0:
            # never sleep longer than one interval, even if the clock jumped back
            time.sleep(min(wait, APIwrapper._min_interval))
        # record the moment of the actual request, i.e. after any sleep
        APIwrapper._last_access=time.time()
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters={x: y for x, y in filters.items() if y is not None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        response=requests.get(self._next_url, params=parameters,
                              timeout=APIwrapper._timeout)
        # fail loudly on HTTP errors instead of decoding an error document
        response.raise_for_status()
        # the .json() method decodes the response into Python object (dictionaries,
        # lists; 'null' values are translated as None).
        payload=response.json()
        # update url so we'll fetch the next page
        self._next_url=payload['next']
        self.count=payload['count']
        # data are in the nested 'results' list
        return payload['results']

    def get_all_pages(self, filters=None, page_size=365):
        """ Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well
        in most cases. The number of items returned should in any case be equal to
        the count attribute. """
        filters={} if filters is None else dict(filters)
        data=[] # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if next_page==[]:
                break # we are done
            data.extend(next_page)
        return data

In [11]:
# Structure parameters shared by every API query made by this dashboard;
# the metric is supplied separately for each download.
structure = dict(
    theme="infectious_disease",
    sub_theme="respiratory",
    topic="COVID-19",
    geography_type="Nation",
    geography="England",
)

In [12]:
#function to clear files.
def clear_files(filenames):
    """Overwrite each file in *filenames* with an empty JSON list.

    Files that do not exist yet are created; nothing is deleted.
    """
    empty_payload = json.dumps([])
    for target in filenames:
        with open(target, 'w') as handle:
            handle.write(empty_payload)

In [13]:
# our API access function. This will be called by the button when it is clicked
def access_api(button):
    """Button callback: download all metrics, store them and rebuild df.

    Every metric is downloaded into memory first. Only when all downloads
    have succeeded are the JSON files and the module-level data replaced, so
    a failed refresh leaves the stored data untouched.

    Parameters:
        button: the widget that was clicked (ignored).
    """
    global df, cases, admissions, deaths
    global autumn22_doses, spring23_doses, autumn23_doses, spring24_doses, autumn24_doses

    # name of the data set (also the stem of its JSON file) -> API metric
    downloads = {
        "cases": "COVID-19_cases_casesByDay",
        "admissions": "COVID-19_healthcare_admissionByDay",
        "deaths": "COVID-19_deaths_ONSByDay",
        "autumn22_doses": "COVID-19_vaccinations_autumn22_dosesByDay",
        "spring23_doses": "COVID-19_vaccinations_spring23_dosesByDay",
        "autumn23_doses": "COVID-19_vaccinations_autumn23_dosesByDay",
        "spring24_doses": "COVID-19_vaccinations_spring24_dosesByDay",
        "autumn24_doses": "COVID-19_vaccinations_autumn24_dosesByDay",
    }

    fresh = {}
    try:
        for name, metric in downloads.items():
            # build the query from a copy so the shared structure dict is not modified
            api = APIwrapper(**{**structure, "metric": metric})
            fresh[name] = api.get_all_pages()
            print(f"Data points expected: {api.count}")
            print(f"Data points retrieved: {len(fresh[name])}")
    except Exception as err:
        # Boundary of the callback: report the failure and keep the old data.
        print("There was an error when refershing your data, please be notified that the current data has not been overwritten")
        print(f"Reason: {err!r}")
        return

    # All downloads succeeded: now it is safe to replace the stored files.
    for name, records in fresh.items():
        with open(f"{name}.json", "wt") as OUTF:
            json.dump(records, OUTF)

    # Keep the module-level raw data in step with the files.
    cases = fresh["cases"]
    admissions = fresh["admissions"]
    deaths = fresh["deaths"]
    autumn22_doses = fresh["autumn22_doses"]
    spring23_doses = fresh["spring23_doses"]
    autumn23_doses = fresh["autumn23_doses"]
    spring24_doses = fresh["spring24_doses"]
    autumn24_doses = fresh["autumn24_doses"]

    df = wangle_data()

    # after all is done, I can switch the icon on the button to a "check" sign
    # and optionally disable the button - it won't be needed again.
    apibutton.icon = "check"
    apibutton.disabled = True


# see the doc for the parameters
apibutton = wdg.Button(
    description='Refresh data',
    disabled=False,
    button_style='',
    tooltip='Click to download current Public Health England data',
    icon='download'
)

# registered the callback function with the button:
apibutton.on_click(access_api)

# this is an iPython function that generalises print for Jupyter Notebooks; I used it to
# display the widgets:
display(apibutton)

Button(description='Refresh data', icon='download', style=ButtonStyle(), tooltip='Click to download current Pu…

## Graphs and Analysis

This graph with interactive controls shows metrics of daily Covid-19 cases, admissions, deaths, and booster vaccine dose administrations for respective booster vaccine waves.
The aggregated_..._doses series show the total daily booster vaccine doses for each COVID-19 booster vaccine wave over its broadly defined time period.
This graph can be viewed with all data stacked on each other in one graph on one timeline and individually for each metric.
I recommend viewing the data on a linear scale to compare the magnitudes of COVID-19 cases, deaths and hospital admissions with the COVID-19 booster vaccine waves.
I also recommend viewing the data on a log scale to compare the effectiveness of the COVID-19 booster vaccine waves against COVID-19 cases, deaths and hospital admissions.

In [14]:
# Every plottable series; offered in the selector and selected by default.
_stat_columns = [
    'cases',
    'admissions',
    'deaths',
    'aggregated_autumn22_doses',
    'aggregated_spring23_doses',
    'aggregated_autumn23_doses',
    'aggregated_spring24_doses',
    'aggregated_autumn24_doses',
]

# Multi-select list of series, with an extra 'all' entry.
series = wdg.SelectMultiple(
    options=['all'] + _stat_columns,
    value=_stat_columns,
    rows=9,
    description='Stats:',
    disabled=False
)

# Choice between a linear and a logarithmic y axis.
scale = wdg.RadioButtons(
    options=['linear', 'log'],
    description='Scale:',
    disabled=False
)

# Stack the two controls vertically (I think VBox looks nicer than HBox).
controls = wdg.VBox([series, scale])

def timeseries_graph(gcols, gscale):
    """Plot the selected columns of the module-level df against time.

    Parameters:
        gcols: iterable of selected column names; if it contains 'all', every
            column of df is plotted. An empty selection prints a hint instead
            of plotting.
        gscale: 'log' for a logarithmic y axis, anything else for linear.
    """
    if not gcols:
        print("Click to select data for graph")
        print("(CTRL-Click to select more than one category)")
        return

    logscale = gscale == 'log'  # Determine if log scale is selected
    selected = df if 'all' in gcols else df[list(gcols)]

    # DataFrame.plot opens its own figure and returns the Axes, so no separate
    # plt.figure() call is made (it would only add an empty figure).
    ax = selected.plot(logy=logscale, figsize=(12,7))
    ax.set_title("Log Scale" if logscale else "Linear Scale")
    ax.set_xlabel("Date")
    ax.set_ylabel("Metric")
    ax.legend(loc='upper left')
    plt.show()

# Connect the controls to timeseries_graph: the value of `series` is passed as
# gcols and the value of `scale` as gscale.
graph = wdg.interactive_output(timeseries_graph, {'gcols': series, 'gscale': scale})

# Display controls and graph
display(controls, graph)

VBox(children=(SelectMultiple(description='Stats:', index=(1, 2, 3, 4, 5, 6, 7, 8), options=('all', 'cases', '…

Output()

**Author and License** "Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Evgenii Korostelev. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/)."