# Common Core

Created by Michael George (AKA Logiqx)

Website: https://logiqx.github.io/covid-stats/

In [1]:
import os
import sys
from datetime import date, datetime, timedelta

import unittest

import urllib.request
import urllib.parse
import re

from bs4 import BeautifulSoup

import csv
import numpy as np

from PIL import Image

# Project root: the parent of the first sys.path entry, with symlinks resolved
# NOTE(review): presumably sys.path[0] is the folder holding this notebook/script - confirm
projdir = os.path.realpath(os.path.join(sys.path[0], ".."))
# Base directory for data files; getPartName() reports paths relative to it
dataDir = os.path.join(projdir, "data")

## Common Definitions

e.g. Nation and region names

In [2]:
# Nation-level areas keyed by area code (7 entries):
# the 4 nations of the UK plus 3 aggregates (United Kingdom, Great Britain, England and Wales)
nations = \
{
    "K02000001": "United Kingdom",
    "K03000001": "Great Britain",
    "K04000001": "England and Wales",
    "E92000001": "England",
    "W92000004": "Wales",
    "S92000003": "Scotland",
    "N92000002": "Northern Ireland"
}
# Area names only, in the same order as the dictionary above
nationNames = [*nations.values()]

# Named constants for each area name, looked up by code so the strings are defined once
UNITED_KINGDOM = nations["K02000001"]
GREAT_BRITAIN = nations["K03000001"]
ENGLAND_WALES = nations["K04000001"]
ENGLAND = nations["E92000001"]
WALES = nations["W92000004"]
SCOTLAND = nations["S92000003"]
NORTHERN_IRELAND = nations["N92000002"]

# Some data sources such as the ONS daily occurrences do not use the standard nation codes
# Keys are the non-standard codes and values are the matching keys of the nations dictionary
nationMappings = \
{
    "W99999999": "W92000004"
}

In [3]:
# There are 9 regions in England, keyed here by area code
regions = \
{
    "E12000001": "North East",
    "E12000002": "North West",
    "E12000003": "Yorkshire and The Humber",
    "E12000004": "East Midlands",
    "E12000005": "West Midlands",
    "E12000006": "East of England",
    "E12000007": "London",
    "E12000008": "South East",
    "E12000009": "South West"
}
# Region names only, in the same order as the dictionary above
regionNames = [*regions.values()]

# There are some common aliases for regions in England
# Keys are names from the regions dictionary and values are lists of alternative names
regionAliases = {
    "East of England": ["East"]    # Used by historical ONS deaths data
}

# There are 7 NHS regions in England
nhsRegionNames = \
[
    "North East and Yorkshire",
    "North West",
    "Midlands",
    "East of England",
    "London",
    "South East",
    "South West"
]

# Mapping of regions to NHS regions
# Keys are the 9 region names above and values are entries of nhsRegionNames
nhsRegionMappings = \
{
    "North East": "North East and Yorkshire",
    "North West": "North West",
    "Yorkshire and The Humber": "North East and Yorkshire",
    "East Midlands": "Midlands",
    "West Midlands": "Midlands",
    "East of England": "East of England",
    "London": "London",
    "South East": "South East",
    "South West": "South West"
}

In [4]:
# CSV column names that hold dates; loadCsvIntoArray() reads these columns as
# 10-character strings and every other column as an unsigned integer
dateFieldNames = ["week_ended", "date"]

In [5]:
# Default verbosity for WebDownload and loadCsvIntoArray(); True prints extra progress messages
verbose = False

## Printable Class

Simple class that allows other classes to be printed.

In [6]:
class Printable:
    """Base class that gives instances a readable text form (class, then attribute dictionary)"""

    def __repr__(self):
        """Return the class followed by the instance attributes, e.g. "<class 'X'>: {...}" """
        return f"{self.__class__}: {self.__dict__}"

    # str() and repr() yield exactly the same text
    __str__ = __repr__

## Common Functions

Useful functions such as modifying area names for use as filenames

In [7]:
def getSafeName(areaName):
    """Convert an area name into a lowercase, underscore-separated form for use in filenames"""

    # Joining words are removed one after another, in this order, wherever they sit between spaces
    safeName = areaName
    for filler in ('of', 'and', 'the', 'The', '+'):
        safeName = ' '.join(safeName.split(f' {filler} '))

    # Lowercase first, then spaces become underscores and commas disappear
    lowered = safeName.lower()
    underscored = lowered.replace(' ', '_')

    return underscored.replace(',', '')


def getPartName(fileName):
    """Get the part of the filename which is relative to the data directory

    Only a leading dataDir prefix (plus the path separator that follows it) is removed.
    A fileName that is not inside dataDir is returned unchanged, rather than having
    its first character dropped or an embedded copy of dataDir cut out of the middle.

    Returns an empty string when fileName is dataDir itself.
    """

    if fileName == dataDir:
        return ""

    # Accept either separator after the prefix (os.altsep is None on POSIX)
    separators = tuple(sep for sep in (os.sep, os.altsep) if sep)
    remainder = fileName[len(dataDir):]

    if fileName.startswith(dataDir) and remainder.startswith(separators):
        return remainder[1:]

    return fileName

In [8]:
class TestGetSafeName(unittest.TestCase):
    '''Unit tests for the getSafeName function'''

    def test_of(self):
        '''The word "of" is dropped'''
        safeName = getSafeName('East of England')
        self.assertEqual(safeName, 'east_england')

    def test_and(self):
        '''The word "and" is dropped'''
        expectations = [
            ('England and Wales', 'england_wales'),
            ('North East and Yorkshire', 'north_east_yorkshire'),
        ]
        for areaName, expected in expectations:
            self.assertEqual(getSafeName(areaName), expected)

    def test_and_the(self):
        '''The words "and" + "the" are both dropped, whatever the case of "the"'''
        for areaName in ('Yorkshire and the Humber', 'Yorkshire and The Humber'):
            self.assertEqual(getSafeName(areaName), 'yorkshire_humber')

    def test_lists(self):
        '''The separator "," is removed along with the word "and"'''
        safeName = getSafeName('Bournemouth, Christchurch and Poole')
        self.assertEqual(safeName, 'bournemouth_christchurch_poole')

    def test_plus(self):
        '''The "+" symbol is dropped'''
        safeName = getSafeName('England + Wales')
        self.assertEqual(safeName, 'england_wales')

In [9]:
def getOnsWeek(dt):
    """Return the isocalendar() result (year, week, day) of dt moved forward by two days"""

    # ONS weeks end on Friday, so adding 2 days lines them up with the weeks used by isocalendar()
    return (dt + timedelta(days=2)).isocalendar()

In [10]:
class TestGetOnsWeek(unittest.TestCase):
    '''Unit tests for the getOnsWeek function'''

    def test_w1_d7(self):
        '''The last day (day 7) of week 1 for each year from 2011 to 2021'''
        cases = [
            (date(2011, 1, 7), (2011, 1, 7)),
            (date(2012, 1, 6), (2012, 1, 7)),
            (date(2013, 1, 4), (2013, 1, 7)),
            (date(2014, 1, 3), (2014, 1, 7)),
            (date(2015, 1, 2), (2015, 1, 7)),
            (date(2016, 1, 8), (2016, 1, 7)),
            (date(2017, 1, 6), (2017, 1, 7)),
            (date(2018, 1, 5), (2018, 1, 7)),
            (date(2019, 1, 4), (2019, 1, 7)),
            (date(2020, 1, 3), (2020, 1, 7)),
            (date(2021, 1, 8), (2021, 1, 7)),
        ]
        for day, expected in cases:
            self.assertEqual(getOnsWeek(day), expected)

    def test_w1_d1(self):
        '''The first day (day 1) of week 1 for each year from 2011 to 2021'''
        cases = [
            (date(2011, 1, 1), (2011, 1, 1)),
            (date(2011, 12, 31), (2012, 1, 1)),
            (date(2012, 12, 29), (2013, 1, 1)),
            (date(2013, 12, 28), (2014, 1, 1)),
            (date(2014, 12, 27), (2015, 1, 1)),
            (date(2016, 1, 2), (2016, 1, 1)),
            (date(2016, 12, 31), (2017, 1, 1)),
            (date(2017, 12, 30), (2018, 1, 1)),
            (date(2018, 12, 29), (2019, 1, 1)),
            (date(2019, 12, 28), (2020, 1, 1)),
            (date(2021, 1, 2), (2021, 1, 1)),
        ]
        for day, expected in cases:
            self.assertEqual(getOnsWeek(day), expected)

    def test_w52_d7(self):
        '''The last day (day 7) of the final week (52 or 53) for each year from 2010 to 2020'''
        cases = [
            (date(2010, 12, 31), (2010, 52, 7)),
            (date(2011, 12, 30), (2011, 52, 7)),
            (date(2012, 12, 28), (2012, 52, 7)),
            (date(2013, 12, 27), (2013, 52, 7)),
            (date(2014, 12, 26), (2014, 52, 7)),
            (date(2016, 1, 1), (2015, 53, 7)),
            (date(2016, 12, 30), (2016, 52, 7)),
            (date(2017, 12, 29), (2017, 52, 7)),
            (date(2018, 12, 28), (2018, 52, 7)),
            (date(2019, 12, 27), (2019, 52, 7)),
            (date(2021, 1, 1), (2020, 53, 7)),
        ]
        for day, expected in cases:
            self.assertEqual(getOnsWeek(day), expected)

## Download Functions

Download spreadsheets by parsing the HTML for suitable links

In [11]:
# Default for WebDownload: skip a download when the file is already on disk or was already handled
skipExisting = True
# Default for WebDownload: when True, downloadFiles() stops after the first anchor matching a pattern
skipHistory = False

class WebDownload():
    """Download files linked from a web page, keeping track of the files handled so far

    The "downloaded" dictionary maps the base name of each handled file to its path
    relative to the data directory (as reported by getPartName).
    """

    def __init__(self, skipExisting=skipExisting, skipHistory=skipHistory, verbose=verbose):
        """Initialise the download object

        skipExisting -- do not fetch files that exist on disk or were handled by this object
        skipHistory  -- stop scanning a page after the first anchor that matches a pattern
        verbose      -- print a message when a download is skipped
        """

        self.skipExisting, self.skipHistory, self.verbose = skipExisting, skipHistory, verbose
        self.downloaded = {}


    def downloadFile(self, url, rawDir, subDir):
        """Download a binary file from the URL provided

        The file is saved as rawDir/subDir/<base name of url> and recorded in self.downloaded.
        Exceptions raised by urllib or by the file system are passed on to the caller.
        """

        baseName = os.path.basename(url)
        dirName = os.path.join(rawDir, subDir)
        fileName = os.path.join(dirName, baseName)

        partName = getPartName(fileName)

        if (os.path.exists(fileName) or baseName in self.downloaded) and self.skipExisting:
            if self.verbose:
                print(f"Skipping download of {partName}...")
        else:
            print(f"Downloading {partName}...")

            # Ensure raw path exists
            os.makedirs(dirName, exist_ok=True)

            req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla'})

            # Write to a temporary name first so that an interrupted download never leaves
            # a truncated file which would then be skipped on the next run
            tmpName = fileName + ".part"
            try:
                with urllib.request.urlopen(req, timeout=60) as response, open(tmpName, "wb") as outfile:
                    chunk = response.read(4096)
                    while chunk:
                        outfile.write(chunk)
                        chunk = response.read(4096)

                os.replace(tmpName, fileName)
            finally:
                # Only present here if the download did not complete
                if os.path.exists(tmpName):
                    os.remove(tmpName)

        if baseName not in self.downloaded:
            self.downloaded[baseName] = partName


    def downloadFiles(self, rawDir, url, patterns, category=None):
        """Download every file linked from the page at url whose link matches a pattern

        rawDir   -- base directory for the downloaded files
        url      -- address of the HTML page to scan for anchors
        patterns -- sequence of (sub directory, regular expression) pairs; the expression
                    is searched for within the href of each anchor
        category -- optional extra directory level below the pattern's sub directory

        Returns the relative names of all files handled by this object so far.
        """

        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla'})

        # The context manager closes the response even if parsing fails
        with urllib.request.urlopen(req, timeout=15) as response:
            soup = BeautifulSoup(response, "lxml")

        for anchor in soup.find_all("a"):
            href = anchor.get("href")

            # Anchors without a target (e.g. named anchors) cannot match any pattern
            if not href:
                continue

            matched = False

            for pattern in patterns:
                if re.search(pattern[1], href):
                    # Always resolve against the page URL; the page URL itself is left untouched
                    # so that later relative links are not resolved against a previous file
                    fileUrl = urllib.parse.urljoin(url, href)

                    if category:
                        subDir = os.path.join(pattern[0], category)
                    else:
                        subDir = pattern[0]

                    self.downloadFile(fileUrl, rawDir, subDir)
                    matched = True

            # With skipHistory only the first matching anchor on the page is processed
            if matched and self.skipHistory:
                break

        return list(self.downloaded.values())

## Load CSV into NumPy Array

Simple function to load data from a CSV file into a NumPy ndarray

In [12]:
def loadCsvIntoArray(fileName, verbose=verbose):
    '''Load CSV file into numpy array

    The first row supplies the column names. Columns named in dateFieldNames are read as
    10-character strings; all other columns are read as unsigned 32-bit integers, with
    empty cells treated as 0.

    Returns a 1-dimensional structured array with one element per data row (a file with a
    single data row gives an array of length 1 rather than a 0-dimensional array).

    Any exception is reported with the file name and then re-raised.
    '''

    if verbose:
        print(f"Loading {getPartName(fileName)}...")

    def toInt(value):
        # Empty cells count as zero
        return int(value or 0)

    try:
        with open(fileName, 'r') as f:
            # Consume the header row; genfromtxt then reads the remaining rows from the same handle
            colNames = next(csv.reader(f, delimiter = ','))

            dtype = []
            converters = {}

            for i, colName in enumerate(colNames):
                if colName in dateFieldNames:
                    dtype.append((colName, "U10"))
                else:
                    dtype.append((colName, "u4"))
                    converters[i] = toInt

            data = np.genfromtxt(f, dtype=dtype, converters=converters, delimiter=",")

    except Exception:
        print(f"Failed to load {fileName}")
        raise

    # genfromtxt squeezes a single data row down to a 0-d array, which cannot be iterated or sliced
    return np.atleast_1d(data)

## NumPy Helper Functions

Useful functionality such as moving average or rolling sum

In [13]:
def rollingSum(data, window=7):
    """Sum each value with the values before it, up to `window` values in total, by linear convolution"""

    kernel = np.ones(window)

    # Mode "full" gives more values than the input holds, so only the first len(data) are kept
    fullSums = np.convolve(data, kernel, mode="full")
    trimmed = fullSums[:len(data)]

    # Cast back so that the result has the same dtype as the input
    return trimmed.astype(data.dtype)

In [14]:
class TestRollingSum(unittest.TestCase):
    '''Class to test rollingSum function'''

    def testShortList(self):
        '''Test processing of a list shorter than the window size'''

        actual = rollingSum(np.arange(6), 7)
        expected = np.array([0, 1, 3, 6, 10, 15])

        # Reports the mismatching elements (and any shape difference) on failure
        np.testing.assert_array_equal(actual, expected)


    def testLongerList(self):
        '''Test processing of a list longer than the window size'''

        actual = rollingSum(np.arange(14), 7)
        expected = np.array([0, 1, 3, 6, 10, 15, 21, 28, 35, 42, 49, 56, 63, 70])

        np.testing.assert_array_equal(actual, expected)

In [15]:
def movingAverage(data, window=7):
    """Calculate a centred moving average using linear convolution

    The result always has the same length as data. Positions without a complete window
    are filled with zeros: window // 2 at the start and the remainder of the window - 1
    missing values at the end. For an odd window the two pads are equal; for an even
    window the end pad is one shorter than the start pad.

    If data is shorter than the window the result is all zeros.
    """

    size = len(data)

    # Convolution is only possible if the input is at least as long as the window size
    if size < window:
        return np.zeros(size)

    # The mode "valid" yields size - window + 1 values, i.e. window - 1 fewer than required
    averages = np.convolve(data, np.ones(window) / window, mode="valid")

    leading = window // 2
    trailing = window - 1 - leading

    return np.concatenate((np.zeros(leading), averages, np.zeros(trailing)))

In [16]:
class TestMovingAverage(unittest.TestCase):
    '''Class to test movingAverage function'''

    def testShortList(self):
        '''Test processing of a list shorter than the window size'''

        actual = movingAverage(np.arange(6), 7)
        expected = np.zeros(6)

        # Reports the mismatching elements (and any shape difference) on failure
        np.testing.assert_array_equal(actual, expected)


    def testLongerList(self):
        '''Test processing of a list longer than the window size'''

        actual = movingAverage(np.arange(14), 7)
        expected = np.array([0, 0, 0, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0])

        # Comparison of floating point values must be approximate
        np.testing.assert_allclose(actual, expected, rtol=0, atol=1e-10)

## Image Galleries

Page + Gallery + Figure

In [17]:
class IndexPage(Printable):
    """Simple class to create HTML index page

    The page is built from docs/template/main.template.html: every "{NAME}" placeholder
    is replaced by details["NAME"], and the "{GALLERY_TEMPLATES}" line is replaced by
    the HTML of the galleries, in the order in which they were added.
    """

    def __init__(self, fileName, details):
        """Store the output file name and details, and work out the path back to "docs"

        fileName -- path of the HTML file to write; it must be inside a folder named "docs"
        details  -- dictionary of placeholder names to replacement strings; a "DOCS_PATH"
                    entry is added to it (the dictionary passed in is modified)

        Raises ValueError if no folder named "docs" is found above fileName.
        """

        self.fileName = fileName
        self.details = details

        self.galleries = {}

        # Determine relative path to "docs" folder: one ".." for each level below it
        levels = []
        tmpPath = os.path.dirname(fileName)
        while os.path.basename(tmpPath) != "docs":
            parent = os.path.dirname(tmpPath)

            # dirname() stops changing at the top of the path; without this check the loop never ends
            if parent == tmpPath:
                raise ValueError(f'No "docs" folder found in the path of {fileName}')

            levels.append("..")
            tmpPath = parent

        self.details["DOCS_PATH"] = "/".join(levels)


    def addGallery(self, galleryId, details):
        """Add a new gallery to the index page and return it"""

        self.galleries[galleryId] = IndexGallery(details)

        return self.galleries[galleryId]


    def getHtml(self):
        """Get the final HTML using the templates"""

        template = os.path.join(projdir, "docs", "template", "main.template.html")
        with open(template, 'r') as f:
            html = f.read()

        # Values must be strings because they are passed directly to str.replace()
        for detail in self.details:
            html = html.replace("{" + f"{detail}" + "}", self.details[detail])

        galleriesHtml = ""
        for galleryId in self.galleries:
            galleriesHtml += self.galleries[galleryId].getHtml() + "\n"

        html = html.replace("{GALLERY_TEMPLATES}\n", galleriesHtml)

        return html


    def saveHtml(self):
        """Save the final HTML to disk and return it"""

        html = self.getHtml()

        with open(self.fileName, 'w') as f:
            f.write(html)

        return html


class IndexGallery(Printable):
    """Gallery section of an index page, holding its figures keyed by id"""

    def __init__(self, details):

        # Placeholder names and their replacement text for the gallery template
        self.details = details

        # Figures keyed by figure id, rendered in the order they were added
        self.figures = {}


    def addFigure(self, figureId, relPath, fileName, details):
        """Create a figure, store it under figureId and return it"""

        figure = IndexFigure(relPath, fileName, details)
        self.figures[figureId] = figure

        return figure


    def getHtml(self):
        """Fill in the gallery template with the details and the HTML of every figure"""

        templatePath = os.path.join(projdir, "docs", "template", "gallery.template.html")
        with open(templatePath, 'r') as templateFile:
            html = templateFile.read()

        # Each "{NAME}" placeholder is swapped for the matching detail
        for placeholder, value in self.details.items():
            html = html.replace(f"{{{placeholder}}}", value)

        # Every figure contributes its HTML followed by a newline
        figuresHtml = "".join(figure.getHtml() + "\n" for figure in self.figures.values())

        return html.replace("{FIGURE_TEMPLATES}\n", figuresHtml)


class IndexFigure(Printable):
    """Simple class to create HTML figure

    The figure is built from docs/template/figure.template.html: every "{NAME}"
    placeholder is replaced by details["NAME"].
    """

    def __init__(self, relPath, fileName, details):
        """Store the figure paths and details

        relPath  -- path prefix written into the HTML in front of the image file names
        fileName -- path of the image file on disk
        details  -- dictionary of placeholder names to replacement strings; an
                    "IMAGE_FILENAME" entry is added to it (the dictionary passed in is modified)
        """

        self.relPath = relPath
        self.fileName = fileName
        self.details = details

        self.details["IMAGE_FILENAME"] = relPath + "/" + os.path.basename(fileName)


    def createThumb(self, suffix="-thumb"):
        """Create thumbnail for the figure

        The thumbnail is a quarter of the width and height of the image (at least 1 pixel
        each way) and is saved beside the image with suffix inserted before the extension.
        Adds "IMAGE_WIDTH", "IMAGE_HEIGHT" and "THUMB_FILENAME" to the details.
        """

        root, ext = os.path.splitext(os.path.basename(self.fileName))
        thumbName = root + suffix + ext

        # The context manager releases the image file handle once the thumbnail has been made
        with Image.open(self.fileName) as image:
            width, height = image.size

            # LANCZOS is the filter formerly available as ANTIALIAS, a name removed in Pillow 10
            thumbSize = (max(1, width // 4), max(1, height // 4))
            thumb = image.resize(thumbSize, Image.LANCZOS)

        self.details["IMAGE_WIDTH"] = str(width)
        self.details["IMAGE_HEIGHT"] = str(height)

        thumb.save(os.path.join(os.path.dirname(self.fileName), thumbName))

        # The value is written into HTML, so use "/" on every platform (as IMAGE_FILENAME does)
        self.details["THUMB_FILENAME"] = os.path.join(self.relPath, thumbName).replace(os.sep, "/")


    def getHtml(self):
        """Get the final HTML using the templates"""

        template = os.path.join(projdir, "docs", "template", "figure.template.html")
        with open(template, 'r') as f:
            html = f.read()

        # Values must be strings because they are passed directly to str.replace()
        for detail in self.details:
            html = html.replace("{" + f"{detail}" + "}", self.details[detail])

        return html

## Run Unit Tests

In [18]:
# Run the unit tests defined above when this code is executed directly
if __name__ == '__main__':
    # argv is replaced so that unittest does not try to parse the host process's arguments;
    # exit=False stops unittest from calling sys.exit() once the tests have finished
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

............
----------------------------------------------------------------------
Ran 12 tests in 0.027s

OK
