In [None]:
import requests as r
import re
import os
import logging
from datetime import datetime, timedelta
from dotenv import load_dotenv
import logging
from shapely import from_wkt
from shapely.geometry import Polygon, MultiPolygon

In [None]:
load_dotenv()

True

In [None]:
def get_access_token(client_id: str, client_secret: str) -> str:
    '''
    Supplies a temporary access token for of the OS NGD API
    Times out after 5 minutes
    Takes the project client_id and client_secret as input
    '''

    url = "https://api.os.uk/oauth2/token/v1"

    data = {
        "grant_type": "client_credentials"
    }

    response = r.post(
        url, 
        auth=(client_id, client_secret),
        data=data
    )

    json_response = response.json()
    if response.status_code >= 400:
        raise Exception(json_response)
    token = json_response["access_token"]

    return token

In [None]:
def OAauth2Manager(func: callable):

    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            client_id = os.environ.get('CLIENT_ID')
            client_secret = os.environ.get('CLIENT_SECRET')
            access_token = get_access_token(
                client_id=client_id,
                client_secret=client_secret
            )
            kwargs['access_token'] = access_token
            return func(*args, **kwargs)

    return wrapper

In [None]:
def generic_request(url: str, headers: dict = {}, access_token: str = None, **kwargs) -> dict:

    if access_token:
        headers['Authorization'] = f"Bearer {access_token}"
    response = r.get(url, headers=headers, **kwargs)
    json_response = response.json()
    if response.status_code >= 400:
        raise Exception(json_response)

    return json_response

In [None]:
def construct_query_params(**kwargs) -> str:
    '''
    Constructs a query string from a dictionary of key-value pairs.
    Refer to https://osdatahub.os.uk/docs/ofa/technicalSpecification for details about query parameters.
    The options are:
        - bbox
        - bbox-crs
        - crs
        - datetime
        - filter
        - filter-crs
        - filter-lang
        - limit
        - offset
    '''
    params_list = [f'{k}={v}' for k, v in kwargs.items()]
    return '?' + '&'.join(params_list)

In [None]:
def wkt_to_spatial_filter(wkt):
    return f'(INTERSECTS(geometry,{wkt}))'

In [None]:
def construct_filter_param(**kwargs):
    for k, v in kwargs.items():
        if type(v) == str:
            kwargs[k] = f"'{v}'"
    filter_list = [f"({k}={v})" for k, v in kwargs.items()]
    return 'and'.join(filter_list)

In [None]:
def ngd_items_request(
    collection: str,
    query_params: dict = {},
    filter_params: dict = {},
    headers: dict = {},
    access_token: str = None,
    **kwargs
) -> dict:
    
    filter_params = filter_params.copy()
    query_params = query_params.copy()

    if filter_params:
        filters = construct_filter_param(**filter_params)
        current_filters = query_params.get('filter')
        query_params['filter'] = f'({current_filters})and{filters}' if current_filters else filters

    wkt = kwargs.get('wkt')
    del kwargs['wkt']
    if wkt:
        spatial_filter = wkt_to_spatial_filter(wkt)
        current_filters = query_params.get('filter')
        query_params['filter'] = f'({current_filters})and{spatial_filter}' if current_filters else spatial_filter


    query_params_string = construct_query_params(**query_params)
    url = f'https://api.os.uk/features/ngd/ofa/v1/collections/{collection}/items/{query_params_string}'
    print(url)

    if access_token:
        headers['Authorization'] = f"Bearer {access_token}"
    response = r.get(url, headers=headers, **kwargs)
    json_response = response.json()

    if response.status_code >= 400:
        raise Exception(json_response)

    return json_response

In [None]:
def get_latest_collection_versions(flag_recent_updates: bool = True, recent_update_days: int = 31) -> tuple[dict[str: str], list[str]]:
    '''
    Returns the latest collection versions of each NGD collection.
    Feature collections follow the following naming convention: theme-collection-featuretype-version (eg. bld-fts-buildingline-2)
    The output of this function maps base feature collection names (theme-collection-featuretype) to the full name, including the latest version.
    This can be used to ensure that software is always using the latest version of a feature collection.
    More details on feature collection naming can be found at https://docs.os.uk/osngd/accessing-os-ngd/access-the-os-ngd-api/os-ngd-api-features/what-data-is-available
    '''
    response = r.get('https://api.os.uk/features/ngd/ofa/v1/collections/')
    collections_data = response.json()['collections']
    collections_list = [collection['id'] for collection in collections_data]
    collection_base_names = set([re.sub(r'-\d+$', '', c) for c in collections_list])
    output_lookup = dict()
    for base_name in collection_base_names:
        all_versions = [c for c in collections_list if c.startswith(base_name)]
        latest_version = max(all_versions, key=lambda c: int(c.split('-')[-1]))
        output_lookup[base_name] = latest_version
    recent_collections = None
    if flag_recent_updates:
        time_format = r'%Y-%m-%dT%H:%M:%SZ'
        recent_update_cutoff = datetime.now() - timedelta(days=recent_update_days)
        latest_versions_data = [c for c in collections_data if c['id'] in output_lookup.values()]
        recent_collections = list()
        for collection_data in latest_versions_data:
            version_startdate = collection_data['extent']['temporal']['interval'][0][0]
            time_obj = datetime.strptime(version_startdate, time_format)
            if time_obj > recent_update_cutoff:
                collection = collection_data['id']
                recent_collections.append(collection)
                logging.warning(f'{collection} is a recent version/update from the last {recent_update_days} days.')
    return output_lookup, recent_collections

In [None]:
def get_single_latest_collection(collection: str, **kwargs) -> str:
    '''
    Returns the latest collection of a given collection base.
    Input must be in the format theme-collection-featuretype (eg. bld-fts-buildingline)
    Output will complete the full name of the feature collection by appending the latest version number (eg. bld-fts-buildingline-2)
    More details on feature collection naming can be found at https://docs.os.uk/osngd/accessing-os-ngd/access-the-os-ngd-api/os-ngd-api-features/what-data-is-available
    '''
    latest_collections = get_latest_collection_versions(**kwargs)
    latest_collection = latest_collections[collection]
    return latest_collection

In [None]:
def call_multiple_collections(collections: list[str], **kwargs) -> dict[str: dict]:
    results = dict()
    for c in collections:
        json_response = ngd_items_request(c, **kwargs)
        results[c] = json_response
    return results

In [None]:
def multiple_collections_extension(func: callable, collections: list[str], **kwargs) -> dict:

    def wrapper():
        results = dict()
        for c in collections:
            json_response = func(c, **kwargs)
            results[c] = json_response
        return results

    return wrapper()

We need a bunch of functional working APIs
We need training API calls
We need API calls in agnostic approach
CSV of API calls

In [None]:
def feature_limit_extension(func: callable):
    f"""
    This is an extension of functions returning ngd features. It serves to extend the maximum number of features returned above the default maximum 100 by looping through multiple requests.
    It takes the following arguments:
    - collection: The name of the collection to be queried.
    - request_limit: The maximum number of requests to be made function. Default is 50.
    - feature_limit: The maximum number of features to be returned. Default is None.
    - query_params: A dictionary of query parameters to be passed to the function. Default is an empty dictionary.
    To prevent indefinite requests and high costs, at least one of feature_limit or request_limit must be provided, although there is no limit to the upper value these can be.
    It will make multiple requests to the function to compile all features from the specified collection, returning a dictionary with the features and metadata.
    """
    def wrapper(
        *args,
        request_limit: int = 50,
        feature_limit: int = None,
        query_params: dict = {},
        **kwargs
    ):

        query_params = query_params.copy()
        if 'offset' in query_params:
            raise AttributeError('offset is not a valid argument for functions using this decorator.')

        items = list()

        batch_count, final_batchsize = divmod(feature_limit, 100) if feature_limit else (None, None)
        request_count = 0
        offset = 0

        if not(feature_limit) and not(request_limit):
            raise AttributeError('At least one of feature_limit or request_limit must be provided to prevent indefinitely numerous requests and high costs. However, there is no upper limit to these values.')

        while (request_count != request_limit) and (not(feature_limit) or offset < feature_limit):

            if request_count == batch_count:
                print('final batch of size', final_batchsize)
                query_params['limit'] = final_batchsize
            query_params['offset'] = offset

            json_response = func(*args, query_params=query_params, **kwargs)
            items += json_response['features']

            if not [link for link in json_response['links'] if link['rel'] == 'next']:
                break

            request_count += 1
            offset += 100

        geojson = {
            "type": "FeatureCollection",
            "numberOfRequests": request_count,
            "totalNumberReturned": len(items),
            "timeStamp": datetime.now().isoformat(),
            "collection": kwargs.get('collection'),
            "source": "Compiled from code by Geovation from Ordnance Survey",
            "features": items
        }
        return geojson

    return wrapper

In [None]:
def multigeometry_search_extension(func: callable):

    def wrapper(*args, wkt: str, **kwargs):

        multi_geom = from_wkt(wkt) if type(wkt) == str else wkt
        search_areas = list()

        for search_area, geom in enumerate(multi_geom.geoms):
            json_response = func(*args, wkt=geom, **kwargs)
            json_response['serach_area'] = search_area
            search_areas.append(json_response)

        geojson = {
            "type": "FeatureCollection",
            "searchAreas": search_areas
        }

        return geojson

    return wrapper

In [None]:
x=Polygon({(400000,400000), (400050,400000), (400050,400050), (400000,40050)})
y=Polygon({(500000,500000), (500050,500000), (500050,500050), (500000,50050)})

In [None]:
z=MultiPolygon([x,y])

In [None]:
ngd_items_request_OAuth = OAauth2Manager(ngd_items_request)
ngd_items_OAuth_muligeom = multigeometry_search_extension(ngd_items_request_OAuth)
ngd_items_OAuth_all_features = feature_limit_extension(ngd_items_request_OAuth)

In [None]:
ngd_all_features = feature_limit_extension(ngd_items_request)
ngd_all_features_OAuth = OAauth2Manager(ngd_all_features)