
# <center><b> PCR Test Positive Cases Trend in England</b></center>


### <center><b>Track Positive PCR Test Cases Over Time with Real-Time Data Updates</b></center> 





About This Dashboard:

You can **select a year** from the dropdown menu and click **"Update Graph"** to fetch the most recent data from the source.    
The updated graph will display trends for the selected year.  

The starting view shows data for 2024, highlighting how cases have fluctuated over the months. The graph helps you easily identify patterns and spikes in cases over time.  

The data source is provided at the bottom of the graph for your reference.

In [1]:
import requests
import time

class APIwrapper:
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access

    # this allows all metrics to be downloaded
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        self._filters=None
        self._page_size=-1
        # will contain the number of items
        self.count=None

    def get_page(self, filters={}, page_size=5):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value 
        for debugging your structure and filters. """
        # Check page size is within range
        if page_size>365:
            raise ValueError("Max supported page size is 365")
        # restart from first page if page or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url==None: 
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        curr_time=time.time() # Unix time: number of seconds since the Epoch
        deltat=curr_time-APIwrapper._last_access
        if deltat<0.33: # max 3 requests/second
            time.sleep(0.33-deltat)
        APIwrapper._last_access=curr_time
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters={x: y for x, y in filters.items() if y!=None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        # This is the API access. Response is a dictionary with various keys.
        # the .json() method decodes the response into Python object (dictionaries,
        # lists; 'null' values are translated as None).
        response = requests.get(self._next_url, params=parameters).json()
        # update url so we'll fetch the next page
        self._next_url=response['next']
        self.count=response['count']
        # data are in the nested 'results' list
        return response['results'] 

    def get_all_pages(self, filters={}, page_size=365):
        """ Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well 
        in most cases. The number of items returned should in any case be equal to 
        the count attribute. """
        data=[] # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if next_page==[]:
                break # we are done
            data.extend(next_page)
        return data

In [2]:
# structure memo
structure={'theme': 'infectious_disease', 
           'sub_theme': 'respiratory',
           'topic': 'COVID-19',
           'geography_type': 'Nation', 
           'geography': 'England',}

structure['metric']='COVID-19_testing_PCRcountByDay'

In [3]:
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json

%matplotlib inline
plt.rcParams['figure.dpi'] = 100



jsondata={}

# Wrangle 
def wrangle_data(rawdata):    # Create a dataframe and fill in the values        
    if isinstance(rawdata, dict) and "results" in rawdata:
        rawdata = rawdata["results"]  # extract the 'results' list
        
    startdate = pd.to_datetime("2020-01-01")
    enddate = pd.to_datetime("2024-12-31") 
    index = pd.date_range(start=startdate, end=enddate, freq='D')
    df = pd.DataFrame(index=index, columns=['Cases'])    # creating a DataFrame 

    for entry in rawdata:
        if 'date' in entry and 'metric_value' in entry:
            date=pd.to_datetime(entry['date'])
            value=entry['metric_value']
            if date in df.index:
                df.loc[date, 'Cases']=value   # filling the values to the DataFrame
    df.infer_objects(copy=False)   # replace NaN value to 0.0
    return df


# Inital graph from .json file
with open('PCRtest.json', 'r') as infile:
    jsondata = json.load(infile)
initial_df = wrangle_data(jsondata)

# wrangle intially from .json file 
df=initial_df.copy()    


 # Fetch from API using APIWrapper
def access_api():  
    structure['metric']='COVID-19_testing_PCRcountByDay'
    api=APIwrapper(**structure)
    filters={}  # option to include filters 
    PCRtest = api.get_all_pages(filters=filters)
    with open('PCRtest.json','wt') as OUTF:
        json.dump(PCRtest, OUTF)
    print('Data fetched and saved successfully!')
    return PCRtest
        
def api_button_callback(button): # Fetch Button & linking the functions
    apidata=access_api()
    global df
    df=wrangle_data(apidata)
    refresh_graph()
    apibutton.icon="check" # icon changes after fetch is completed
    apibutton.disabled=False
   
apibutton=wdg.Button(
    description='Update Graph', 
    disabled=False,
    button_style='success', 
    tooltip="Keep calm and carry on",
    icon='Download'
)

apibutton.on_click(api_button_callback) # links the button to functions
display(apibutton)



# Year widget
unique_years=sorted(df.index.year.unique()) # extract unique year from the DataFrame

year_widget=wdg.Dropdown(
    options=unique_years,
    value=unique_years[-1], # initially showing the latest data
    description='Year',
    disabled=False
)



# Plotting 
def plot_random_walk(selected_year):
    filtered_df=df[df.index.year==selected_year]  # only include the rows with the year in df.index
    ax=filtered_df['Cases'].plot(title=f'Positive PCR cases in {selected_year}')  # plot in the 'Cases' column
    ax.set_xlabel("Date", fontsize=12)
    ax.set_ylabel("Cases", fontsize=12)
    plt.grid(visible=True)
    plt.tight_layout()
    plt.show() 



# Refresh the graph 
def refresh_graph():
    current=year_widget.value
    if current==year_widget.options[0]:  # first availabe year
        other=year_widget.options[1]     # second available year 
    else:
        other=year_widget.options[0]
    year_widget.value=other 
    year_widget.value=current
 


# connect the plotting function and the widget    
graph=wdg.interactive_output(plot_random_walk, {'selected_year': year_widget})

# display the widget and the graph 
display(year_widget, graph)

Button(button_style='success', description='Update Graph', icon='Download', style=ButtonStyle(), tooltip='Keep…

Dropdown(description='Year', index=4, options=(2020, 2021, 2022, 2023, 2024), value=2024)

Output()

**Data Source:** UK Health Security Agency (UKHSA) - [https://ukhsa-dashboard.data.gov.uk](https://ukhsa-dashboard.data.gov.uk)