# DfE Public API Test

https://dfe-analytical-services.github.io/explore-education-statistics-api-docs/getting-started/creating-advanced-data-set-queries

This code was written in Microsoft Fabric using a Spark cluster. Within Fabric there is a default Spark session called `spark`.

If running outside of Fabric, you may have to create a Spark session yourself and pass it into the class.

To do this on a local machine you would need to first install pyspark using:

``` 
pip install pyspark
```
Then:

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("Local Spark App Without Cluster")
    .master("local[*]")  # Use all available CPU cores for parallelism
    .getOrCreate())  
``` 

You'd also have to replace `display(df)` with `df.show()`, and write Delta tables to a path rather than saving them to a table name.

## Processing Functions


In [1]:
import requests
import json
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ArrayType, MapType
from pyspark.sql.functions import explode, col, map_keys, map_values
from urllib.parse import urlencode, urlparse, parse_qs, urlunparse

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 3, Finished, Available, Finished)

In [2]:
class DfE_API:
    """Client for a single data set of the DfE explore-education-statistics API.

    On construction the data set summary and meta data are downloaded, and the
    meta data is flattened into Spark DataFrames (``indicators_df``,
    ``geographic_levels_df``, ``locations_df``, ``time_periods_df`` and
    ``filters_df``).  ``post_data_set_query`` then runs a query, pages through
    every result and stores the output in ``query_response_json``,
    ``raw_query_response_df`` and ``query_response_df``.
    """

    def __init__(self, data_set_id, spark_session=None, api_version="1.0", request_timeout=60):
        """Download the summary and meta data for ``data_set_id``.

        Args:
            data_set_id: Identifier of the data set, used to build the endpoint URL.
            spark_session: Spark session used to build every DataFrame.  When
                omitted, the existing session is reused (or one is created)
                via ``SparkSession.builder.getOrCreate()``.
            api_version: API version placed in the endpoint URL.
            request_timeout: Timeout in seconds applied to every HTTP request.

        Raises:
            TypeError: If ``spark_session`` has no ``createDataFrame`` method,
                e.g. because the API version was passed positionally.
            requests.HTTPError: If the summary or meta data request fails.
        """
        if spark_session is None:
            # Resolved at call time so that defining the class does not
            # require a global named ``spark`` to exist.
            from pyspark.sql import SparkSession
            spark_session = SparkSession.builder.getOrCreate()
        elif not hasattr(spark_session, "createDataFrame"):
            # Fail before any network call with a message that names the mistake.
            raise TypeError(
                "spark_session must be a SparkSession, got "
                f"{type(spark_session).__name__!r}; pass the API version with "
                "the api_version keyword."
            )

        self.spark = spark_session
        self._data_set_id = data_set_id
        self._api_version = api_version
        self._request_timeout = request_timeout
        self.base_endpoint_url = f"https://test.statistics.api.education.gov.uk/api/v{api_version}/data-sets/{data_set_id}"

        # Get summary and meta data for data set.
        self.data_set_summary_json = self._get_data_set_summary_json()
        self.data_set_meta_json = self._get_data_set_meta_json()

        # Populate dataframes from meta data.
        self.indicators_df = self._process_indicators_from_meta()
        self.geographic_levels_df = self._process_geographic_levels_from_meta()
        self.locations_df = self._process_locations_from_meta()
        self.time_periods_df = self._process_time_periods_from_meta()
        self.filters_df = self._process_filters_from_meta()

        # Placeholder instance variables, filled in by post_data_set_query.
        self.query_response_json = None
        self.raw_query_response_df = None
        self.query_response_df = None

    # Core API calls:

    @staticmethod
    def _page_request_body(query_body, page):
        """Return a copy of ``query_body`` targeting ``page``.

        The first page is sent exactly as supplied; later pages get a ``page``
        key.  A shallow copy is always returned so the caller's dictionary is
        never modified.
        """
        page_body = dict(query_body)
        if page > 1:
            page_body['page'] = page
        return page_body

    def _fetch_all_results(self, base_url, query_body):
        """POST ``query_body`` to ``base_url`` page by page and collect all results.

        Args:
            base_url: Full URL of the query endpoint.
            query_body: Query dictionary sent as the JSON body.  It is not modified.

        Returns:
            A list holding the ``results`` entries of every page, in page order.

        Raises:
            requests.HTTPError: If any page request returns an error status.
        """
        combined_results = []
        current_page = 1

        # One session for the whole loop so the connection is reused across pages.
        with requests.Session() as session:
            while True:
                page_body = self._page_request_body(query_body, current_page)

                response = session.post(base_url, json=page_body, timeout=self._request_timeout)
                response.raise_for_status()  # Check for HTTP request errors

                data = response.json()
                max_pages = data['paging']['totalPages']

                print(f"Retrived page {current_page} of {max_pages}.")
                combined_results.extend(data['results'])

                # Check if we reached the last page
                if current_page >= max_pages:
                    break

                current_page += 1  # Move to the next page

        return combined_results

    def _get_data_set_summary_json(self):
        """Fetch the data set summary and return the decoded JSON.

        Raises:
            requests.HTTPError: If the request returns an error status.
        """
        response = requests.get(self.base_endpoint_url, timeout=self._request_timeout)
        # Without this an error payload would be stored as if it were the summary.
        response.raise_for_status()
        return response.json()

    def _get_data_set_meta_json(self):
        """Fetch the data set meta data and return the decoded JSON.

        Raises:
            requests.HTTPError: If the request returns an error status.
        """
        url = f"{self.base_endpoint_url}/meta"
        response = requests.get(url, timeout=self._request_timeout)
        # Without this an error payload would only surface later as a KeyError.
        response.raise_for_status()
        return response.json()

    def post_data_set_query(self, query_body, data_set_version = None):
        """Run a query against the data set and store the processed results.

        The query is sent with POST in the request body; at least one
        indicator must be specified.  Results are stored on the instance in
        ``query_response_json``, ``raw_query_response_df`` and
        ``query_response_df``.

        Args:
            query_body: Query dictionary sent as the JSON body.
            data_set_version: Optional value for the ``dataSetVersion`` query
                string parameter.

        Returns:
            The string ``"Query Suceeded"``.
        """
        url = f"{self.base_endpoint_url}/query"
        if data_set_version is not None:
            url = f"{url}?dataSetVersion={data_set_version}"

        results = self._fetch_all_results(url, query_body)
        self.query_response_json = results
        self._process_query_response()
        return "Query Suceeded"

    def _process_indicators_from_meta(self):
        """Build a DataFrame from the ``indicators`` list of the meta data."""
        indicators_schema = StructType([
            StructField("id", StringType(), True),
            StructField("column", StringType(), True),
            StructField("label", StringType(), True),
            StructField("decimalPlaces", IntegerType(), True)
        ])

        indicators_df = self.spark.createDataFrame(self.data_set_meta_json["indicators"], schema=indicators_schema)
        return indicators_df

    def _process_geographic_levels_from_meta(self):
        """Build a DataFrame from the ``geographicLevels`` list of the meta data."""
        geographic_levels_schema = StructType([
            StructField("code", StringType(), True),
            StructField("label", StringType(), True)
        ])

        geographic_levels_df = self.spark.createDataFrame(self.data_set_meta_json["geographicLevels"], schema=geographic_levels_schema)
        return geographic_levels_df

    def _process_locations_from_meta(self):
        """Build a DataFrame with one row per location option.

        Returns:
            A DataFrame with columns ``option_id``, ``option_label``,
            ``option_code``, ``level_code`` and ``level_label``.
        """
        locations_schema = StructType([
            StructField("level", StructType([
                StructField("code", StringType(), True),
                StructField("label", StringType(), True)
            ]), True),
            StructField("options", ArrayType(
                StructType([
                    StructField("id", StringType(), True),
                    StructField("label", StringType(), True),
                    StructField("code", StringType(), True)
                ])
            ), True)
        ])

        locations_df = self.spark.createDataFrame(self.data_set_meta_json["locations"], schema=locations_schema)

        # Flatten the options array in locations
        flattened_locations_df = (locations_df
            .withColumn("option", explode("options"))
            .withColumn("option_id", col("option.id"))
            .withColumn("option_label", col("option.label"))
            .withColumn("option_code", col("option.code"))
            .withColumn("level_code", col("level.code"))
            .withColumn("level_label", col("level.label"))
            .drop("options", "option", "level")
            )

        return flattened_locations_df

    def _process_time_periods_from_meta(self):
        """Build a DataFrame from the ``timePeriods`` list of the meta data."""
        time_periods_schema = StructType([
            StructField("code", StringType(), True),
            StructField("period", StringType(), True),
            StructField("label", StringType(), True)
        ])

        time_periods_df = self.spark.createDataFrame(self.data_set_meta_json["timePeriods"], schema=time_periods_schema)
        return time_periods_df

    def _process_filters_from_meta(self):
        """Build a DataFrame with one row per filter option.

        Returns:
            A DataFrame with the filter's ``id``, ``column`` and ``label``
            plus ``option_id``, ``option_label`` and ``is_aggregate``.
        """
        filters_schema = StructType([
            StructField("id", StringType(), True),
            StructField("column", StringType(), True),
            StructField("label", StringType(), True),
            StructField("options", ArrayType(
                StructType([
                    StructField("id", StringType(), True),
                    StructField("label", StringType(), True),
                    StructField("isAggregate", BooleanType(), True)
                ])
            ), True)
        ])
        filters_df = self.spark.createDataFrame(self.data_set_meta_json["filters"], schema=filters_schema)

        # Flatten the options array
        flattened_filters_df = (filters_df
                                    .withColumn("option", explode("options"))
                                    .withColumn("option_id", col("option.id"))
                                    .withColumn("option_label", col("option.label"))
                                    .withColumn("is_aggregate", col("option.isAggregate"))
                                    .drop("options", "option")
            )

        return flattened_filters_df

    def _process_query_response(self):
        """Turn ``query_response_json`` into the raw and labelled result DataFrames.

        Sets ``raw_query_response_df`` (one row per result and indicator, ids
        only) and ``query_response_df`` (the same rows left-joined onto the
        meta data DataFrames to pick up labels).
        """
        # Define schema for results
        results_schema = StructType([
            StructField("timePeriod", StructType([
                StructField("code", StringType(), True),
                StructField("period", StringType(), True)
            ])),
            StructField("geographicLevel", StringType(), True),
            StructField("locations", MapType(StringType(), StringType()), True),
            StructField("filters", MapType(StringType(), StringType()), True),
            StructField("values", MapType(StringType(), StringType()), True)
        ])

        # Create DataFrame for results with the session given to this
        # instance rather than whatever global ``spark`` happens to exist.
        results_df = self.spark.createDataFrame(self.query_response_json, schema=results_schema)

        # Expand and alias columns
        # This assumes there is only one filter pair and location pair per row:
        # only the first entry of each map is kept, any further pairs are dropped.
        # Explodes the values out for a row per indicator.
        self.raw_query_response_df = results_df.select(
                col("timePeriod.code").alias("period_code"),
                col("timePeriod.period").alias("period"),
                col("geographicLevel").alias("geographic_level"),
                map_keys(col("locations")).getItem(0).alias("location_level_code"),
                map_values(col("locations")).getItem(0).alias("location_option_id"),
                map_keys(col("filters")).getItem(0).alias("filter_id"),
                map_values(col("filters")).getItem(0).alias("filter_option_id"),
                explode("values").alias("indicator_id", "value")
            )

        # Join onto meta data dataframes to produce final result.
        # Left joins keep result rows whose ids have no match in the meta data.
        self.query_response_df = (
            self.raw_query_response_df
                .join(
                    self.indicators_df,
                    (self.raw_query_response_df["indicator_id"] == self.indicators_df["id"]),
                    how="left"
                )
                .join(
                    self.locations_df,
                    (self.raw_query_response_df["location_level_code"] == self.locations_df["level_code"]) &
                    (self.raw_query_response_df["location_option_id"] == self.locations_df["option_id"]),
                    how="left"
                )
                .join(
                    self.filters_df,
                    (self.raw_query_response_df["filter_id"] == self.filters_df["id"]) &
                    (self.raw_query_response_df["filter_option_id"] == self.filters_df["option_id"]),
                    how="left"
                )
                .join(
                    self.time_periods_df,
                    (self.raw_query_response_df["period_code"] == self.time_periods_df["code"]) &
                    (self.raw_query_response_df["period"] == self.time_periods_df["period"]),
                    how="left"
                )
                .join(
                    self.geographic_levels_df,
                    (self.raw_query_response_df["geographic_level"] == self.geographic_levels_df["code"]),
                    how="left"
                )
                .select(
                    # NOTE: "indictor_label" is misspelt but kept as-is because
                    # it is the column name written to the saved tables.
                    self.indicators_df['label'].alias("indictor_label"),
                    self.locations_df["level_label"].alias("location_level_label"),
                    self.locations_df["option_label"].alias("location_option_label"),
                    self.time_periods_df["label"].alias("period_label"),
                    self.geographic_levels_df["label"].alias("geographic_level_label"),
                    self.filters_df["label"].alias("filter_label"),
                    self.filters_df["option_label"].alias("filter_option_label"),
                    self.filters_df["is_aggregate"],
                    self.raw_query_response_df['value']
                )
            )

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 4, Finished, Available, Finished)

## Test data set A: Pupil absence

In [3]:
data_set_id = "e1ae9201-2fff-d376-8fa3-bd3c3660d4c8"

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 5, Finished, Available, Finished)

In [4]:
SetA = DfE_API(data_set_id, spark_session = spark)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 6, Finished, Available, Finished)

### Dataset Summary

In [5]:
print(json.dumps(SetA.data_set_summary_json, indent=4))

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 7, Finished, Available, Finished)

{
    "id": "e1ae9201-2fff-d376-8fa3-bd3c3660d4c8",
    "title": "Pupil absence",
    "summary": "Test file",
    "status": "Published",
    "latestVersion": {
        "version": "1.0",
        "published": "2024-10-21T12:15:26.171566+00:00",
        "totalResults": 12,
        "file": {
            "id": "5296244d-5487-4416-bec0-43ed4b50b72a"
        },
        "timePeriods": {
            "start": "2021/22",
            "end": "2023/24"
        },
        "geographicLevels": [
            "National"
        ],
        "filters": [
            "Type of school"
        ],
        "indicators": [
            "Number of overall absence sessions",
            "Number of pupil enrolments",
            "Number of sessions possible"
        ]
    }
}


### Dataset Meta Data

In [6]:
display(SetA.filters_df)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e29d4f88-05a5-4a72-8ba8-5f6833a16a28)

In [7]:
display(SetA.indicators_df)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f5829cc6-423d-4a49-ad1f-e466bab8371d)

In [8]:
display(SetA.locations_df)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f04b5d18-8f52-4084-9ed1-20e31d6b4a0a)

In [9]:
display(SetA.geographic_levels_df)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4a45dfc9-1c66-442a-9720-1793879300d4)

In [10]:
display(SetA.time_periods_df)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 128436d0-3176-4a84-be96-29eb12c24760)

### Example Dataset POST Query

In [11]:
# Example query body for Set A: the three listed indicators, restricted by
# time period and by location.
data = {
    "criteria": {
        "and": [
            {
                "timePeriods": {
                    "in": [
                        # NOTE(review): the two entries below are identical, so the
                        # second adds nothing; presumably a different period was
                        # intended - TODO confirm.
                        {"period": "2023/2024","code": "AY"},
                        {"period": "2023/2024","code": "AY"}
                    ]
                }
            },
            {
                "locations": {
                    "eq": {"level": "NAT","code": "E92000001"}
                }
            }
        ]
    },
    "indicators": ["dPe0Z", "OBXCL", "7YFXo"]
}

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 13, Finished, Available, Finished)

In [12]:
SetA.post_data_set_query(data)
#print(json.dumps(response_json, indent=4))

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 14, Finished, Available, Finished)

Retrived page 1 of 1.


'Query Suceeded'

In [13]:
display(SetA.query_response_df)

StatementMeta(, 9269343a-a3a3-45cf-ba3d-7121fe405269, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0dd2de97-877b-4ecc-a1e2-12ec2c740aff)

In [22]:
SetA.query_response_df.write.format("delta").mode("overwrite").saveAsTable("LH_DFE.raw.set_a")

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 24, Finished, Available, Finished)

## Test data set B: Pupil Attendance Sessions

In [14]:
version = "1.0"
data_set_id = "57b69201-033a-2c77-a19f-abcce2b11341"

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 16, Finished, Available, Finished)

In [15]:
# The second positional parameter of DfE_API is spark_session, so the API
# version has to be passed by keyword.
SetB = DfE_API(data_set_id, api_version=version)

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 17, Finished, Available, Finished)

### Data Set Summary

In [16]:
print(json.dumps(SetB.data_set_summary_json, indent=4))

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 18, Finished, Available, Finished)

{
    "id": "57b69201-033a-2c77-a19f-abcce2b11341",
    "title": "Pupil attendance sessions",
    "summary": "",
    "status": "Published",
    "latestVersion": {
        "version": "1.0",
        "published": "2024-10-22T22:34:44.356583+00:00",
        "totalResults": 3315543,
        "file": {
            "id": "cc6745d0-d4bb-4871-bf26-a9ed653a66f1"
        },
        "timePeriods": {
            "start": "2023 Week 37",
            "end": "2024 Week 25"
        },
        "geographicLevels": [
            "Local authority",
            "National",
            "Regional"
        ],
        "filters": [
            "Attendance status",
            "Attendance type",
            "Day number",
            "Establishment phase",
            "Reason"
        ],
        "indicators": [
            "Session count",
            "Session percent",
            "Session scaled"
        ]
    }
}


### Dataset Meta Data

In [17]:
print(json.dumps(SetB.data_set_meta_json, indent=4))

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 19, Finished, Available, Finished)

{
    "filters": [
        {
            "id": "5Zdi9",
            "column": "attendance_status",
            "label": "Attendance status",
            "hint": "",
            "options": [
                {
                    "id": "qGJjG",
                    "label": "Absence"
                },
                {
                    "id": "a2NLP",
                    "label": "Administrative code"
                },
                {
                    "id": "BfP7J",
                    "label": "Attendance"
                },
                {
                    "id": "zvUFQ",
                    "label": "Late sessions"
                },
                {
                    "id": "kaNhs",
                    "label": "Not a possible session"
                },
                {
                    "id": "u9Mo4",
                    "label": "Possible sessions"
                }
            ]
        },
        {
            "id": "tdEm5",
            "column": "attendance_typ

### Example Query

In [18]:
data = {
    "indicators": ["tj0Em", "fzaYF", "jgoAM"]
}

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 20, Finished, Available, Finished)

In [19]:
SetB.post_data_set_query(data)

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 21, Finished, Available, Finished)

Retrived page 1 of 3316.
Retrived page 2 of 3316.
Retrived page 3 of 3316.
Retrived page 4 of 3316.
Retrived page 5 of 3316.
Retrived page 6 of 3316.
Retrived page 7 of 3316.
Retrived page 8 of 3316.
Retrived page 9 of 3316.
Retrived page 10 of 3316.
Retrived page 11 of 3316.
Retrived page 12 of 3316.
Retrived page 13 of 3316.
Retrived page 14 of 3316.
Retrived page 15 of 3316.
Retrived page 16 of 3316.
Retrived page 17 of 3316.
Retrived page 18 of 3316.
Retrived page 19 of 3316.
Retrived page 20 of 3316.
Retrived page 21 of 3316.
Retrived page 22 of 3316.
Retrived page 23 of 3316.
Retrived page 24 of 3316.
Retrived page 25 of 3316.
Retrived page 26 of 3316.
Retrived page 27 of 3316.
Retrived page 28 of 3316.
Retrived page 29 of 3316.
Retrived page 30 of 3316.
Retrived page 31 of 3316.
Retrived page 32 of 3316.
Retrived page 33 of 3316.
Retrived page 34 of 3316.
Retrived page 35 of 3316.
Retrived page 36 of 3316.
Retrived page 37 of 3316.
Retrived page 38 of 3316.
Retrived page 39 of 3

'Query Suceeded'

In [21]:
SetB.query_response_df.write.format("delta").mode("overwrite").saveAsTable("LH_DFE.raw.set_b")

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 23, Finished, Available, Finished)

## Test data set C: A Level and equivalent results

In [23]:
version = "1.0"
data_set_id = "3bb59201-0d1e-1e74-a745-8f65fe5895cf"

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 25, Finished, Available, Finished)

In [24]:
# The second positional parameter of DfE_API is spark_session, so the API
# version has to be passed by keyword.
SetC = DfE_API(data_set_id, api_version=version)

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 26, Finished, Available, Finished)

In [28]:
print(json.dumps(SetC.data_set_meta_json["indicators"], indent=4))

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 30, Finished, Available, Finished)

[
    {
        "id": "zvUFQ",
        "column": "retention_pupil_count",
        "label": "Number of students retained",
        "decimalPlaces": 0
    },
    {
        "id": "kaNhs",
        "column": "retention_2ndyr_pupil_count",
        "label": "Number of students retained (2nd year)",
        "decimalPlaces": 0
    },
    {
        "id": "u9Mo4",
        "column": "points_per_entry",
        "label": "Points per entry",
        "decimalPlaces": 2
    },
    {
        "id": "XH4fK",
        "column": "points_per_entry_grade",
        "label": "Points per entry (grade)",
        "decimalPlaces": 0
    },
    {
        "id": "TuxPJ",
        "column": "retention_perc",
        "label": "Retained",
        "unit": "%",
        "decimalPlaces": 2
    },
    {
        "id": "cZO31",
        "column": "pupil_count",
        "label": "Total students",
        "decimalPlaces": 0
    },
    {
        "id": "5Tsdi",
        "column": "pupil_trigger_count",
        "label": "Total triggered

### Example Query

In [29]:
data = {
    "indicators": ["zvUFQ", "cZO31", "5Tsdi","kaNhs","u9Mo4","XH4fK", "TuxPJ"]
}

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 31, Finished, Available, Finished)

In [30]:
SetC.post_data_set_query(data)

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 32, Finished, Available, Finished)

Retrived page 1 of 23.
Retrived page 2 of 23.
Retrived page 3 of 23.
Retrived page 4 of 23.
Retrived page 5 of 23.
Retrived page 6 of 23.
Retrived page 7 of 23.
Retrived page 8 of 23.
Retrived page 9 of 23.
Retrived page 10 of 23.
Retrived page 11 of 23.
Retrived page 12 of 23.
Retrived page 13 of 23.
Retrived page 14 of 23.
Retrived page 15 of 23.
Retrived page 16 of 23.
Retrived page 17 of 23.
Retrived page 18 of 23.
Retrived page 19 of 23.
Retrived page 20 of 23.
Retrived page 21 of 23.
Retrived page 22 of 23.
Retrived page 23 of 23.


'Query Suceeded'

In [31]:
display(SetC.query_response_df)

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 33, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 12f3d0a4-87be-4395-9c62-89f2dc615d85)

In [32]:
SetC.query_response_df.write.format("delta").mode("overwrite").saveAsTable("LH_DFE.raw.set_c")

StatementMeta(, 0629b5ab-3a61-49fd-a407-1d0b1160af0c, 34, Finished, Available, Finished)