[DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) (C) Fabrizio Smeraldi, 2020,2024 ([f.smeraldi@qmul.ac.uk](mailto:f.smeraldi@qmul.ac.uk) - [web](http://www.eecs.qmul.ac.uk/~fabri/)). This notebook is released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/).

# DIY Disease Tracking Dashboard

In [40]:
from IPython.display import display, clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json

In [41]:
%matplotlib inline
# make figures larger
plt.rcParams['figure.dpi'] = 100

## API

In [42]:
def access_api(theme, sub_theme, topic, geography_type, geography, metric, **optional_params):
    page_size = 99999  # Define the default page size

    base_url = "https://api.ukhsa-dashboard.data.gov.uk/themes"
    
    # Construct the full URL
    url = f"{base_url}/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/{geography_type}/geographies/{geography}/metrics/{metric}"
    
    # Include page_size in optional_params (ensuring it's a part of the query)
    optional_params['page_size'] = page_size
    
    # Send GET request with optional parameters
    response = requests.get(url, params=optional_params)
    
    # Parse response to JSON
    try:
        response_data = response.json()
        results = response_data.get('results', [])  # Safely get 'results' key, default to empty list if not found
        data = pd.DataFrame(results)  # Directly create DataFrame from 'results'
    except Exception as e:
        print(f"Error fetching or converting data: {e}")
        data = pd.DataFrame()  # Return an empty DataFrame in case of error
    return data

In [43]:
# https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/respiratory/topics/COVID-19/geography_types/Nation/geographies/England/metrics/COVID-19_cases_casesByDay

In [44]:
def fetch_options(url,param):
    urls = url + param
    try:
        response = requests.get(urls)
        response.raise_for_status()
        data = response.json()
        return [item['name'] for item in data]  # Extract names 
    except Exception as e:
        print(f"Error fetching options: {e}")
        return ["Error"]


In [45]:
def clean_dataframe(df):
    # Drop columns that are all the same
    df = df.loc[:, df.nunique() != 1]
    
    # Convert date to datetime
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
    
    print(df.dtypes)
    return df

In [46]:
url = "https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/"

In [47]:
def url_to_dict(url):
    # Remove the protocol and domain, extract only the path
    path = url.split("//")[-1].split("/", 1)[-1]
    
    # Split the path into components
    components = [component for component in path.split("/") if component]
    
    # Form a dictionary by pairing consecutive components
    return {components[i]: components[i + 1] for i in range(0, len(components) - 1, 2)}


In [48]:
df = pd.DataFrame()

In [None]:
# Create a button for submitting all values
submit_button = wdg.Button(
    description="Submit",
    disabled=False,
    button_style='primary',
    tooltip="Click to submit the data",
    icon='check'
)

# Define the button click handler
def on_button_click(b):
    print("Button clicked!")
    # Collect values from all the input fields, but only include non-empty fields
    global input_fields
    global url_dict
    form_data = {param_name: input_fields[param_name].value for param_name in input_fields if input_fields[param_name].value != ''}
    print(f"Form data submitted (non-empty values only): {form_data}")
    try:
        response = access_api(
            theme= url_dict['themes'],
            sub_theme= url_dict['sub_themes'],
            topic= url_dict['topics'],
            geography_type= url_dict['geography_types'],
            geography= url_dict['geographies'],
            metric= url_dict['metrics'],
            **form_data  # Passing the collected form data
        )
        global df
        df = pd.DataFrame(response)
        # print(df)
        # Clean the dataframe
        df = clean_dataframe(df)
        # display(starOver_button)
    except Exception as e:
        print("Error fetching data:", e)

submit_button.on_click(on_button_click)


starOver_button = wdg.Button(
    description="Start Over",
    disabled=False,
    button_style='info',
    tooltip="Click to refresh the data",
    icon='refresh'
)
# if the button is clicked, fetch data
def on_refresh_button_click(b):
    fetch_data_dynamic_levels(url)
    # display(submit_button)
    print("Data refreshed")
    url_dict.clear()
    print(url_dict)
starOver_button.on_click(on_refresh_button_click)
display(starOver_button)

Fetched data: [{'name': 'bloodstream_infection', 'link': 'https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/bloodstream_infection'}, {'name': 'gastrointestinal', 'link': 'https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/gastrointestinal'}, {'name': 'respiratory', 'link': 'https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/respiratory'}, {'name': 'vaccine_preventable', 'link': 'https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/vaccine_preventable'}]
Current URL: https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/
name key found


Dropdown(description='Option:', options=('bloodstream_infection', 'gastrointestinal', 'respiratory', 'vaccine_…



Empty url
Data refreshed
{}


In [50]:
final_url = None  # Global variable to store the final URL
url_dict = {}
input_fields = {}

def fetch_data_dynamic_levels(url, cp=True):
    global final_url
    clear_output()
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        print("Fetched data:", data)

        if not data:
            print("No data received.")
            final_url = url  # Store the URL if no data is fetched

        print(f"Current URL: {url}")

        if 'results' in data:
            print("results key found")
            final_url = url  # Store the URL and stop recursion
               # Parameters to create input fields for
            params = {
                'stratum': None,
                'sex': None,
                'age': None,
                'year': None,
                'month': None,
                'epiweek': None,
                'date': None,
                'metric_value': None,
                'in_reporting_delay_period': None
            }

            # Create an empty dictionary to hold the input fields
            global input_fields 

            df = pd.DataFrame()

            # Create input fields for each parameter
            for param_name in params.keys():
                # Create a text input for each parameter
                input_fields[param_name] = wdg.Text(
                    value='',
                    description=f'{param_name.capitalize()}:',
                    disabled=False
                )
                display(input_fields[param_name])
        
            display(starOver_button, submit_button)

    
        elif 'name' in data[0]:
            print("name key found")
            param = [item['name'] for item in data]

            param_dropdown = wdg.Dropdown(options=param, description="Option:")
            display(param_dropdown)

            button = wdg.Button(
                description='Fetch Data',
                disabled=False,
                button_style='warning',
                tooltip="Click to fetch data",
                icon='exclamation-triangle'
            )
            display(button)

            def on_button_click(b):
                selected_value = param_dropdown.value
                print(f"Selected: {selected_value}")

                if cp:
                    updated_url = f"{url}{selected_value}"
                else:
                    updated_url = f"{url}/{selected_value}"
                print(f"Updated URL: {updated_url}")
                # update the tracker
                
                fetch_data_dynamic_levels(updated_url, False)

            button.on_click(on_button_click)

        elif 'name' not in data[0]:
            print("name key not found, updating URL with key")
            if not isinstance(data[0], dict):
                print("Expected a dictionary structure, but found:", type(data[0]))
                final_url = url  # Store the URL if the data structure is not as expected
                return
            key = list(data[0].keys())
            print(f"Possible keys: {key}")
            updated_url = url + '/' + key[0]
            print(f"Updated URL: {updated_url}")
            fetch_data_dynamic_levels(updated_url, False)  # Continue recursion with updated URL

        else:
            print(f"No valid data returned for URL: {url}. Ending recursion.")
            final_url = url  # Store the URL if no valid data is found

    except Exception as e:
        print(f"Error fetching data: {e}")
        final_url = url  # Store the URL even in case of an error
    global url_dict
    
    try:
        url_dict = url_to_dict(final_url)
        print("URL Dictionary:", url_dict)
    except Exception as e:
        print("Empty url")

## Graphs and Analysis

Include at least one graph with interactive controls, as well as some instructions for the user and/or comments on what the graph represents and how it should be explored (this example shows two random walks)

In [51]:
defalt_response = access_api(
    theme="infectious_disease",
    sub_theme="respiratory",
    topic="COVID-19",
    geography_type="Nation",
    geography="England",
    metric="COVID-19_cases_casesByDay",
    year=2022
)
# convert json to dataframe
df = pd.DataFrame(defalt_response)
df = clean_dataframe(df)

month                    int64
epiweek                  int64
date            datetime64[ns]
metric_value           float64
dtype: object


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['date'] = pd.to_datetime(df['date'])


In [52]:
def plot_random_walk(walk, gscale, slider):

    global df
    """Plot the selected data series from the DataFrame using 'date' as x-axis with zoom functionality."""
    
    if df.empty:
        print("DataFrame is empty. Please load data before plotting.")
        return
    
    # Check if the selected walk exists in the DataFrame
    if walk in df.columns:
        # Clear the previous plot
        plt.clf()
        
        # Plot the selected column using 'date' as the x-axis
        ax = df.plot(x='date', y=walk, kind='line', logy=(gscale == 'log'))
        
        # Zoom effect: adjust the x-axis and y-axis limits based on slider value
        zoom_factor = slider  # The slider value affects the zoom level
        
        # Calculate the zoom ranges (adjust these values based on your data's range)
        x_min = df['date'].min() + (df['date'].max() - df['date'].min()) * (1 - zoom_factor)
        x_max = df['date'].min() + (df['date'].max() - df['date'].min()) * zoom_factor
        y_min = df[walk].min() + (df[walk].max() - df[walk].min()) * (1 - zoom_factor)
        y_max = df[walk].min() + (df[walk].max() - df[walk].min()) * zoom_factor
        
        # Set new x and y limits
        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)

        # Set plot title and labels
        plt.title(f"Plot of {walk}")
        plt.xlabel("date")  # X-axis is 'date'
        plt.ylabel(walk)    # Y-axis is the selected column (e.g., 'metric_value')
        plt.grid(True)      # Optional: to show grid lines for better readability
        plt.show()
    else:
        print(f"Column '{walk}' not found in the DataFrame.")


whichwalk = wdg.Dropdown(
    options=[col for col in df.columns if col != 'date'],  # Exclude 'date' from the dropdown options
    value=df.columns[0],  # Default value to the first column
    description='Walk no: ',
    disabled=False,
)

# Radio button widget to select the scale
scale = wdg.RadioButtons(
    options=['linear', 'log'],
    value='linear',  # Default scale
    description='Scale:',
    disabled=False
)

# Use a FloatSlider for zoom functionality, ranging from 0.1 to 1.0
slider = wdg.FloatSlider(
    value=1.0,   # Default zoom value (no zoom)
    min=0.1,     # Minimum zoom value (high zoom out)
    max=1.0,     # Maximum zoom value (no zoom)
    step=0.01,   # Step size for finer zoom control
    description='Zoom',
)

control = wdg.GridBox([whichwalk, scale, slider])

# refresh df button
refresh_df_button = wdg.Button(
    description="Refresh Data",
    disabled=False,
    button_style='info',
    tooltip="Click to refresh the data,then choose a new walk",
    icon='refresh'
)

def on_refresh_df_button_click(b):
    global df
    df = clean_dataframe(df)
    print("Data refreshed")
    print(df)
refresh_df_button.on_click(on_refresh_df_button_click)


# Connect the widgets to the plot function using interactive_output
graph = wdg.interactive_output(plot_random_walk, {'walk': whichwalk, 'gscale': scale, 'slider': slider})

# Display the widget and graph
display(control, refresh_df_button,graph)

GridBox(children=(Dropdown(description='Walk no: ', options=('month', 'epiweek', 'metric_value'), value='month…

Button(button_style='info', description='Refresh Data', icon='refresh', style=ButtonStyle(), tooltip='Click to…

Output()

## Deploying the dashboard

Once your code is ready and you are satisfied with the appearance of the graphs, replace all the text boxes above with the explanations you would like a dashboard user to see. The next step is deploying the dashboard online - there are several [options](https://voila.readthedocs.io/en/stable/deploy.html) for this, we suggest deploying as a [Binder](https://mybinder.org/). This is basically the same technique that has been used to package this tutorial and to deploy this template dashboard. The instructions may seem a bit involved, but the actual steps are surprisingly easy - we will be going through them together during a live session. You will need an account on [GitHub](https://github.com/) for this - if you don't have one already, now it's the time to create it. 

**Author and License** Remember that if you deploy your dashboard as a Binder it will be publicly accessible. Change the copyright notice and take credit for your work! Also acknowledge your sources and the conditions of the license by including this notice: "Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Fabrizio Smeraldi. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/)."