# COVID-19 beds occupied vs. positivity rate dashboard

This dashboard presents COVID-19 data for England from the UK Health Security Agency (UKHSA):
- Hospital bed occupancy by COVID-19 patients (healthcare system pressure)
- 7-day rolling average of test positivity rate (infection spread)

https://ukhsa-dashboard.data.gov.uk/

In [1]:
import json
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import clear_output, display
import ipywidgets as wdg
from ipywidgets import VBox, HBox

## APIwrapper Class

This class handles communication with the UKHSA API, including simple rate limiting so that the notebook is not blocked for sending too many requests.
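The rate limiting in `get_page` enforces a minimum gap of about a third of a second between requests. The sketch below shows the same idea as a standalone, testable helper. `MinIntervalLimiter` and `FakeClock` are hypothetical names. It uses `time.monotonic` instead of `time.time` because a monotonic clock cannot jump backwards. It also records the time *after* any sleep, so each gap is measured from the previous request's actual start. The injectable clock lets the behaviour be checked without real waiting.

```python
import time
from typing import Callable, List, Optional


class MinIntervalLimiter:
    """Enforce a minimum delay between consecutive calls (hypothetical helper)."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None  # time of the previous call, None before the first

    def wait(self) -> None:
        """Block until at least min_interval seconds have passed since the last call."""
        now = self._clock()
        if self._last is not None:
            elapsed = now - self._last
            if elapsed < self.min_interval:
                self._sleep(self.min_interval - elapsed)
        # record the time after any sleep, so the next gap is measured from this call
        self._last = self._clock()


class FakeClock:
    """Deterministic clock for testing: sleep() only advances the simulated time."""

    def __init__(self):
        self.t = 0.0
        self.sleeps: List[float] = []

    def __call__(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.sleeps.append(seconds)
        self.t += seconds


fake = FakeClock()
limiter = MinIntervalLimiter(1 / 3, clock=fake, sleep=fake.sleep)  # at most 3 calls per second
for _ in range(3):
    limiter.wait()
# the first call does not wait; each later back-to-back call waits a third of a second
print([round(s, 3) for s in fake.sleeps])  # [0.333, 0.333]
```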

In [2]:
import requests
import time
from datetime import datetime

class APIwrapper:
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access
    
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # build the path with all the required structure parameters
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # our starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        self._filters=None
        self._page_size=-1
        # will contain the number of items
        self.count=None

    def get_page(self, filters={}, page_size=5):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three requests per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value 
        for debugging your structure and filters. """
        # Check page size is within range
        if page_size>365:
            raise ValueError("Max supported page size is 365")
        # restart from first page if page or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url is None:
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        curr_time=time.time() # Unix time: number of seconds since the Epoch
        deltat=curr_time-APIwrapper._last_access
        if deltat<0.33: # max 3 requests/second
            time.sleep(0.33-deltat)
        # record the time after any sleep, so the next gap is measured from this request
        APIwrapper._last_access=time.time()
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
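        parameters={x: y for x, y in filters.items() if y is not None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        # This is the API access. Response is a dictionary with various keys.
        # raise_for_status() turns HTTP errors (e.g. 429 Too Many Requests) into an
        # exception instead of a confusing KeyError further down; timeout is in seconds.
        # The .json() method decodes the response into Python objects (dictionaries,
        # lists; 'null' values are translated as None).
        http_response = requests.get(self._next_url, params=parameters, timeout=30)
        http_response.raise_for_status()
        response = http_response.json()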
        # update url so we'll fetch the next page
        self._next_url=response['next']
        self.count=response['count']
        # data are in the nested 'results' list
        return response['results'] 

    def get_all_pages(self, filters={}, page_size=365):
        """ Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well 
        in most cases. The number of items returned should in any case be equal to 
        the count attribute. """
        data=[] # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if next_page==[]:
                break # we are done
            data.extend(next_page)
        return data
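`get_all_pages` keeps calling `get_page` until it returns an empty list, which happens once the API's `next` field is `None`. The sketch below reduces the same cursor-following pattern to a standalone form that runs without network access. `fetch_all` and the fake pages are hypothetical. `fetch_page` stands in for the HTTP request and returns a dictionary with the `results` and `next` keys that `get_page` reads.

```python
from typing import Callable, Dict, List, Optional


def fetch_all(start_url: str, fetch_page: Callable[[str], dict]) -> List[dict]:
    """Collect results from every page by following 'next' links until None."""
    data: List[dict] = []
    url: Optional[str] = start_url
    while url is not None:
        page = fetch_page(url)        # stand-in for requests.get(url).json()
        data.extend(page["results"])  # items are in the nested 'results' list
        url = page["next"]            # None on the last page
    return data


# Fake two-page response keyed by URL (illustrative values only)
fake_pages: Dict[str, dict] = {
    "page1": {"count": 3, "next": "page2", "results": [{"v": 1}, {"v": 2}]},
    "page2": {"count": 3, "next": None, "results": [{"v": 3}]},
}
items = fetch_all("page1", fake_pages.__getitem__)
print(len(items))  # 3
```

As in the notebook, the number of collected items should equal the `count` field reported by the API.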

## Load data from JSON files

The data are loaded from local JSON files. These contain a snapshot of the data, so the dashboard still works when the API is not accessible.
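`wrangle_data` assumes each file holds a list of records with `date`, `metric` and `metric_value` keys. The sketch below checks that shape when a snapshot is loaded, so a malformed file fails with a clear message. `load_records` is a hypothetical helper. `io.StringIO` stands in for a real file such as `hospital_beds.json`, and the record values are made up.

```python
import io
import json

REQUIRED_KEYS = {"date", "metric", "metric_value"}  # the keys wrangle_data reads


def load_records(fp) -> list:
    """Load a JSON snapshot and check that every record has the required keys."""
    records = json.load(fp)
    for i, rec in enumerate(records):
        missing = REQUIRED_KEYS - set(rec)
        if missing:
            raise ValueError(f"record {i} is missing keys: {sorted(missing)}")
    return records


snapshot = io.StringIO(json.dumps([
    {"date": "2020-08-01", "metric": "COVID-19_healthcare_occupiedBedsByDay", "metric_value": 100},
]))
records = load_records(snapshot)
print(records[0]["metric_value"])  # 100
```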

In [3]:
with open("hospital_beds.json", "rt") as INFILE:
    beds_data = json.load(INFILE)
with open("testing_positivity.json", "rt") as INFILE:
    positivity_data = json.load(INFILE)

## Wrangling the data

These functions convert the raw JSON data into pandas DataFrames for plotting:

1. **wrangle_data()** combines multiple datasets into a nested dictionary keyed by date
2. **create_timeseries_dataframe()** creates a DataFrame covering every day from the first to the last date, filling in values with `.loc[row, column]` assignment
3. Individual metric DataFrames are then selected from this unified time-series DataFrame
4. `dtype=float` forces every assigned value to be stored as float64, so matplotlib receives numeric data
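The loop in `create_timeseries_dataframe` can also be written with pandas reshaping operations. This is a swapped-in technique, not the notebook's method. `pivot` turns the long records (one row per date and metric) into one column per metric. `reindex` onto a daily `date_range` then adds rows for missing dates. The two approaches differ when a date appears twice for the same metric: the dictionary loop keeps the last value, while `pivot` raises an error. The records below are made-up values in the shape the API returns.

```python
import pandas as pd

# Toy records in the same shape as the API results (made-up values)
beds = [
    {"date": "2020-08-01", "metric": "COVID-19_healthcare_occupiedBedsByDay", "metric_value": 100},
    {"date": "2020-08-03", "metric": "COVID-19_healthcare_occupiedBedsByDay", "metric_value": 120},
]
positivity = [
    {"date": "2020-08-01", "metric": "COVID-19_testing_positivity7DayRolling", "metric_value": 1.5},
]
metric_to_column = {
    "COVID-19_healthcare_occupiedBedsByDay": "beds",
    "COVID-19_testing_positivity7DayRolling": "positivity",
}

records = pd.DataFrame(beds + positivity)
records["date"] = pd.to_datetime(records["date"], format="%Y-%m-%d")

# one row per date, one column per metric
wide = (records.pivot(index="date", columns="metric", values="metric_value")
        .rename(columns=metric_to_column))

# complete daily index so missing dates become rows; gaps filled with 0.0 as in the notebook
full_index = pd.date_range(wide.index.min(), wide.index.max(), freq="D")
timeseries = (wide.reindex(index=full_index, columns=["beds", "positivity"])
              .astype(float)
              .fillna(0.0))
print(timeseries)
```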

In [4]:
def parse_date(datestring):
    """ Convert a date string into a pandas datetime object """
    return pd.to_datetime(datestring, format="%Y-%m-%d")

def wrangle_data(datasets):
    """ Combine raw records from several datasets into a nested dictionary: {date: {metric: value}} """
    data = {}
    for dataset in datasets:
        for entry in dataset:
            date = entry['date']
            metric = entry['metric']
            value = entry['metric_value']
            if date not in data:
                data[date] = {}
            data[date][metric] = value
    return data

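def create_timeseries_dataframe(data):
    """ Build a DataFrame with one row per day and 'beds' and 'positivity' columns """
    dates = sorted(data.keys()) # all dates in chronological order (YYYY-MM-DD strings sort correctly)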
    
    startdate = parse_date(dates[0])
    enddate = parse_date(dates[-1])
    
    index = pd.date_range(startdate, enddate, freq='D')
    timeseriesdf = pd.DataFrame(index=index, columns=['beds', 'positivity'], dtype=float) 
    
    metrics = {
        'beds': 'COVID-19_healthcare_occupiedBedsByDay',
        'positivity': 'COVID-19_testing_positivity7DayRolling'
    }
    
    for date, entry in data.items():  # each entry is a dictionary
        pd_date=parse_date(date) # convert to Pandas format
        for column in ['beds', 'positivity']: 
            metric_name = metrics[column]
            # do not assume all values are there for every date - if a value is not available, insert a 0.0
            value = entry.get(metric_name, 0.0)
            # this is the way you access a specific location in the dataframe - use .loc
            # and put index,column in a single set of [ ]
            timeseriesdf.loc[pd_date, column] = value
    
    # fill in any remaining "holes" due to missing dates
    timeseriesdf.fillna(0.0, inplace=True)
    
    return timeseriesdf

In [5]:
data = wrangle_data([beds_data, positivity_data])
timeseriesdf = create_timeseries_dataframe(data)

## Graph 1: Hospital beds occupied by COVID-19 patients in England

This time-series chart shows the number of hospital beds occupied by COVID-19 patients in England, an indicator of pressure on the healthcare system. The data span August 2020 to July 2021, covering several waves of the pandemic. The dropdown switches between linear and logarithmic scales to make the trends easier to compare.
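A logarithmic axis helps because a quantity that grows by a constant percentage per day plots as a straight line. This lets the steepness of different waves be compared even when their levels differ. The sketch below draws a synthetic series that doubles every 10 days (not UKHSA data) on both scales. One caveat for this notebook: zero cannot be represented on a log axis, and missing values are filled with 0.0 during wrangling, so those days cannot be drawn faithfully when the log scale is selected.

```python
import matplotlib
matplotlib.use("Agg")  # non-interactive backend so the sketch also runs outside Jupyter
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Synthetic series that doubles every 10 days (illustrative only)
days = pd.date_range("2020-09-01", periods=60, freq="D")
occupied = 100 * 2 ** (np.arange(60) / 10)

fig, (ax_lin, ax_log) = plt.subplots(1, 2, figsize=(12, 4))
for ax, scale in [(ax_lin, "linear"), (ax_log, "log")]:
    ax.plot(days, occupied)
    ax.set_yscale(scale)  # on "log" the exponential curve becomes a straight line
    ax.set_title(f"{scale} scale")
fig.autofmt_xdate()
```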

In [6]:
def plot_beds_timeseries(df, scale):
    df.plot(y='beds', figsize=(12, 6), color='steelblue', linewidth=2)
    plt.yscale(scale)
    plt.title('COVID-19 Hospital Beds Occupied - England (2020-2021)', fontsize=14, fontweight='bold')
    plt.xlabel('Date', fontsize=12)
    plt.ylabel('Number of Beds Occupied', fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

beds_scale = wdg.Dropdown(
    options=['linear', 'log'],
    value='linear',
    description='Scale:',
    disabled=False,
    style={'description_width': 'initial'}
)

# wdg.fixed passes the DataFrame as a constant argument: it is captured once, when the widget
# is created, so only the scale responds to user interaction
beds_graph = wdg.interactive_output(plot_beds_timeseries, {'df': wdg.fixed(timeseriesdf[['beds']]), 'scale': beds_scale})

display(beds_scale, beds_graph)
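Binding the DataFrame with `wdg.fixed` is early binding. The graph keeps the object it received when `interactive_output` was called, and later assigning a new DataFrame to the name `timeseriesdf` does not change what the graph holds. `functools.partial` shows the same effect in plain Python. This is an analogy, not how ipywidgets implements `fixed`, and `describe` is a hypothetical stand-in for `plot_beds_timeseries`.

```python
from functools import partial


def describe(rows, scale):
    """Stand-in for a plotting function: report what would be drawn."""
    return f"{len(rows)} rows on a {scale} axis"


data = [1, 2, 3]                 # plays the role of timeseriesdf at widget creation time
draw = partial(describe, data)   # like wdg.fixed: this list object is bound now

data = [1, 2, 3, 4, 5]           # a "refresh" rebinds the name to a new object
print(draw("linear"))            # 3 rows on a linear axis (still the original object)
```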


## Graph 2: COVID-19 test positivity rate (7-day rolling average)

This area chart shows the percentage of COVID-19 tests that came back positive, as a 7-day rolling average that smooths out day-to-day variation. Higher positivity rates generally indicate increased community transmission. The data cover February 2020 to February 2021. Use the slider to choose how many of the most recent days with data to display.
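The UKHSA metric (`COVID-19_testing_positivity7DayRolling`) already arrives as a rolling value, so the notebook does not compute the average itself. The sketch below shows what a 7-day rolling average does and how the slider's "most recent N days" selection works. It assumes a simple trailing mean of daily percentages, which may differ from UKHSA's exact methodology, and uses synthetic values.

```python
import pandas as pd

# Synthetic daily positivity percentages (illustrative only)
index = pd.date_range("2020-10-01", periods=10, freq="D")
daily = pd.Series([2.0, 4.0, 3.0, 5.0, 6.0, 4.0, 8.0, 9.0, 7.0, 10.0], index=index)

# each value is the mean of that day and the six days before it;
# the first six days have no complete window, so they are NaN
rolling = daily.rolling(window=7).mean()

# what the slider does: keep only the most recent num_days rows
num_days = 3
print(rolling.tail(num_days).round(2))
```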

In [7]:
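def plot_positivity_area(df, num_days):
    # skip days with positivity 0.0, i.e. dates without a positivity value (filled with 0.0 during wrangling)
    df_filtered = df[df['positivity'] > 0]
    plot_df = df_filtered.tail(num_days) # the most recent num_days days with data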
    
    plt.figure(figsize=(12, 6))
    plt.fill_between(plot_df.index, plot_df['positivity'], alpha=0.4, color='coral')
    plt.plot(plot_df.index, plot_df['positivity'], color='darkred', linewidth=2)
    plt.title('COVID-19 Test Positivity Rate (7-day Rolling Average) - England', fontsize=14, fontweight='bold')
    plt.xlabel('Date', fontsize=12)
    plt.ylabel('Positivity Rate (%)', fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

positivity_days = wdg.IntSlider(
    value=180,
    min=30,
    max=365,
    step=30,
    description='Days:',
    disabled=False,
    continuous_update=False
)

positivity_graph = wdg.interactive_output(plot_positivity_area, {'df': wdg.fixed(timeseriesdf[['positivity']]), 'num_days': positivity_days})
display(positivity_days, positivity_graph)


## Refresh data button: download from the API

This button downloads the latest data from the UKHSA API and updates the graphs.
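A button callback that performs a slow download should disable the button while it works and always restore it afterwards, even when the request fails. The sketch below shows that pattern with `try`/`finally`. `run_with_busy_button` is a hypothetical helper. A `SimpleNamespace` stands in for an ipywidgets `Button`, since only the `description`, `icon` and `disabled` attributes are used, and the failing download is simulated.

```python
from types import SimpleNamespace


def run_with_busy_button(button, task) -> bool:
    """Disable the button while task() runs, then restore it; return True on success."""
    button.description, button.icon, button.disabled = "Downloading...", "hourglass", True
    ok = False
    try:
        task()
        ok = True
    except Exception as e:  # report the error instead of leaving the button stuck
        print(f"Error accessing API: {e}")
    finally:
        # runs on success and on failure, so the button is always re-enabled
        button.description = "Refresh Data"
        button.icon = "download" if ok else "exclamation-triangle"
        button.disabled = False
    return ok


def failing_download():
    raise ConnectionError("API unreachable")  # simulated network failure


fake_button = SimpleNamespace(description="Refresh Data", icon="download", disabled=False)
run_with_busy_button(fake_button, failing_download)
print(fake_button.icon, fake_button.disabled)  # exclamation-triangle False
```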

In [8]:
def access_api(button):
    try:
        beds_structure = {
            "theme": "infectious_disease",
            "sub_theme": "respiratory",
            "topic": "COVID-19",
            "geography_type": "Nation",
            "geography": "England",
            "metric": "COVID-19_healthcare_occupiedBedsByDay"
        }
        apibutton.description = "Downloading..."
        apibutton.icon = "hourglass"
        apibutton.disabled = True
        
        beds_api = APIwrapper(**beds_structure)
        new_beds_data = beds_api.get_all_pages()

        positivity_structure = {
            "theme": "infectious_disease",
            "sub_theme": "respiratory",
            "topic": "COVID-19",
            "geography_type": "Nation",
            "geography": "England",
            "metric": "COVID-19_testing_positivity7DayRolling"
        }
        positivity_api = APIwrapper(**positivity_structure)
        new_positivity_data = positivity_api.get_all_pages()
        
        global beds_data, positivity_data, timeseriesdf
        beds_data = new_beds_data
        positivity_data = new_positivity_data
        
        data = wrangle_data([beds_data, positivity_data])
        timeseriesdf = create_timeseries_dataframe(data)
        
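        # Redraw both graphs explicitly with the new DataFrame. Re-assigning a widget's current
        # value (e.g. beds_scale.value = beds_scale.value) does not trigger a change event, and
        # the DataFrames bound with wdg.fixed are the ones captured when the widgets were created.
        # Note: later changes to the controls still redraw from those captured DataFrames.
        with beds_graph:
            clear_output(wait=True)
            plot_beds_timeseries(timeseriesdf[['beds']], beds_scale.value)
        with positivity_graph:
            clear_output(wait=True)
            plot_positivity_area(timeseriesdf[['positivity']], positivity_days.value)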
        
        print("Data refreshed successfully")
        print(f"Hospital beds data points: {len(beds_data)}")
        print(f"Testing positivity data points: {len(positivity_data)}")
        print(f"Timeseries DataFrame shape: {timeseriesdf.shape}")
        
        apibutton.description = "Refresh Data"
        apibutton.icon = "download"
        apibutton.disabled = False
        
    except Exception as e:
        print(f"Error accessing API: {e}")
        apibutton.description = "Refresh Data"
        apibutton.icon = "exclamation-triangle"
        apibutton.disabled = False

apibutton = wdg.Button(
    description='Refresh Data',
    disabled=False,
    button_style='info',
    tooltip='Click to download current data from UKHSA API',
    icon='download'
)

apibutton.on_click(access_api)
display(apibutton)


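## Combined dashboard
The combined dashboard places the controls next to both graphs, showing bed occupancy and test positivity rate between August 2020 and August 2021. Viewed together, the two graphs suggest that periods of wider COVID-19 spread were accompanied by increased pressure on the healthcare system in England.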
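The cell below stacks the two existing graphs next to the controls rather than drawing a single chart. If one chart with both metrics is wanted, one option is a second y-axis created with matplotlib's `Axes.twinx`, because beds and percentages have very different units. This is a sketch on a small synthetic frame with the same column names as `timeseriesdf`, not output from the notebook.

```python
import matplotlib
matplotlib.use("Agg")  # non-interactive backend so the sketch also runs outside Jupyter
import matplotlib.pyplot as plt
import pandas as pd

# Small synthetic frame with the same columns as timeseriesdf (made-up values)
index = pd.date_range("2020-11-01", periods=5, freq="D")
df = pd.DataFrame({"beds": [900.0, 950.0, 1000.0, 1100.0, 1200.0],
                   "positivity": [5.0, 5.5, 6.0, 6.8, 7.5]}, index=index)

fig, ax_beds = plt.subplots(figsize=(12, 6))
ax_beds.plot(df.index, df["beds"], color="steelblue")
ax_beds.set_ylabel("Number of beds occupied")

# second y-axis sharing the same dates, because the two metrics have different units
ax_pos = ax_beds.twinx()
ax_pos.plot(df.index, df["positivity"], color="darkred")
ax_pos.set_ylabel("Positivity rate (%)")

ax_beds.set_title("Beds occupied vs. positivity rate (synthetic data)")
fig.autofmt_xdate()
fig.tight_layout()
```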

In [9]:
controls = VBox([beds_scale, positivity_days, apibutton])
graphs = VBox([beds_graph, positivity_graph])
dashboard = HBox([controls, graphs]) # side-by-side layout with all controls and graphs
display(dashboard)


Data source: [UKHSA Dashboard](https://ukhsa-dashboard.data.gov.uk/)