This report provides specification-compliance information on the latest endpoints for either a default (hardcoded) list of prioritised LPAs or a list of organisations supplied in an input file.

The column 'structure_score' tells us how much data an endpoint is giving us as a fraction of what we ask for. The column 'column_name_score' tells us how many columns are correctly named.

Example: a column whose name is incorrect (e.g. 'area' instead of 'geometry') but whose contents have been detected as correct data will count towards the 'structure_score' column but not towards the 'column_name_score' column.

The input file should be called 'organisation_input.csv' and contain a single column, 'organisation', holding the organisation codes of the LPAs to be included in the report.

In [None]:
# %pip install wget
import wget
import pandas as pd
import os
import numpy as np
import urllib


Download helper utility files from GitHub:

In [None]:
# Make sure the shared helper module sits next to this notebook (downloading it
# from the jupyter-analysis repository when it is missing), then pull its
# helpers into the notebook namespace.
util_file = "master_report_endpoint_utils.py"
if not os.path.isfile(util_file):
    url = "https://raw.githubusercontent.com/digital-land/jupyter-analysis/main/service_report/master_report/master_report_endpoint_utils.py"
    wget.download(url)
from master_report_endpoint_utils import *

The default prioritised LPAs are used unless a specific set of LPAs is detected using an 'organisation_input.csv' file in the same directory as this notebook.

In [None]:
# Build the list of organisation codes the report covers.
# An 'organisation_input.csv' next to the notebook takes precedence; otherwise
# the organisations returned by get_provisions() are used.
# Either way `organisation_list` ends up as a plain Python list.
input_path = './organisation_input.csv'
if os.path.isfile(input_path):
    input_df = pd.read_csv(input_path)
    organisation_list = input_df['organisation'].tolist()
    print('Input file found. Using', len(organisation_list), 'organisations from input file.')
else:
    provision_df = get_provisions()
    # Insert "-eng" in front of the colon in each code (literal replacement,
    # not a regex) -- presumably so the codes match the "-eng" form of the
    # organisation values they are later compared against; confirm with
    # get_provisions().
    organisation_list = (
        provision_df["organisation"]
        .str.replace(":", "-eng:", regex=False)
        .tolist()
    )
    print('Input file not found. Using default list of organisations.')

In [None]:
def get_endpoint_resource_data():
    """Download endpoint, log and resource details for the report's pipelines.

    Sends one SQL query to the `digital-land` database on the planning data
    datasette. The query starts from `most_recent_log` and joins `source`,
    `endpoint`, `organisation`, `source_pipeline` and (left join) `resource`,
    restricted to the pipelines listed in the `where` clause.

    Returns:
        DataFrame parsed from the CSV response, with the columns named in the
        `select` list.
    """
    query = """
        select
            e.endpoint_url,
            l.endpoint,
            l.status,
            l.exception,
            s.collection,
            l.resource,
            sp.pipeline,
            s.organisation,
            o.name,
            l.entry_date as log_entry_date,
            e.entry_date as endpoint_entry_date,
            e.end_date as endpoint_end_date,
            r.start_date as resource_start_date,
            r.end_date as resource_end_date
        from
            most_recent_log l
            inner join source s on l.endpoint = s.endpoint
            inner join endpoint e on l.endpoint = e.endpoint
            inner join organisation o on o.organisation = replace(s.organisation, '-eng', '')
            inner join source_pipeline sp on s.source = sp.source
            left join resource r on l.resource = r.resource
        where
            sp.pipeline IN ('article-4-direction', 'article-4-direction-area', 'conservation-area', 'conservation-area-document', 'listed-building-outline', 'tree-preservation-order', 'tree-preservation-zone', 'tree')

        order by s.organisation, sp.pipeline, log_entry_date desc
        """
    query_string = urllib.parse.urlencode({"sql": query, "_size": "max"})

    base_url = "https://datasette.planning.data.gov.uk/"
    return pd.read_csv(f"{base_url}digital-land.csv?{query_string}")

def get_fields_for_resource(resource, dataset):
    """Return the fields that have facts recorded against a resource.

    Queries the `fact_resource` and `fact` tables of the datasette database
    named after `dataset`.

    Args:
        resource: resource identifier to filter `fact_resource` on.
        dataset: name of the datasette database; used as `<dataset>.csv` in
            the request URL.

    Returns:
        DataFrame parsed from the CSV response with columns `field` and
        `resource`, grouped so each field appears once.
    """
    datasette_url = "https://datasette.planning.data.gov.uk/"
    params = urllib.parse.urlencode({
        # `:resource` is a named SQL parameter; datasette fills it from the
        # `resource` query-string argument below, so the value is never
        # spliced into the SQL text (no quoting problems, no injection).
        "sql": """
        select f.field, fr.resource
        from
            fact_resource fr
            inner join fact f on fr.fact = f.fact
        where
            resource = :resource
        group by
            f.field
        """,
        "resource": resource,
        "_size": "max"
    })
    url = f"{datasette_url}{dataset}.csv?{params}"
    facts_df = pd.read_csv(url)
    return facts_df

def get_column_mappings_for_resource(resource, dataset):
    """Return the column-to-field mappings recorded for a resource.

    Queries the `column_field` table of the datasette database named after
    `dataset`.

    Args:
        resource: resource identifier to filter `column_field` on.
        dataset: name of the datasette database; used as `<dataset>.csv` in
            the request URL.

    Returns:
        DataFrame parsed from the CSV response with columns `column` and
        `field`.
    """
    datasette_url = "https://datasette.planning.data.gov.uk/"
    params = urllib.parse.urlencode({
        # `:resource` is a named SQL parameter; datasette fills it from the
        # `resource` query-string argument below, so the value is never
        # spliced into the SQL text (no quoting problems, no injection).
        "sql": """
        select column, field
        from
          column_field
        where
            resource = :resource
        """,
        "resource": resource,
        "_size": "max"
    })
    url = f"{datasette_url}{dataset}.csv?{params}"
    column_field_df = pd.read_csv(url)
    return column_field_df



## Get endpoint data

In [None]:
# Pull the endpoint / log / resource table from datasette.
endpoint_resource_df = get_endpoint_resource_data()

# Keep only rows for the organisations being reported on, with a log status of
# 200, and with no end date on either the endpoint or the resource.
in_scope = endpoint_resource_df["organisation"].isin(organisation_list)
status_ok = endpoint_resource_df["status"] == 200
endpoint_open = endpoint_resource_df["endpoint_end_date"].isnull()
resource_open = endpoint_resource_df["resource_end_date"].isnull()
endpoint_resource_filtered_df = endpoint_resource_df[
    in_scope & status_ok & endpoint_open & resource_open
].copy()

# Row counts before and after filtering.
print(len(endpoint_resource_df))
print(len(endpoint_resource_filtered_df))

# Distinct (endpoint, pipeline) pairs, then distinct resources, then distinct endpoints.
for key_columns in (["endpoint", "pipeline"], ["resource"], ["endpoint"]):
    print(len(endpoint_resource_filtered_df[key_columns].drop_duplicates()))

## Get field and col mapping data

In [None]:
# One row per distinct (pipeline, resource) pair; pairs with a missing value
# in either column are dropped.
resource_df = (
    endpoint_resource_filtered_df[["pipeline", "resource"]]
    .drop_duplicates()
    .dropna(axis=0)
)
print(len(resource_df))

In [None]:
def try_results(function, resource, dataset):
    """Run a per-resource query without letting one failure stop the batch.

    Args:
        function: callable taking ``(resource, dataset)`` and returning a
            DataFrame of query results.
        resource: resource identifier passed through to ``function``.
        dataset: dataset name passed through to ``function``.

    Returns:
        DataFrame carrying ``resource`` and ``dataset`` key columns.
        On success the query's own columns are kept alongside the keys; an
        empty result becomes a single row with ``field`` set to NaN; if the
        query raises, a single row holding only the two keys is returned.
    """
    try:
        df = function(resource, dataset)

        # An empty response still gets one placeholder row so the resource
        # is represented in the combined results.
        if len(df) == 0:
            df = pd.DataFrame({"field": [np.nan]})

        df["resource"] = resource
        df["dataset"] = dataset

    # Catch Exception rather than using a bare `except:` so that
    # KeyboardInterrupt / SystemExit still stop a long-running batch.
    except Exception:
        df = pd.DataFrame({"resource": [resource], "dataset": [dataset]})

    return df


# Run both per-resource queries for every (resource, pipeline) pair; each list
# ends up with one DataFrame per row of resource_df.
resource_pipeline_pairs = list(zip(resource_df["resource"], resource_df["pipeline"]))

results_col_map = [
    try_results(get_column_mappings_for_resource, resource_id, pipeline_name)
    for resource_id, pipeline_name in resource_pipeline_pairs
]
results_field_resource = [
    try_results(get_fields_for_resource, resource_id, pipeline_name)
    for resource_id, pipeline_name in resource_pipeline_pairs
]

# Stack the per-resource frames; resources whose query errored will have NaNs
# in the query result columns.
results_col_map_df = pd.concat(results_col_map)
results_field_resource_df = pd.concat(results_field_resource)

# Number of per-resource frames returned by each query.
print(len(results_col_map))
print(len(results_field_resource))

# Number of rows in each stacked results table.
print(len(results_col_map_df))
print(len(results_field_resource_df))


In [None]:
# A mapping row counts as a name match when the supplied column name equals
# the field name; rows for the "geometry" and "point" fields always count.
is_geometry_or_point = results_col_map_df["field"].isin(["geometry", "point"])
name_is_identical = results_col_map_df["field"] == results_col_map_df["column"]
results_col_map_df["field_matched"] = np.where(
    is_geometry_or_point | name_is_identical, 1, 0
)

# Every row of the column mapping results is flagged as a supplied field.
results_col_map_df["field_supplied"] = 1

# Every row of the field results is flagged as a loaded field.
results_field_resource_df["field_loaded"] = 1

## Calculating match rates

In [None]:
# Specification table listing the fields belonging to each dataset.
dataset_field_df = pd.read_csv('https://raw.githubusercontent.com/digital-land/specification/main/specification/dataset-field.csv')

# Drop the pipeline-created fields from the spec table:
# "entity", "organisation" and "prefix" for every dataset, plus "point" for
# every dataset except "tree".
always_excluded = dataset_field_df["field"].isin(["entity", "organisation", "prefix"])
point_outside_tree = (
    (dataset_field_df["field"] == "point") & (dataset_field_df["dataset"] != "tree")
)
dataset_field_subset_df = dataset_field_df[~(always_excluded | point_outside_tree)]

dataset_field_subset_df.head()

In [None]:
# The endpoint/resource table calls the dataset "pipeline"; rename it so it
# lines up with the specification table's "dataset" column.
endpoint_resource_filtered_df = endpoint_resource_filtered_df.rename(
    columns={"pipeline": "dataset"}
)

# Pair every endpoint/resource row with each field its dataset should have.
# This is an inner join (the merge default), so rows whose dataset has no
# fields in the spec subset do not appear in the result.
endpoint_columns = [
    "organisation", "name", "dataset", "endpoint", "status",
    "log_entry_date", "endpoint_entry_date", "resource",
]
resource_spec_fields_df = pd.merge(
    endpoint_resource_filtered_df[endpoint_columns],
    dataset_field_subset_df[["dataset", "field"]],
    on="dataset",
)

print(len(resource_spec_fields_df))
resource_spec_fields_df.head()

In [None]:
# Attach the "field_loaded" flag to each expected (dataset, resource, field)
# row; expected fields with no matching row in the field results get NaN.
resource_fields_match = pd.merge(
    resource_spec_fields_df,
    results_field_resource_df[["dataset", "resource", "field", "field_loaded"]],
    on=["dataset", "resource", "field"],
    how="left",
)

print(len(resource_fields_match))
resource_fields_match.head()

In [None]:
# Attach the column-mapping flags ("field_supplied", "field_matched") to each
# expected (dataset, resource, field) row; fields with no mapping row get NaN.
resource_fields_map_match = pd.merge(
    resource_fields_match,
    results_col_map_df[["dataset", "resource", "field", "field_supplied", "field_matched"]],
    on=["dataset", "resource", "field"],
    how="left",
)

print(len(resource_fields_map_match))
resource_fields_map_match.head()

In [None]:
# Missing flags (no match in the left joins) become 0 so they can be summed.
resource_fields_map_match = resource_fields_map_match.replace(np.nan, 0)

# One output row per organisation / dataset / endpoint / resource, counting the
# expected fields and summing each flag.
group_keys = [
    "organisation", "name", "dataset", "endpoint", "resource",
    "status", "log_entry_date", "endpoint_entry_date",
]
aggregations = {
    "field": "count",
    "field_supplied": "sum",
    "field_matched": "sum",
    "field_loaded": "sum",
}
grouped_counts = resource_fields_map_match.groupby(group_keys).agg(aggregations)
final_count = grouped_counts.reset_index().sort_values(["name"])

# Number the rows within each organisation + dataset (starting at 1) so that
# multiple endpoints for the same dataset stay distinguishable in the index.
final_count["endpoint_number"] = (
    final_count.groupby(["organisation", "name", "dataset"]).cumcount() + 1
)

# Express each flag total as a fraction of the number of expected fields.
for flag_column in ("field_supplied", "field_matched", "field_loaded"):
    final_count[f"{flag_column}_pct"] = final_count[flag_column] / final_count["field"]

final_count.head()

Changes to make to this report:

* Make sure list of orgs and datasets is exhaustive in report table
* Sense-check metric results
* Sort index


In [None]:
def make_pretty(styler):
    """Apply the report's presentation settings to a Styler and return it.

    Relabels the column headers, formats values as whole-number percentages
    and shades cells with the "PiYG" colour map over a fixed 0-1 range.

    Args:
        styler: the Styler to configure (modified in place).

    Returns:
        The same ``styler`` object that was passed in.
    """
    column_labels = ["Fields Supplied", "Fields Loaded", "Field Names Matched"]
    gradient_options = {"axis": None, "vmin": 0, "vmax": 1, "cmap": "PiYG"}

    styler.relabel_index(column_labels, axis=1)
    styler.format("{:.0%}")
    styler.background_gradient(**gradient_options)
    return styler

# Build the output table: pick the reporting columns, order the rows, then move
# the descriptive columns into the index so that only the three percentage
# columns remain to be styled.
final_count_out = (
    final_count[
        ["name", "dataset", "endpoint_number", "field", "field_supplied_pct", "field_loaded_pct", "field_matched_pct"]
    ]
    # sort_values returns a new frame, so its result has to be kept for the
    # ordering to take effect (previously the sorted frame was discarded).
    .sort_values(["name", "dataset", "endpoint_number"])
    .set_index(["name", "dataset", "field", "endpoint_number"])
)
final_count_out.style.pipe(make_pretty)