[DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) (C) Fabrizio Smeraldi, 2020,2024 ([f.smeraldi@qmul.ac.uk](mailto:f.smeraldi@qmul.ac.uk) - [web](http://www.eecs.qmul.ac.uk/~fabri/)). This notebook is released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/).

# DIY Disease Tracking Dashboard

This is a template for your DIY Disease Tracking Dashboard, to which you can add the code you developed in the previous notebooks. The dashboard will be displayed using [voila](https://voila.readthedocs.io/en/stable/index.html), a Python dashboarding tool that converts notebooks to standalone dashboards. Unlike the other libraries we have seen, the ```voila``` package must be installed using *pip* or *conda* but does not need to be imported; instead, it acts at the level of the notebook server. Package ```voila``` is already installed on the QMUL JupyterHub as well as in the Binder. To install it locally, follow the [instructions](https://voila.readthedocs.io/en/stable/install.html) online.

Broadly speaking, Voila acts by **running all the cells in your notebook** when the dashboard is first loaded; it then hides all code cells and displays all markdown cells and any outputs, including widgets. However, the code is still there in the background and handles any interaction with the widgets. To view this dashboard template rendered in Voila click [here](https://mybinder.org/v2/gh/fsmeraldi/diy-covid19dash/main?urlpath=%2Fvoila%2Frender%2FDashboard.ipynb).

In [1]:
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json

In [None]:
%matplotlib inline
plt.rcParams['figure.dpi'] = 100

In [3]:
class APIwrapper:
    _access_point = "https://api.ukhsa-dashboard.data.gov.uk"
    _last_access = 0.0
    
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        url_path = (f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/"
                    f"{geography_type}/geographies/{geography}/metrics/{metric}")
        self._start_url = APIwrapper._access_point + url_path
        self._filters = None
        self._page_size = -1
        self.count = None

    def get_page(self, filters={}, page_size=5):
        if page_size > 365:
            raise ValueError("Max supported page size is 365")

        if filters != self._filters or page_size != self._page_size:
            self._filters = filters
            self._page_size = page_size
            self._next_url = self._start_url

        if self._next_url is None:
            return []

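        # Throttle requests: leave at least 0.33 s between consecutive calls
        curr_time = time.time()
        deltat = curr_time - APIwrapper._last_access
        if deltat < 0.33:
            time.sleep(0.33 - deltat)
        # Record the time after any sleep, so the next call measures from the actual request
        APIwrapper._last_access = time.time()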

        parameters = {k: v for k, v in filters.items() if v is not None}
        parameters['page_size'] = page_size

        response = requests.get(self._next_url, params=parameters).json()
        self._next_url = response['next']
        self.count = response['count']
        return response['results']

    def get_all_pages(self, filters={}, page_size=365):
        data = []
        while True:
            next_page = self.get_page(filters, page_size)
            if next_page == []:
                break
            data.extend(next_page)
        return data


## Load initial data from disk

You should include "canned" data in ```.json``` files along with your dashboard. When the dashboard starts, it should load that data and assign it as a dictionary to the ```jsondata``` variable (the code below will be hidden when the dashboard is rendered by Voila).

In [4]:
jsondata={}

In [5]:
with open("timeseries.json", "rt") as f:
    jsondata["timeseries"] = json.load(f)
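
The canned file itself has to be created once, before the dashboard is deployed. The following is a minimal sketch of that round trip, assuming the data is a list of records with ```date``` and ```value``` fields; the sample records and the variable names ```canned_records``` and ```canned_path``` are hypothetical, and the file is written to a temporary directory only to keep the example free of side effects.

```python
import json
import os
import tempfile

# Hypothetical records standing in for data previously downloaded from the API
canned_records = [
    {"date": "2024-01-01", "value": 5},
    {"date": "2024-01-02", "value": 6},
]

# Assumption: in a real dashboard the file would sit next to the notebook;
# a temporary directory is used here so nothing is overwritten.
canned_path = os.path.join(tempfile.mkdtemp(), "timeseries.json")

# Save the records as JSON text
with open(canned_path, "wt") as outfile:
    json.dump(canned_records, outfile)

# Load them back, as the dashboard does at start-up
with open(canned_path, "rt") as infile:
    reloaded = json.load(infile)

print(reloaded == canned_records)
```

Saving plain lists and dictionaries keeps the file readable by ```json.load``` without any custom decoding.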

## Wrangle the data

The dashboard should contain the logic to wrangle the raw data into a ```DataFrame``` (or more than one, as required) that will be used for plotting. The wrangling code should be put into a function and called on the data from the JSON file (we'll need to call it again on any data downloaded from the API). In this template, ```wrangle_data``` builds a date-indexed dataframe from the time series records and copies their values into two columns, ```One``` and ```Two```, that are used for plotting.

In [6]:

def wrangle_data(rawdata):
    if "timeseries" in rawdata:
        ts_src = rawdata["timeseries"]
        if isinstance(ts_src, dict) and "data" in ts_src:
            datalist = ts_src["data"]
        elif isinstance(ts_src, list):
            datalist = ts_src
        else:
            raise KeyError(
                "jsondata['timeseries'] must be a list or a dict with key 'data', "
                f"found type {type(ts_src)}"
            )
    elif "data" in rawdata:
        datalist = rawdata["data"]
    else:
        raise KeyError("rawdata must contain either 'timeseries' or 'data'.")

    ts = pd.DataFrame(datalist)

    if "date" in ts.columns:
        ts["date"] = pd.to_datetime(ts["date"])
        ts = ts.sort_values("date").set_index("date")
    else:
        ts.index = range(len(ts))

    df_local = pd.DataFrame(index=ts.index, columns=['One', 'Two'])

    if "cases" in ts.columns:
        df_local["One"] = ts["cases"]
    elif "value" in ts.columns:
        df_local["One"] = ts["value"]
    else:
        df_local["One"] = 0.0

    if "deaths" in ts.columns:
        df_local["Two"] = ts["deaths"]
    elif "value" in ts.columns:
        df_local["Two"] = ts["value"]
    else:
        df_local["Two"] = df_local["One"]

    return df_local

df = wrangle_data(jsondata)
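
To see what the core of the wrangling step does in isolation, here is a small sketch on made-up data. It assumes each record carries a ```date``` string and a numeric ```value```; the sample records and the helper name ```records_to_frame``` are hypothetical and are not part of the template.

```python
import pandas as pd

# Hypothetical, deliberately unsorted sample records
sample_records = [
    {"date": "2024-01-03", "value": 7},
    {"date": "2024-01-01", "value": 5},
    {"date": "2024-01-02", "value": 6},
]

def records_to_frame(records):
    """Turn a list of {'date', 'value'} records into a date-indexed DataFrame."""
    frame = pd.DataFrame(records)
    # Parse the date strings so that pandas can sort and plot them as dates
    frame["date"] = pd.to_datetime(frame["date"])
    # Sort chronologically and use the dates as the index
    return frame.sort_values("date").set_index("date")

sample_df = records_to_frame(sample_records)
print(sample_df)
```

Indexing by date means that plotting a column places the dates on the horizontal axis without further work.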

## Download current data

Give your users an option to refresh the dataset - a "refresh" button will do. The button callback should
* call the code that accesses the API and downloads some fresh raw data;
* wrangle that data into a dataframe and update the corresponding (global) variable for plotting (here, ```df```);
* optionally: force a redraw of the graph and give the user some feedback.

Once you get it to work, you may want to wrap your API call inside an exception handler, so that the user is informed, the "canned" data are not overwritten and nothing crashes if for any reason the server cannot be reached or data are not available.
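
One possible shape for such a handler is sketched below. It is only an illustration under stated assumptions: the helper ```safe_refresh``` and its arguments ```fetch``` and ```wrangle``` are hypothetical names, and plain dictionaries stand in for the dataframes so that the example runs without network access.

```python
def safe_refresh(fetch, wrangle, current_data):
    """Return (data, ok). On any failure the current data is returned unchanged."""
    try:
        rawdata = fetch()
        new_data = wrangle(rawdata)
    except Exception as err:
        # Inform the user and fall back to the data we already have
        print(f"Refresh failed, keeping existing data: {err}")
        return current_data, False
    return new_data, True

def failing_fetch():
    # Hypothetical stand-in for an API call that cannot reach the server
    raise ConnectionError("server unreachable")

canned = {"value": [1, 2, 3]}

# A failed download leaves the canned data in place
kept, ok_fail = safe_refresh(failing_fetch, lambda raw: raw, canned)

# A successful download replaces it
fresh, ok_good = safe_refresh(lambda: {"value": [4, 5]}, lambda raw: raw, canned)
```

In a button callback, the returned flag could be used to choose the button icon, and the global variable would only be reassigned when the flag is true.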

After you refresh the data, graphs will not update until the user interacts with a widget. You can trick ```ipywidgets``` into redrawing the graph by simulating interaction, as in the ```refresh_graph``` function we define in the Graphs and Analysis section below.

In this example, clicking on the button below downloads the latest data from the UKHSA API and refreshes the graphs. The button should read *Refresh data*. If you see anything else, take a deep breath :)

In [7]:
def access_api():
    # Placeholder: redefined later with the real UKHSA API call
    return {}

In [8]:
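def api_button_callback(button):
    # Preliminary callback: it is redefined, and only then registered with
    # on_click, in the cell that implements the real API access.
    apidata = access_api()
    global df
    df = wrangle_data(apidata)
    refresh_graph()
    apibutton.icon = "check"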

apibutton = wdg.Button(
    description='Refresh data',
    disabled=False,
    button_style='info',
    tooltip="Keep calm and carry on",
    icon='download'
)

display(apibutton)


## Graphs and Analysis

Include at least one graph with interactive controls, as well as some instructions for the user and/or comments on what the graph represents and how it should be explored (this example plots the time series stored in columns ```One``` and ```Two``` of ```df```).

In [9]:
def plot_random_walk(walk):
    global df
    if df is None or df.empty:
        print("No data available to plot the time series.")
        return
    if walk not in df.columns:
        print(f"Column '{walk}' not found in dataframe.")
        return

    if walk == "One":
        title = "Figure 1: Time Series — Column One (JSON)"
    else:
        title = "Figure 1: Time Series — Column Two (API Refreshed)"

    df[walk].plot()
    plt.title(title)
    plt.xlabel("Date")
    plt.ylabel("Value")
    plt.show()

whichwalk = wdg.Dropdown(
    options=['One', 'Two'],
    value='One',
    description='Walk no: ',
    disabled=False,
)

def refresh_graph():
    current = whichwalk.value
    if current == whichwalk.options[0]:
        other = whichwalk.options[1]
    else:
        other = whichwalk.options[0]
    whichwalk.value = other
    whichwalk.value = current

graph = wdg.interactive_output(plot_random_walk, {'walk': whichwalk})
display(whichwalk, graph)


## Deploying the dashboard

Once your code is ready and you are satisfied with the appearance of the graphs, replace all the text boxes above with the explanations you would like a dashboard user to see. The next step is deploying the dashboard online. There are several [options](https://voila.readthedocs.io/en/stable/deploy.html) for this; we suggest deploying as a [Binder](https://mybinder.org/). This is basically the same technique that has been used to package this tutorial and to deploy this template dashboard. The instructions may seem a bit involved, but the actual steps are surprisingly easy, and we will be going through them together during a live session. You will need an account on [GitHub](https://github.com/) for this; if you don't have one already, now is the time to create one.

**Author and License** Remember that if you deploy your dashboard as a Binder it will be publicly accessible. Change the copyright notice and take credit for your work! Also acknowledge your sources and the conditions of the license by including this notice: "Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Fabrizio Smeraldi. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/)."

In [10]:


def plot_bar_chart(column):
    global df
    if df is None or df.empty:
        print("No data available for bar chart (empty dataframe).")
        return
    if column not in df.columns:
        print(f"Column '{column}' not found in dataframe.")
        return

    series = df[column].dropna()
    if series.empty:
        print(f"No valid data in column '{column}' to plot.")
        return

    tail_series = series.tail(30)
    if tail_series.empty:
        print(f"Not enough data in '{column}' to plot.")
        return

    if column == "One":
        title = "Figure 2: Last 30 Days — Column One (JSON)"
    else:
        title = "Figure 2: Last 30 Days — Column Two (API Refreshed)"

    ax = tail_series.plot(kind='bar')
    ax.set_title(title)
    ax.set_xlabel("Day Index (last 30)")
    ax.set_ylabel("Value")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

bar_metric_wdg = wdg.RadioButtons(
    options=['One', 'Two'],
    value='One',
    description='Metric:',
    disabled=False
)

bar_graph = wdg.interactive_output(
    plot_bar_chart,
    {'column': bar_metric_wdg}
)

display(bar_metric_wdg, bar_graph)




In [11]:
def access_api():
    structure = {
        "theme": "infectious_disease",
        "sub_theme": "respiratory",
        "topic": "COVID-19",
        "geography_type": "Nation",
        "geography": "England",
        "metric": "COVID-19_deaths_28DaysByDay"
    }
    
    try:
        api = APIwrapper(**structure)
        filters = {"year": None, "month": None, "epiweek": None, "sex": None}
        records = api.get_all_pages(filters=filters)

        if not records:
            print("API returned no records, keeping existing data from JSON.")
            return None
        
        ts = pd.DataFrame(records)
        if "date" not in ts.columns or "value" not in ts.columns:
            print("API data does not contain expected 'date' and 'value' fields.")
            return None
        
        ts["date"] = pd.to_datetime(ts["date"])
        ts = ts.sort_values("date").set_index("date")

        return ts[["value"]]
    
    except Exception as e:
        print("Error when accessing UKHSA API, keeping existing data from JSON.")
        print("Details:", e)
        return None

def api_button_callback(button):
    global df
    
    api_ts = access_api()
    if api_ts is None or api_ts.empty:
        apibutton.icon = "exclamation-triangle"
        return
    
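    # Extend the index to cover the API dates, then align the API values on it
    df = df.reindex(df.index.union(api_ts.index))
    df["Two"] = api_ts["value"]
    
    # Simulate interaction with the dropdown to force the time series to redraw
    current_val = whichwalk.value
    whichwalk.value = whichwalk.options[1] if current_val == whichwalk.options[0] else whichwalk.options[0]
    whichwalk.value = current_val

    # Same trick for the radio buttons that control the bar chart
    current_metric = bar_metric_wdg.value
    bar_metric_wdg.value = 'Two' if current_metric == 'One' else 'One'
    bar_metric_wdg.value = current_metric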

    apibutton.icon = "check"

apibutton.on_click(api_button_callback)


Jiahe Ma

This dashboard is based on UK Government data published by the
UK Health Security Agency (UKHSA) through its official API:
https://ukhsa-dashboard.data.gov.uk/

I would also like to acknowledge the original DIY Disease Tracking Dashboard Kit
designed and released by Fabrizio Smeraldi (QMUL), which served as the foundation for this dashboard.

This work is released under the
GNU General Public License v3.0 or later (GPLv3+).

“Based on UK Government data published by the UK Health Security Agency and on the
DIY Disease Tracking Dashboard Kit by Fabrizio Smeraldi.
Released under the GNU GPLv3.0 or later.”