In [4]:
import pandas as pd
import re
import requests
import aiohttp
import urllib.parse

In [2]:
base_endpoint = 'https://opendata.nhsbsa.net/api/3/action'
package_list_endpoint = f"{base_endpoint}/package_list"    # List of data-sets in the portal
package_show_endpoint = f"{base_endpoint}/package_show"
sql_endpoint = f"{base_endpoint}/datastore_search_sql"

In [5]:
from dataclasses import dataclass
from typing import Any, TypeGuard, no_type_check

def is_json_dict(json_response: Any) -> TypeGuard[dict[str, Any]]:
    return isinstance(json_response, dict) and all(isinstance(key, str) for key in json_response)

def unnest_response(json_response: Any) -> dict[str, Any]:
    """the api has a variadic way of nesting responses"""
    if not is_json_dict(json_response):
        raise TypeError(f"expected: dict[str, Any], actual: {type(json_response)}")
    match json_response.get("success"):
        case None:
            return json_response
        case 'true' | True:
            return unnest_response(json_response["result"])
        case _ if "error" in json_response:
            raise ConnectionError(json_response["error"])
        case _:
            raise NotImplementedError(f"structure found in response has no implementation {json_response}")
        
def get_single_record(url, col_name = "num_records") -> int:
    records = unnest_response(requests.get(url).json())
    match records["records"]:
        case [single_val]:
            return int(single_val["num_records"])
        case [first, *rest]:
            raise ValueError(f"expected [single_value], actual: {[first, *rest]}")
        case wrong_type:
            raise TypeError(f"expected list, actual{type(wrong_type)}")

@dataclass(init=False, frozen=True, slots=True)
class MetaData:
    dataset_id = "english-prescribing-data-epd"
    url = f"{base_endpoint}/package_show?id={dataset_id}"
    metadata_dict = unnest_response(requests.get(url).json())
    table_names = [record["name"] for record in metadata_dict["resources"]]
    dates = [x[-6:] for x in table_names]
    max_date =  max(dates)
    min_date = min(dates)
meta = MetaData()

In [None]:
from typing import TypedDict, NotRequired

ENDPOINT = 'https://opendata.nhsbsa.net/api/3/action'
LIMIT: int = 32000
COUNT_COL_ALIAS = "num_records"

class DataStoreParams(TypedDict):
    resource_id: str
    sql: str
    limit: int
    start: NotRequired[int]

@dataclass(frozen=True, slots=True)
class DataStoreEndpoint:
    """Typical sructure required to create a url for a datastore query"""
    resource_id: str  # can add validation to ensure it's a valid resource
    raw_sql: str  # can add validation to ensure it's valid postgressql syntax
    start: int | None = None

    @property
    def query_string(self) -> str:
        return "?" + "&".join(f"{k}={urllib.parse.quote(str(v))}" for k, v in self.query_dict.items())
    
    @property
    def query_dict(self) -> DataStoreParams:
        required = DataStoreParams(
            resource_id=self.resource_id,
            sql=self.raw_sql,
            limit=LIMIT,
        )
        if self.start:
            required["start"] = self.start
        return required

    @property
    def postable_url(self):
        return f"{ENDPOINT}/datastore_search_sql"
    
    @property
    def gettable_url(self):
        return f"{self.postable_url}/{self.query_string}"

@dataclass(frozen=True, slots=True)     
class DataStoreResource:
    """capable of creating DataStoreEndpoints for a given resource_id for various pre-defined sql queries
    primarily this helps when you want to adjust the group cols to use in the queries"""
    resource_id: str  # can add validation to ensure it's a valid resource
    group_cols_tuple: tuple[str, ...] = (
        "year_month", 
        "practice_code", 
        "postcode", 
        "bnf_chemical_substance"
    )  # can add validation to ensure it's a valid resource

    @property
    def group_cols(self) -> str:
        return ", ".join(self.group_cols_tuple)

    def get_endpoint(self, sql: str, start: int = 0):
        return DataStoreEndpoint(
            self.resource_id,
            sql,
            start
        )

    def group_endpoint(self, start: int = 0) -> DataStoreEndpoint:
        """groups can be arbritrarily large so a start can be provided as responses will be limited by the api"""
        return self.get_endpoint(
            f"SELECT {self.group_cols}, SUM(items) total_items "
            f"FROM `{self.resource_id}` "
            f"GROUP BY {self.group_cols} "
            f"ORDER BY {self.group_cols}",
            start
        )
    
    @property
    def record_count_endpoint(self) -> DataStoreEndpoint:
        return self.get_endpoint(
            f"SELECT COUNT(*) {COUNT_COL_ALIAS} "
            f"FROM `{self.resource_id}`"
        )
    
    @property
    def group_count_endpoint(self) -> DataStoreEndpoint:
        return self.get_endpoint(
            f"SELECT COUNT(*) {COUNT_COL_ALIAS} "
            f"FROM (SELECT DISTINCT {self.group_cols} FROM `{self.resource_id}`) "
        )
    
@dataclass
class EpdYear:
    year: int



In [6]:
import polars as pl
year = 2014
df = pl.scan_csv([x["url"] for x in meta.metadata_dict["resources"] if int(x["name"][-6:-2]) == year])

In [None]:
df

In [152]:
meta.metadata_dict

{'author': 'NHS Business Services Authority (NHSBSA)',
 'author_email': 'DataServicesSupport@nhsbsa.nhs.uk',
 'creator_user_id': 'ca4e917f-4aca-4a02-8eda-9f2bb0f1bef9',
 'id': '65050ec0-5abd-48ce-989d-defc08ed837e',
 'isopen': False,
 'license_id': 'OGL-UK-3.0',
 'license_title': 'OGL-UK-3.0',
 'maintainer': 'NHS Business Services Authority (NHSBSA)',
 'maintainer_email': 'DataServicesSupport@nhsbsa.nhs.uk',
 'metadata_created': '2020-03-31T22:07:42.207889',
 'metadata_modified': '2025-07-04T08:21:22.214737',
 'name': 'english-prescribing-data-epd',
 'num_resources': 136,
 'num_tags': 2,
 'organization': {'id': 'ab9f74b0-a7ac-41a8-a97b-e49f8cd02aff',
  'name': 'community_prescribing_dispensing',
  'title': 'Community Prescribing & Dispensing',
  'type': 'organization',
  'description': 'Datasets on the prescribing and dispensing of medicines in the community',
  'image_url': 'https://storage.googleapis.com/dx-nhs-production/_assets/group/2020-11-29-111330.020756-prescribing-Icon.jpg',


In [111]:
len(records["records"])

10063

In [None]:
from pydantic import AfterValidator, BeforeValidator


def validate_by_typed_root_model[T](value: Any):
    lambda x: RootModel[T].model_validate_json(x).root

StringType = TypeVar("StringType", bound=str)


class KeyValueModel(BaseModel, Generic[StringType, ModelType]):
    key: StringType = Field(frozen=True)
    value: Annotated[
        ModelType, 
        BeforeValidator(lambda x: RootModel[ModelType].model_validate_json(x).root)
    ]

    def __hash__(self):
        return hash(self.key)


class TableSchemaModelRecords(BaseModel):
    name: str
    title: str
    type: str
    description: str

class TableSchemaModel(BaseModel):
    fields: list[TableSchemaModelRecords]

class ResourceIdModel(BaseModel, extra="allow"):
    name: str
    id: str
    datastore_active: bool
    url: Url # can download the csv from here

TableSchemaExtra = KeyValueModel[Literal["tableschema"], TableSchemaModel]
GenericExtra = KeyValueModel[str, str]


class CkanMetaData(BaseModel, extra="allow"):
    name: str
    num_resources: int
    extras: list[
        TableSchemaExtra | GenericExtra
    ]  # simplest option is to use order of preference from left to right as a discriminator
    resources: list[ResourceIdModel]
            
    def tableschema_records(self) -> list[TableSchemaModelRecords] | None:
        for extra in self.extras:
            if isinstance(extra.value, TableSchemaModel):
                return extra.value.fields
            
    def table_names(self) -> dict[str, bool]:
        return {r.name: r.datastore_active for r in self.resources}

In [41]:
MetaDataResponse[CkanMetaData].model_validate(metadata_repsonse).result.table_names()

{'EPD_201401': False,
 'EPD_201402': False,
 'EPD_201403': False,
 'EPD_201404': False,
 'EPD_201405': False,
 'EPD_201406': False,
 'EPD_201407': False,
 'EPD_201408': False,
 'EPD_201409': False,
 'EPD_201410': False,
 'EPD_201411': False,
 'EPD_201412': False,
 'EPD_201501': False,
 'EPD_201502': False,
 'EPD_201503': False,
 'EPD_201504': False,
 'EPD_201505': False,
 'EPD_201506': False,
 'EPD_201507': False,
 'EPD_201508': False,
 'EPD_201509': False,
 'EPD_201510': False,
 'EPD_201511': False,
 'EPD_201512': False,
 'EPD_201601': False,
 'EPD_201602': False,
 'EPD_201603': False,
 'EPD_201604': False,
 'EPD_201605': False,
 'EPD_201606': False,
 'EPD_201607': False,
 'EPD_201608': False,
 'EPD_201609': False,
 'EPD_201610': False,
 'EPD_201611': False,
 'EPD_201612': False,
 'EPD_201701': False,
 'EPD_201702': False,
 'EPD_201703': False,
 'EPD_201704': False,
 'EPD_201705': False,
 'EPD_201706': False,
 'EPD_201707': False,
 'EPD_201708': False,
 'EPD_201709': False,
 'EPD_2017

In [38]:
MetaDataResponse[CkanMetaData].model_validate(metadata_repsonse).result.tableschema_records()

[TableSchemaModelRecords(name='YEAR_MONTH', title='Year and Month as YYYYMM', type='integer', description='Example: `201401`'),
 TableSchemaModelRecords(name='REGIONAL_OFFICE_NAME', title='Regional Office Name', type='string', description='The name given to a geographical region by NHS England. Each region supports local systems to provide more joined up and care for patients.'),
 TableSchemaModelRecords(name='REGIONAL_OFFICE_CODE', title='Regional Office Code', type='string', description='The unique code used to refer to a Regional Office.'),
 TableSchemaModelRecords(name='AREA_TEAM_NAME', title='Area Team Name', type='string', description='The name given to a geographical area by NHS England that is a smaller division of a Regional Office. (This column is only present in data from Jan 2014 to Mar 2020)'),
 TableSchemaModelRecords(name='AREA_TEAM_CODE', title='Area Team Code', type='string', description='The unique code used to refer to an Area Team. (This column is only present in da

In [None]:
class ResourceIdModel(BaseModel, extra="allow"):
    name: str
    id: str
    bytes: int
    url: Url # can download the csv from here

list(metadata_repsonse["result"]["resources"][0].items())[15:]

[('mimetype', None),
 ('mimetype_inner', None),
 ('name', 'EPD_201401'),
 ('package_id', '65050ec0-5abd-48ce-989d-defc08ed837e'),
 ('position', 0),
 ('resource_type', None),
 ('restricted', ''),
 ('schema',
  '{u\'fields\': [{u\'description\': u\'Example: `201401`\', u\'type\': u\'integer\', u\'name\': u\'YEAR_MONTH\', u\'title\': u\'Year and Month as YYYYMM\'}, {u\'description\': u\'The name given to a geographical region by NHS England. Each region supports local systems to provide more joined up and care for patients.\', u\'type\': u\'string\', u\'name\': u\'REGIONAL_OFFICE_NAME\', u\'title\': u\'Regional Office Name\'}, {u\'description\': u\'The unique code used to refer to a Regional Office.\', u\'type\': u\'string\', u\'name\': u\'REGIONAL_OFFICE_CODE\', u\'title\': u\'Regional Office Code\'}, {u\'description\': u\'The name given to a geographical area by NHS England that is a smaller division of a Region.  STP stands for Sustainability and Transformation Partnership\', u\'type\'

In [None]:
single_month_response_raw.url

'https://opendata.nhsbsa.net/api/3/action/datastore_search_sql?resource_id=EPD_202101&sql=SELECT%20year_month%2C%20practice_code%2C%20postcode%2C%20SUM%28items%29%20total_items%20FROM%20%60EPD_202101%60%20GROUP%20BY%20practice_code%2C%20postcode%2C%20year_month%20ORDER%20BY%20practice_code%2C%20postcode%2C%20year_month&limit=1000&start=0'

In [None]:
single_month_response_params.url

'https://opendata.nhsbsa.net/api/3/action/datastore_search_sql'

In [None]:
# class CkanErrorResponseBase(BaseModel):
#     help: str
#     success: bool
#     error: ...

# class CkanSuccessResponseBase(BaseModel):
#     help: str
#     success: bool
#     result: ...


single_month_response

{'help': 'https://opendata.nhsbsa.net/api/3/action/help_show?name=datastore_search_sql',
 'error': {'__type': 'Internal Server Error',
  'message': 'Internal Server Error'},
 'success': False}

In [None]:
from typing import Annotated

from sqlmodel import SQLModel, col, select, func, Field


class EpdPydanticBase(BaseModel):
    YEAR_MONTH: str = Field(
        description="Year and Month as YYYYMM"
    )
    REGIONAL_OFFICE_NAME: str = Field(
        description="The name given to a geographical region by NHS England. Each region supports local systems to provide more joined up and care for patients."
    )
    REGIONAL_OFFICE_CODE: str = Field(
        description="The unique code used to refer to a Regional Office."
    )
    STP_NAME: str = Field(
        description="The unique code used to refer to an STP. STP stands for Sustainability and Transformation Partnership.",
        alias="AREA_TEAM_NAME"
    )
    STP_CODE: str = Field(
        description="The unique code used to refer to an STP. STP stands for Sustainability and Transformation Partnership.",
        alias="AREA_TEAM_CODE"
    )
    PCO_NAME: str = Field(
        description="An NHS organisation that commissions or provides care services involving prescriptions that are dispensed in the community. "
        "\nFor example: a Clinical Commissioning Group (CCG), an NHS Trust."
    )
    PCO_CODE: str = Field(
        description="The unique code used to refer to a PCO. PCO stands for Primary Care Organisation"
    )
    PRACTICE_NAME: str = Field(
        description="The name of an organisation that employs one or more prescribers who issue prescriptions that may be dispensed in the community. For example:\na GP Practice, an Out-of-Hours service, a hospital department within an NHS Trust."
    )
    PRACTICE_CODE: str = Field(
        description="The unique code supplied for the practice"
    )
    ADDRESS_1: str = Field(
        description="The Address used by a Practice. This data is supplied by Primary Care Support England (PSCE)"
    )
    ADDRESS_2: str = Field(
        description="The Address used by a Practice. This data is supplied by Primary Care Support England (PSCE)"
    )
    ADDRESS_3: str = Field(
        description="The Address used by a Practice. This data is supplied by Primary Care Support England (PSCE)"
    )
    ADDRESS_4: str = Field(
        description="The Address used by a Practice. This data is supplied by Primary Care Support England (PSCE)"
    )
    POSTCODE: str = Field(
        description=""
    )
    BNF_CHEMICAL_SUBSTANCE: str = Field(
        description=(
            "A unique code 9 character code used to refer to a BNF Chemical Substance.\n\nFor example, `0501013B0`"
        )
    )
    CHEMICAL_SUBSTANCE_BNF_DESCR: str = Field(
        description="The name of the main active ingredient in a drug or the type of an appliance. \n\nDetermined by the British National Formulatory (BNF) for drugs, or the NHS BSA for appliances. For example, `Amoxicillin`"
    )
    BNF_CODE: str = Field(
        description="The unique code used to refer to a BNF Presentation. For example, `0501013B0AAABAB`"
    )
    BNF_DESCRIPTION: str = Field(
        description="The unique code used to refer to a BNF Presentation. For example, `0501013B0AAABAB`"
    )
    BNF_CHAPTER_PLUS_CODE: str = Field(
        description=("The name of the Chapter within the BNF, categorised by the system or area of the body on which it acts."
        " plus the two digits at the beginning of the BNF code which specify that BNF Chapter. ")
    )
    QUANTITY: float = Field(
        description=(
            "The quantity of a medicine, dressing or appliance for which an individual "
            "item was prescribed and dispensed, for each BNF Presentation. This represents a "
            "pseudo pack size, to illustrate the typical range of prescribed quantities of a given "
            "presentation")
    )
    ITEMS: int = Field(
        description=(
            "The total number of times that the medicine, dressing or appliance appeared on "
            "prescription forms that were prescribed or dispensed"
        )
    )
    TOTAL_QUANTITY: float = Field(
        description=(
            "the total quantity of a medicine, dressing or appliance prescribed "
            "and dispensed for each BNF Presentation. This is calculated by multiplying Quantity" 
            "by Items"
        )
    )
    ADQUSAGE: float = Field(
        description=(
            "Average Daily Quantity (ADQ) is the typical daily dose of a "
            "medication, prescribed to adult patients by GP Practices;")
    )
    NIC: float = Field(
        description=(
            " The ‘Net Ingredient Cost (NIC)’ of the items normally based on the price given in "
            "the Drug Tariff or published wholesale prices;")
    )
    ACTUAL_COST: float = Field(
        description=(
            " The ‘Actual Cost’ that accounts for the national average discount and some "
            "payments to dispensers;")
    )
    UNIDENTIFIED: bool = Field(
        description=(
            "This dataset includes some records where the prescription form cannot be linked to a "
            "practice or cost centre. This field is an indicator of prescriptions from unidentified practices"
            "These typically sccount for less than 0.1% of all prescribed items")
    )

    @classmethod
    def query(cls, year: int, month: int):
        tablename = f"`EPD_{year:0>4}{month:0>2}`" # table name must be quoted with backtiks
        group_cols = (
            "YEAR_MONTH",
            "PRACTICE_CODE",
            "POSTCODE",
        )

In [None]:
from sqlmodel import SQLModel, Field, func
from sqlalchemy.dialects import postgresql
"CHEMICAL_SUBSTANCE_BNF_DESCR, BNF_CHEMICAL_SUBSTANCE, POSTCODE, PRACTICE_CODE, SUM(ITEMS)"

class EpdSqlBase(EpdPydanticBase, SQLModel):
    __abstract__ = True
    GENPK: int | None = Field(primary_key=True, default=None)
    @classmethod
    def select_stmt(cls):
        stmt = (
            select(cls.CHEMICAL_SUBSTANCE_BNF_DESCR, cls.BNF_CHEMICAL_SUBSTANCE, cls.POSTCODE, cls.PRACTICE_CODE)
            .add_columns(func.sum(cls.ITEMS))
            .where(col(cls.UNIDENTIFIED)==False)
            .group_by(cls.CHEMICAL_SUBSTANCE_BNF_DESCR, cls.BNF_CHEMICAL_SUBSTANCE, cls.POSTCODE, cls.PRACTICE_CODE)
        )
        return stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
    
    @classmethod
    def CreateTable(cls, year: int, month: int) -> type["EpdSqlBase"]:
        tablename = f"EPD_{year:0>4}{month:0>2}"
        if tablename in EpdSqlBase.metadata.tables:
            EpdSqlBase.metadata.remove(EpdSqlBase.metadata.tables[tablename])
        return type(tablename, (cls, ), {'__tablename__': tablename}, table=True)


In [None]:
x = EpdSqlBase.CreateTable(2020,2)

  DeclarativeMeta.__init__(cls, classname, bases, dict_, **kw)


In [None]:
str(x.select_stmt()).replace('"EPD_202002"', '')

'SELECT ."CHEMICAL_SUBSTANCE_BNF_DESCR", ."BNF_CHEMICAL_SUBSTANCE", ."POSTCODE", ."PRACTICE_CODE", sum(."ITEMS") AS sum_1 \nFROM  \nWHERE ."UNIDENTIFIED" = false GROUP BY ."CHEMICAL_SUBSTANCE_BNF_DESCR", ."BNF_CHEMICAL_SUBSTANCE", ."POSTCODE", ."PRACTICE_CODE"'

In [None]:
AnonTable.__dict__

NameError: name 'AnonTable' is not defined

In [None]:
EpdSqlBase.CreateTable(2023, 1).select_stmt()

CompileError: Cannot compile Column object until its 'name' is assigned.

In [None]:
y=2020
m=1
f"EPD_{y:0>4}{m:0>2}"

'EPD_202001'

In [None]:

dataset_id = "english-prescribing-data-epd"
metadata_repsonse  = requests.get(f"{base_endpoint}" \
                                  f"{package_show_method}" \
                                  f"{dataset_id}").json()

In [None]:
metadata_repsonse["result"].keys()'resources'

dict_keys(['author', 'author_email', 'creator_user_id', 'id', 'isopen', 'license_id', 'license_title', 'maintainer', 'maintainer_email', 'metadata_created', 'metadata_modified', 'name', 'notes', 'num_resources', 'num_tags', 'organization', 'owner_org', 'private', 'state', 'title', 'type', 'upload_to_bigquery', 'url', 'version', 'extras', 'resources', 'tags', 'groups', 'relationships_as_subject', 'relationships_as_object'])

In [None]:
eval(metadata_repsonse["result"]['resources'][0]['schema'])["fields"]

[{'description': 'Example: `201401`',
  'type': 'integer',
  'name': 'YEAR_MONTH',
  'title': 'Year and Month as YYYYMM'},
 {'description': 'The name given to a geographical region by NHS England. Each region supports local systems to provide more joined up and care for patients.',
  'type': 'string',
  'name': 'REGIONAL_OFFICE_NAME',
  'title': 'Regional Office Name'},
 {'description': 'The unique code used to refer to a Regional Office.',
  'type': 'string',
  'name': 'REGIONAL_OFFICE_CODE',
  'title': 'Regional Office Code'},
 {'description': 'The name given to a geographical area by NHS England that is a smaller division of a Region.  STP stands for Sustainability and Transformation Partnership',
  'type': 'string',
  'name': 'STP_NAME',
  'title': 'Area Team Name'},
 {'description': 'The unique code used to refer to an STP. STP stands for Sustainability and Transformation Partnership.',
  'type': 'string',
  'name': 'STP_CODE',
  'title': 'Area Team Code'},
 {'description': 'An NH