In [1]:
import requests

# Same as the above but in Python, with a couple of extra parameters added to the URL
requests.get("https://api.ukhsa-dashboard.data.gov.uk/themes"
             "/infectious_disease/sub_themes/bloodstream_infection/topics"
             "/E-coli/geography_types/Nation/geographies/England"
             "/metrics/e-coli_cases_countsByOnsetType" ).json()

{'count': 90,
 'next': 'https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/bloodstream_infection/topics/E-coli/geography_types/Nation/geographies/England/metrics/e-coli_cases_countsByOnsetType?page=2',
 'previous': None,
 'results': [{'theme': 'infectious_disease',
   'sub_theme': 'bloodstream_infection',
   'topic': 'E-coli',
   'geography_type': 'Nation',
   'geography': 'England',
   'geography_code': 'E92000001',
   'metric': 'e-coli_cases_countsByOnsetType',
   'metric_group': 'cases',
   'stratum': 'Hospital-onset, healthcare associated',
   'sex': 'all',
   'age': 'all',
   'year': 2023,
   'month': 6,
   'epiweek': 26,
   'date': '2023-06-30',
   'metric_value': 664.0,
   'in_reporting_delay_period': False},
  {'theme': 'infectious_disease',
   'sub_theme': 'bloodstream_infection',
   'topic': 'E-coli',
   'geography_type': 'Nation',
   'geography': 'England',
   'geography_code': 'E92000001',
   'metric': 'e-coli_cases_countsByOnsetType',
   'metric_g

In [2]:
https://api.ukhsa-dashboard.data.gov.uk/themes/infectious_disease/sub_themes/bloodstream_infection/topics/E-coli/geography_types/Nation/geographies/England/metrics/e-coli_cases_countsByOnsetType

SyntaxError: invalid syntax (3193606719.py, line 1)

In [3]:
structure={"theme": "infectious_disease", 
           "sub_theme": "respiratory",
           "topic": "RSV",
           "geography_type": "Nation", 
           "geography": "England"}

structure["metric"]="RSV_healthcare_admissionRateByWeek" 

In [4]:
import time

class APIwrapper:
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access
    
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # build the path with all the required structure parameters
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # our starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        self._filters=None
        self._page_size=-1
        # will contain the number of items
        self.count=None

    def get_page(self, filters={}, page_size=5):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value 
        for debugging your structure and filters, increase when you start looping 
        over all pages. """
        # Check page size is within range
        if page_size>365:
            raise ValueError("Max supported page size is 365")
        # restart from first page if page or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url==None: 
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        curr_time=time.time() # Unix time: number of seconds since the Epoch
        deltat=curr_time-APIwrapper._last_access
        if deltat<0.33: # max 3 requests/second
            time.sleep(0.33-deltat)
        APIwrapper._last_access=curr_time
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters={x: y for x, y in filters.items() if y!=None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        # This is the API access. Response is a dictionary with various keys.
        # the .json() method decodes the response into Python object (dictionaries,
        # lists; 'null' values are translated as None).
        response = requests.get(self._next_url, params=parameters).json()
        # update url so we'll fetch the next page
        self._next_url=response['next']
        self.count=response['count']
        # data are in the nested 'results' list
        return response['results'] 

In [5]:
api=APIwrapper(**structure)
data=api.get_page() # default size is 5
print(api.count)
print(data)

2157
[{'theme': 'infectious_disease', 'sub_theme': 'respiratory', 'topic': 'RSV', 'geography_type': 'Nation', 'geography': 'England', 'geography_code': 'E92000001', 'metric': 'RSV_healthcare_admissionRateByWeek', 'metric_group': 'healthcare', 'stratum': 'default', 'sex': 'all', 'age': '00-04', 'year': 2020, 'month': 9, 'epiweek': 40, 'date': '2020-09-28', 'metric_value': 0.0, 'in_reporting_delay_period': False}, {'theme': 'infectious_disease', 'sub_theme': 'respiratory', 'topic': 'RSV', 'geography_type': 'Nation', 'geography': 'England', 'geography_code': 'E92000001', 'metric': 'RSV_healthcare_admissionRateByWeek', 'metric_group': 'healthcare', 'stratum': 'default', 'sex': 'all', 'age': '45-54', 'year': 2020, 'month': 9, 'epiweek': 40, 'date': '2020-09-28', 'metric_value': 0.0, 'in_reporting_delay_period': False}, {'theme': 'infectious_disease', 'sub_theme': 'respiratory', 'topic': 'RSV', 'geography_type': 'Nation', 'geography': 'England', 'geography_code': 'E92000001', 'metric': 'RSV_

In [6]:
structure["metric"]="RSV_healthcare_admissionRateByWeek"
api=APIwrapper(**structure)
Admissions=[]
page=1
while True:
    data=api.get_page(page_size=365)
    print(f"Pages retrieved: {page}")
    if data==[]:
        break
    Admissions.extend(data)
    page+=1
print(f"Data points expected: {api.count}")
print(f"Data points retrieved: {len(Admissions)}")

Pages retrieved: 1
Pages retrieved: 2
Pages retrieved: 3
Pages retrieved: 4
Pages retrieved: 5
Pages retrieved: 6
Pages retrieved: 7
Data points expected: 2157
Data points retrieved: 2157


In [7]:
structure["metric"]="RSV_testing_positivityByWeek"
# the structure has changed, so we need to create a new object
api=APIwrapper(**structure)
positives=[]
while True:
    data=api.get_page(page_size=365)
    if data==[]:
        break
    positives.extend(data)
    page+=1
print(f"Data points expected: {api.count}")
print(f"Data points retrieved: {len(Admissions)}")

Data points expected: 2811
Data points retrieved: 2157


In [8]:
import json
with open("Admissions.json", "wt") as OUTF:
    json.dump(Admissions, OUTF)
with open("positives.json", "wt") as OUTF:
    json.dump(positives, OUTF)

In [None]:
filters={"stratum" : None, # Smallest subgroup a metric can be broken down into e.g. ethnicity, testing pillar
         "age": 20_30, # Smallest subgroup a metric can be broken down into e.g. 15_44 for the age group of 15-44 years
         "sex": None, #  Patient gender e.g. 'm' for Male, 'f' for Female or 'all' for all genders
         "year": 2022, #  Epi year of the metrics value (important for annual metrics) e.g. 2020
         "month": None, # Epi month of the metric value (important for monthly metrics) e.g. 12
         "epiweek" :None, # Epi week of the metric value (important for weekly metrics) e.g. 30
         "date" : None, # The date which this metric value was recorded in the format YYYY-MM-DD e.g. 2020-07-20
         "in_reporting_delay_period": None # Boolean indicating whether the data point is considered to be subject to retrospective updates
        }

In [None]:
import json
with open("Admissions.json", "wt") as OUTF:
    json.dump(Admissions, OUTF)
with open("positives.json", "wt") as OUTF:
    json.dump(positives, OUTF)


In [None]:
# Let's filter for the year 2022.
# None values will be ignored by the APIwrapper

filters={"stratum" : None, # Smallest subgroup a metric can be broken down into e.g. ethnicity, testing pillar
         "age": None, # Smallest subgroup a metric can be broken down into e.g. 15_44 for the age group of 15-44 years
         "sex": None, #  Patient gender e.g. 'm' for Male, 'f' for Female or 'all' for all genders
         "year": None, #  Epi year of the metrics value (important for annual metrics) e.g. 2020
         "month": None, # Epi month of the metric value (important for monthly metrics) e.g. 12
         "epiweek" :None, # Epi week of the metric value (important for weekly metrics) e.g. 30
         "date" : None, # The date which this metric value was recorded in the format YYYY-MM-DD e.g. 2020-07-20
         "in_reporting_delay_period": None # Boolean indicating whether the data point is considered to be subject to retrospective updates
        }

In [None]:
data_2022=api.get_page(filters, page_size=3)
print(api.count)
print(data_2022)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import json

In [None]:
with open("Admissions.json", "rt") as INFILE:
    admissions=json.load(INFILE)
with open("positives.json", "rt") as INFILE:
    cases=json.load(INFILE)

In [9]:

for dataset in [Admissions, positives]:
    for entry in dataset:
        date=entry['date']
        metric=entry['metric']
        value=entry['metric_value']
        if date not in data:
            data[date]={}
        data[date][metric]=value

TypeError: list indices must be integers or slices, not str

In [None]:
data

In [None]:
dates=list(data.keys())
dates.sort()
dates

In [None]:
def parse_date(datestring):
    """ Convert a date string into a pandas datetime object """
    return pd.to_datetime(datestring, format="%Y-%m-%d")

In [None]:
startdate=parse_date(dates[0])
enddate=parse_date(dates[-1])
print (startdate, ' to ', enddate)

In [None]:
index=pd.date_range(startdate, enddate, freq='D')
timeseriesdf=pd.DataFrame(index=index, columns=['Admissions', 'positives'])
timeseriesdf

In [None]:
metrics ={'Admissions': 'RSV_healthcare_admissionRateByWeek',
          'positives': 'RSV_testing_positivityByWeek'}
for date, entry in data.items(): # each entry is a dictionary with cases, admissions and deaths
    pd_date=parse_date(date) # convert to Pandas format
    for column in ['Admissions', 'positives']: 
        metric_name=metrics[column]
        # do not assume all values are there for every date - if a value is not available, insert a 0.0
        value= entry.get(metric_name, 0.0)
        # this is the way you access a specific location in the dataframe - use .loc
        # and put index,column in a single set of [ ]
        timeseriesdf.loc[date, column]=value
            
# fill in any remaining "holes" due to missing dates
timeseriesdf.fillna(0.0, inplace=True)
            
timeseriesdf

In [None]:
ax=timeseriesdf.plot() # easy peasy...
ax.set_title('Admissions, positives');

In [None]:
ax=timeseriesdf.plot(logy=True) # ...lemon squeezy
ax.set_title('Admissions, positives(log scale)');

In [None]:
timeseriesdf.to_pickle("timeseriesdf.pkl")

In [None]:
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
%matplotlib inline
# make figures larger
plt.rcParams['figure.dpi'] = 100

In [None]:
timeseriesdf=pd.read_pickle("timeseriesdf.pkl")

In [None]:
series=wdg.SelectMultiple(
    options=['Admissions', 'positives'],
    value=['Admissions', 'positives'],
    rows=3,
    description='Stats:',
    disabled=False
)

scale=wdg.RadioButtons(
    options=['linear', 'log'],
#   value='pineapple', # Defaults to 'pineapple'
#   layout={'width': 'max-content'}, # If the items' names are long
    description='Scale:',
    disabled=False
)

# try replacing HBox with a VBox
controls=wdg.HBox([series, scale])

def timeseries_graph(gcols, gscale):
    if gscale=='linear':
        logscale=False
    else:
        logscale=True
    ncols=len(gcols)
    if ncols>0:
        timeseriesdf[list(gcols)].plot(logy=logscale)
        plt.show() # important - graphs won't update if this is missing 
    else:
        print("Click to select data for graph")
        print("(CTRL-Click to select more than one category)")

# keep calling timeseries_graph(gcols=value_of_series, gscale=value_of_scale); 
# capture output in widget graph   
graph=wdg.interactive_output(timeseries_graph, {'gcols': series, 'gscale': scale})

display(controls, graph)

In [None]:
year=wdg.Select(
    # options available: unique years in the dataframe
    options=timeseriesdf.index.year.unique(), # options available
    value=timeseriesdf.index.year[-1], # initial value: most recent year
    rows=1, # rows of the selection box
    description='Year',
    disabled=False
)
def timeseries_graph1(graphyear):
    # our callback function.
    yeardf=timeseriesdf[timeseriesdf.index.year==graphyear]
    # average the rows by month
    monthly= yeardf.groupby(pd.Grouper(freq='1ME')).mean() 
    totals=monthly.sum(axis=1) # over the rows
    # make sure it's all normalised to 100
    monthly=monthly.div(totals, axis=0)*100
    # older dates on top of the graph
    monthly = monthly[::-1]
    ax=monthly.plot(kind='barh', stacked=True,cmap='tab20')
    ax.legend(loc='center left',bbox_to_anchor=(1.0, 0.5))
    ax.set_yticklabels(monthly.index.strftime('%Y-%m-%d'))
output=wdg.interactive_output(timeseries_graph1, {'graphyear': year})
display(year, output)