# Extract, Transform, Load (ETL) Data


This notebook provides the initial raw data processing for storage on GitHub. The focus is on extracting, combining, and loading the data and useful metadata.

**Note:** The processed data is stored in this repo on GitHub, so you only need to run this notebook if you want to update the data or add new datasets.

Key steps are:
-   Setting up paths (where to put the data)
-   Downloading datasets from various sources
-   Saving useful metadata
-   Saving larger processed data as gzipped (geo)parquet files
-   Saving smaller metadata / lookups / variable descriptions as plain CSV files


## Datasets

**MedSat**
```mermaid
graph LR;
    classDef dataset fill:#ffccff,stroke:#333,stroke-width:2px;
    classDef metadata fill:#c1e1c1,stroke:#333,stroke-width:2px;
    classDef variables fill:#add8e6,stroke:#333,stroke-width:2px;

    A[MedSat]:::dataset --> B[medsat.parquet.gzip]:::dataset
    A --> C[medsat_variables.xlsx]:::metadata
    A --> D[sociodemographic.csv]:::metadata
    A --> E[environmental.csv]:::metadata
    A --> F[outcomes.csv]:::metadata
    A --> G[auxiliary.csv]:::metadata
    A --> H[variables.csv]:::variables
```

**Spatial Signatures**
```mermaid
graph TD;
    classDef dataset fill:#ffccff,stroke:#333,stroke-width:2px;
    classDef metadata fill:#c1e1c1,stroke:#333,stroke-width:2px;
    classDef variables fill:#add8e6,stroke:#333,stroke-width:2px;

    A[Spatial Signatures]:::dataset --> B[Partitioned Parquet files]:::dataset
    B --> B2[spatial_signatures_XX.parquet.gzip]:::dataset
    A --> C[metadata.csv]:::metadata
    A --> D[variables.csv]:::variables
```

**LSOA Lookups**
```mermaid
graph TD;
    classDef dataset fill:#ffccff,stroke:#333,stroke-width:2px;
    classDef metadata fill:#c1e1c1,stroke:#333,stroke-width:2px;

    M[LSOA Lookups]:::dataset --> N[lsoa_to_bua_lad_region_lookup.csv]:::metadata
```

## Setup

In [None]:
import os
import re
import shutil
from ftplib import FTP
from pathlib import Path
from getpass import getpass

import geopandas as gpd
import pandas as pd
from utils import find_best_match, clean_columns, cdrc_get_metadata, cdrc_login


CSV_KWARGS = dict(index=False, encoding="utf-8", quoting=2)

# Set up paths
TMP_PATH = Path("../data/_tmp")
MEDSAT_PATH = Path("../data/medsat")
SPATIAL_SIGNATURES_PATH = Path("../data/spatial_signatures")

for folder in (TMP_PATH, MEDSAT_PATH, SPATIAL_SIGNATURES_PATH):
    folder.mkdir(exist_ok=True, parents=True)
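
The `quoting=2` entry in `CSV_KWARGS` is the integer value of `csv.QUOTE_NONNUMERIC` from the standard library. A minimal sketch using only the `csv` module (the sample rows are illustrative, not taken from the datasets) shows the effect: text fields are quoted and numeric fields are not.

```python
import csv
import io

# quoting=2 in CSV_KWARGS corresponds to csv.QUOTE_NONNUMERIC
assert csv.QUOTE_NONNUMERIC == 2

# Illustrative rows only
rows = [["LSOA21CD", "year"], ["E01000001", 2019]]

buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
writer.writerows(rows)

csv_text = buffer.getvalue()
print(csv_text)
```

Writing `quoting=csv.QUOTE_NONNUMERIC` instead of the bare `2` would make the intent easier to read, at the cost of one extra import.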

## MedSat

### Extract

In [None]:
# https://doi.org/10.14459/2023mp1714817
host = "dataserv.ub.tum.de"
user = "m1714817"
password = user

download_list = [
    "point_data/2019_spatial_raw_master.geojson",
    "point_data/2020_spatial_raw_master.geojson",
    "point_data/MedSat Variables.xlsx",
    "auxiliary data/lsoas_regions_mapping.csv",
]

In [None]:
with FTP(host, user, password) as ftp:
    for ftp_file in download_list:
        out_path = TMP_PATH / ftp_file.split("/")[-1]

        if out_path.exists():
            print(f"File already exists: {out_path}")
            continue

        print(f"Downloading {ftp_file} to {out_path}")
        with open(out_path, "wb") as f:
            ftp.retrbinary(f"RETR {ftp_file}", f.write)
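
Because the loop above skips any file that already exists, an interrupted download could leave a truncated file that is never re-fetched. One possible guard, sketched below, is to write to a temporary `.part` file and rename it only on success. The helper name `download_atomically`, the `fetch` callback, and the `.part` suffix are all assumptions made for illustration; they are not part of this repo.

```python
import tempfile
from pathlib import Path
from typing import Callable


def download_atomically(out_path: Path, fetch: Callable[[Callable[[bytes], object]], None]) -> bool:
    """Write to a temporary '.part' file and rename it only on success.

    `fetch` is a hypothetical callback that receives a write function, e.g.
    lambda write: ftp.retrbinary(f"RETR {ftp_file}", write).
    Returns True if a download happened, False if the file already existed.
    """
    if out_path.exists():
        return False

    part_path = out_path.with_name(out_path.name + ".part")
    try:
        with open(part_path, "wb") as f:
            fetch(f.write)
        part_path.replace(out_path)  # rename within the same folder
    finally:
        # Clean up the partial file if the download failed part-way
        part_path.unlink(missing_ok=True)
    return True


# Usage with a stand-in for the FTP call
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "example.bin"
    first = download_atomically(target, lambda write: write(b"payload"))
    second = download_atomically(target, lambda write: write(b"ignored"))
    content = target.read_bytes()

print(first, second, content)
```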

### Transform

In [None]:
lsoa_to_region = pd.read_csv(TMP_PATH / "lsoas_regions_mapping.csv")[["LSOA21CD", "RGN22CD", "RGN22NM"]]
medsat_2019 = gpd.read_file(TMP_PATH / "2019_spatial_raw_master.geojson").assign(year=2019)
medsat_2020 = gpd.read_file(TMP_PATH / "2020_spatial_raw_master.geojson").assign(year=2020)

# read the workbook from the download folder; the copy in MEDSAT_PATH is only created in the Load step
medsat_variables = pd.ExcelFile(TMP_PATH / "MedSat Variables.xlsx")

#### MedSat Data

In [None]:
assert (
    len(set(medsat_2019.columns) ^ set(medsat_2020.columns)) == 0
), "Columns do not match between MedSat dataset years"
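
If the assertion above ever fails, it helps to know which columns differ and in which direction. The sketch below is one way to report that; `column_differences` is a hypothetical helper and the column names are made up for illustration.

```python
def column_differences(left, right):
    """Return the columns found only in `left` and only in `right`, sorted."""
    left, right = set(left), set(right)
    return sorted(left - right), sorted(right - left)


# Hypothetical column names for illustration
cols_2019 = ["geography code", "c_total population", "e_NO2"]
cols_2020 = ["geography code", "c_total population", "e_ozone"]

only_2019, only_2020 = column_differences(cols_2019, cols_2020)
print("Only in 2019:", only_2019)
print("Only in 2020:", only_2020)
```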

In [None]:
medsat = (
    pd.concat([medsat_2019, medsat_2020], ignore_index=True)
    .rename(columns={"geography code": "LSOA21CD"})
    .merge(lsoa_to_region, on="LSOA21CD", how="left")
    .sort_values(by=["RGN22CD", "LSOA21CD", "year"], ignore_index=True)
    # repair invalid geometries (54 ring self-intersections in the raw data):
    .assign(geometry=lambda df: df['geometry'].make_valid())
)

# some census (c_) column names contain runs of whitespace; collapse each run to a single space:
medsat.columns = [re.sub(r'\s+', ' ', c) for c in medsat]
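
As a small illustration of the whitespace clean-up, the same `re.sub` pattern can be applied to a few column names. The names below are invented for the example and `collapse_whitespace` is a hypothetical wrapper.

```python
import re


def collapse_whitespace(name: str) -> str:
    """Replace every run of whitespace with a single space."""
    return re.sub(r"\s+", " ", name)


# Hypothetical column names for illustration
examples = ["c_percent  asian", "c_total   population", "e_NO2"]
cleaned = [collapse_whitespace(c) for c in examples]
print(cleaned)
```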

#### MedSat Variables / Metadata

In [None]:
sheets = medsat_variables.parse("INFO", skiprows=[0, 1], usecols=[1, 2], names=["sheet_name", "sheet_description"])
sheets

Sociodemographic Variables

In [None]:
sociodemographic = medsat_variables.parse("sociodemographic", skiprows=[0, 1]).iloc[:, 1:]
sociodemographic.columns = [c.lower().replace(" ", "_") for c in sociodemographic.columns]

sociodemographic = sociodemographic.rename(columns={"source.1": "source_category"}).assign(
    more_info=lambda df: df["more_info"].ffill(),
    year=lambda df: df["year"].ffill().astype(int),
    type=lambda df: df["type"].str.strip().str.replace('  ', ' ')
)

In [None]:
# variable names do not match medsat columns for roughly 30 of the 111 sociodemographic columns
# fuzzy matching is used, and the results were checked manually via:
# sociodemographic[['name', 'medsat_name']].to_clipboard()
c_columns = set(medsat.filter(regex="^c_").columns)
sociodemographic["medsat_name"] = sociodemographic["name"].apply(find_best_match, args=(c_columns,))
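
The implementation of `find_best_match` lives in `utils` and is not shown in this notebook. Purely as an assumption about how such a helper could work, the sketch below uses `difflib.get_close_matches` from the standard library. The function name `find_best_match_sketch`, the `prefix` and `cutoff` parameters, and the candidate names are all hypothetical.

```python
from difflib import get_close_matches


def find_best_match_sketch(name, candidates, prefix="c_", cutoff=0.6):
    """Return the candidate closest to prefix + name, or None if nothing is close.

    This is an illustrative stand-in, not the helper from utils.
    """
    target = f"{prefix}{name}".lower()
    lowered = {c.lower(): c for c in candidates}  # compare case-insensitively
    matches = get_close_matches(target, list(lowered), n=1, cutoff=cutoff)
    return lowered[matches[0]] if matches else None


# Hypothetical candidate columns for illustration
candidates = {"c_percent asian", "c_total population", "c_percent male"}

exact = find_best_match_sketch("percent Asian", candidates)
fuzzy = find_best_match_sketch("total pop", candidates)
nothing = find_best_match_sketch("zzz", candidates)
print(exact, fuzzy, nothing)
```

Whatever the matching rule, fuzzy results still need the manual check described above, since a high similarity score does not guarantee that two names refer to the same variable.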

Environmental Variables

In [None]:
environmental = medsat_variables.parse("environmental", skiprows=[0]).iloc[:, 3:]
environmental.columns = [c.lower().replace(" ", "_").replace("(", "").replace(")", "") for c in environmental.columns]

environmental["link_to_data_source"] = environmental["link_to_data_source"].ffill()
environmental["medsat_name"] = "e_" + environmental["name"]

Outcome Variables

In [None]:
outcomes = medsat_variables.parse("outcomes", skiprows=[0, 1]).iloc[:, 4:]
outcomes.columns = [c.lower().replace(" ", "_") for c in outcomes.columns]

outcomes["medsat_name"] = "o_" + outcomes["name"].str.replace("quantitiy", "quantity")

Auxiliary Variables

In [None]:
auxiliary = medsat_variables.parse("auxiliary").iloc[:, 1:]
auxiliary.columns = [c.lower().replace(" ", "_") for c in auxiliary.columns]
auxiliary["medsat_name"] = auxiliary["name"]

auxiliary.loc[0] = {
    "name": "LSOA21CD",
    "meaning": "LSOA code",
    "medsat_name": "LSOA21CD",
}
new_fields = pd.DataFrame(
    [
        {"name": "RGN22CD", "meaning": "Region code", "medsat_name": "RGN22CD"},
        {"name": "RGN22NM", "meaning": "Region name", "medsat_name": "RGN22NM"},
        {"name": "year", "meaning": "year of medsat data", "medsat_name": "year"},
    ]
)
auxiliary = pd.concat([auxiliary, new_fields], ignore_index=True)

Single Variable Lookup Table

In [None]:
variables = [
    sociodemographic[["medsat_name", "type"]].assign(category="sociodemographic"),
    environmental[["medsat_name", "type"]].assign(category="environmental"),
    outcomes[["medsat_name", "type"]].assign(category="outcomes"),
    auxiliary[["medsat_name"]].assign(category="auxiliary"),
]

variables = pd.concat(variables, ignore_index=True)

The following columns have no metadata entries, but they are not needed for this project:

- `o_OME_per_capita`
- `e_skin_reservoir_content`


In [None]:
set(medsat.columns) ^ set(variables["medsat_name"])
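
The symmetric difference above is inspected by eye. A possible alternative, sketched here with hypothetical inputs, is to subtract the accepted exceptions so that anything left over signals a new mismatch. The helper name `unexpected_mismatches` is an assumption for illustration.

```python
# The two columns listed above as lacking metadata
KNOWN_UNDOCUMENTED = {"o_OME_per_capita", "e_skin_reservoir_content"}


def unexpected_mismatches(data_columns, documented_names, known=KNOWN_UNDOCUMENTED):
    """Symmetric difference between the two name sets, minus accepted exceptions."""
    return (set(data_columns) ^ set(documented_names)) - set(known)


# Hypothetical inputs for illustration
data_columns = ["LSOA21CD", "year", "o_OME_per_capita", "e_skin_reservoir_content"]
documented = ["LSOA21CD", "year"]

leftover = unexpected_mismatches(data_columns, documented)
print(leftover)
```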

### Load

In [None]:
medsat.to_parquet(MEDSAT_PATH / "medsat.parquet.gzip", index=False, compression="gzip")

In [None]:
shutil.copy("../data/_tmp/MedSat Variables.xlsx", MEDSAT_PATH / "medsat_variables.xlsx")

In [None]:
sociodemographic.to_csv(MEDSAT_PATH / "sociodemographic.csv", **CSV_KWARGS)
environmental.to_csv(MEDSAT_PATH / "environmental.csv", **CSV_KWARGS)
outcomes.to_csv(MEDSAT_PATH / "outcomes.csv", **CSV_KWARGS)
auxiliary.to_csv(MEDSAT_PATH / "auxiliary.csv", **CSV_KWARGS)
variables.to_csv(MEDSAT_PATH / "variables.csv", **CSV_KWARGS)

## Spatial Signatures

### Extract

In [None]:
# make an account at https://data.cdrc.ac.uk
username = "cdrowley"
password = getpass("CDRC Password: ")

cdrc_session = cdrc_login(username, password)
if cdrc_session is None:
    raise Exception("CDRC login failed")

In [None]:
download_list = [
    "https://data.cdrc.ac.uk/system/files/key.csv",
    "https://data.cdrc.ac.uk/system/files/pen_portraits.json",
    "https://data.cdrc.ac.uk/system/files/per_geometry.csv",
    "https://data.cdrc.ac.uk/system/files/spatial_signatures_GB_clean.gpkg_.zip",
]

for url in download_list:
    out_path = TMP_PATH / url.split("/")[-1]

    if out_path.exists():
        print(f"File already exists: {out_path}")
        continue

    print(f"Downloading {url} to {out_path}")
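    with cdrc_session.get(url, stream=True) as r:
        # fail early rather than saving an error page to disk
        r.raise_for_status()
        with open(out_path, "wb") as f:
            # write in blocks so large files are not held in memory
            for block in r.iter_content(chunk_size=1 << 20):
                f.write(block)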

### Transform

#### Spatial Signatures Data

In [None]:
ss_zip = "../data/_tmp/spatial_signatures_GB_clean.gpkg_.zip"
shutil.unpack_archive(ss_zip, extract_dir=TMP_PATH)

In [None]:
ss_per_geometry = pd.read_csv("../data/_tmp/per_geometry.csv")
ss_pen_portraits = (
    pd.read_json("../data/_tmp/pen_portraits.json", orient="index")
    .rename(columns={0: "description"})
    .reset_index(names=["type"])
)

ss = (
    gpd.read_file("../data/_tmp/spatial_signatures_GB_clean.gpkg")
    .merge(ss_pen_portraits, on="type", how="left")
    .merge(ss_per_geometry, on="id", how="left")
)
ss.head(2)

#### Spatial Signatures Variables / Metadata

In [None]:
variables = pd.read_csv("../data/_tmp/key.csv").rename(columns={"Unnamed: 0": "name", "0": "description"})
variables.head(3)

In [None]:
metadata = cdrc_get_metadata("https://data.cdrc.ac.uk/dataset/spatial-signatures-great-britain", cdrc_session, 1)
metadata

### Load

In [None]:
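import math

# split into parts to stay under GitHub's 100 MB file size limit
chunk_size = 6_000
# ceiling division avoids an empty final chunk when len(ss) is an exact multiple of chunk_size
n_chunks = math.ceil(len(ss) / chunk_size)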
SS_PART_DIR = SPATIAL_SIGNATURES_PATH / "partitioned"
SS_PART_DIR.mkdir(exist_ok=True, parents=True)


for i in range(n_chunks):
    chunk = ss.iloc[i * chunk_size : (i + 1) * chunk_size]
    chunk_num = i + 1
    file_path = SS_PART_DIR / f"spatial_signatures_{chunk_num:02d}.parquet.gzip"

    chunk.to_parquet(file_path, index=False, compression="gzip")
    print(f"Written chunk {chunk_num:02d} of {n_chunks}, {len(chunk)} rows, {file_path.stat().st_size / 1e6:.2f} MB")
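
The chunk arithmetic can be checked in isolation. The sketch below computes the row ranges for each part using ceiling division, which avoids an empty trailing chunk when the row count is an exact multiple of the chunk size. `chunk_bounds` is a hypothetical helper and the row counts are illustrative.

```python
import math


def chunk_bounds(n_rows, chunk_size):
    """Return (start, stop) row positions for each chunk, in order."""
    n_chunks = math.ceil(n_rows / chunk_size)
    return [(i * chunk_size, min((i + 1) * chunk_size, n_rows)) for i in range(n_chunks)]


exact_multiple = chunk_bounds(12_000, 6_000)
one_extra_row = chunk_bounds(12_001, 6_000)
print(exact_multiple)
print(one_extra_row)
```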

In [None]:
metadata.to_csv(SPATIAL_SIGNATURES_PATH / "metadata.csv", **CSV_KWARGS)
variables.to_csv(SPATIAL_SIGNATURES_PATH / "variables.csv", **CSV_KWARGS)

## LSOA Lookups

The region-to-LSOA lookup table sourced from MedSat does not contain regions for all LSOAs, so the most recent version (updated 03 May 2024) from the Office for National Statistics (ONS) is used as the preferred source. This file also includes lookups for Built-up Areas (BUAs) and Local Authority Districts (LADs).

[Link to Source](https://www.data.gov.uk/dataset/c43641d8-710c-48e6-9139-1302953cf16c/lsoa-2021-to-bua-to-lad-to-region-december-2022-best-fit-lookup-in-ew-v2). Note that this differs from the direct link to the file downloaded below, which is more liable to change over time.

In [None]:
ONS_PATH = Path("../data/ons")
ONS_PATH.mkdir(exist_ok=True, parents=True)

url = 'https://open-geography-portalx-ons.hub.arcgis.com/api/download/v1/items/0352e811ec2c4fc5917f39aea2d1b8a3/csv?layers=0'

cols_to_keep = ['LSOA21CD', 'LSOA21NM', 'BUA22CD', 'BUA22NM', 'LAD22CD', 'LAD22NM', 'RGN22CD', 'RGN22NM', 'ObjectId']
lsoa_lookup = pd.read_csv(url)[cols_to_keep]

lsoa_lookup.to_csv(ONS_PATH / "lsoa_to_bua_lad_region_lookup.csv", **CSV_KWARGS)
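
Since the direct download link may change over time, it can be worth confirming that the expected columns are present before subsetting. The sketch below checks a CSV header using only the standard library. `missing_columns` is a hypothetical helper and the sample text is illustrative rather than a real extract from the ONS file.

```python
import csv
import io

REQUIRED_COLUMNS = [
    "LSOA21CD", "LSOA21NM", "BUA22CD", "BUA22NM",
    "LAD22CD", "LAD22NM", "RGN22CD", "RGN22NM", "ObjectId",
]


def missing_columns(csv_text, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent from the CSV header."""
    header = next(csv.reader(io.StringIO(csv_text)), [])
    return [c for c in required if c not in header]


# Illustrative sample that deliberately omits the BUA columns
sample = (
    "LSOA21CD,LSOA21NM,LAD22CD,LAD22NM,RGN22CD,RGN22NM,ObjectId\n"
    "E01000001,City of London 001A,E09000001,City of London,E12000007,London,1\n"
)

missing = missing_columns(sample)
print(missing)
```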