### Debug Resource Transformation

One of the key errors we get in the pipeline is an error thrown while processing a resource, specifically after collection, when the resource is being run through the pipeline. This notebook offers a quick way for developers to process a single resource using Python without having to understand where in the pipeline that resource is processed.


In [45]:
%load_ext autoreload
%autoreload 2

import urllib
import urllib.request
from pathlib import Path

from digital_land.collection import Collection
from digital_land.commands import pipeline_run
from digital_land.configuration.main import Config
from digital_land.organisation import Organisation
from digital_land.pipeline import Pipeline
from digital_land.specification import Specification

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


#### Parameters

In [46]:
# Hash of the resource to process. An example hash is provided; replace it with
# the hash of the resource you want to debug.
resource_hash = "1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836"

# Dataset the resource is being processed for. Most resources belong to a single
# dataset, but there are outliers that are processed for more than one.
dataset = "article-4-direction-area"

# endpoint hash TODO we should look at automating the retrieval of this in the future
# endpoint_hash = 'f4bfb0e3a3f0f0e2e5e1f3c6e4b2a7d8c9e0f1a2b3c4d5e6f7g8h9i0j1k2l3m4'

# Working directory that every other path in this notebook is derived from.
# The default can be left as it is.
data_dir = Path("./data/debug_resource_transformation")
data_dir.mkdir(parents=True, exist_ok=True)

#### Download required data

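* resource - download the resource from the data collection s3 bucket. To do this we need to identify the collection the dataset is part of; we can then formulate a link to download it from the CDN.
* specification - we need access to the current specification, so we download it.
* configuration files - we only need the pipeline config files for the collection.
* collection logs - needed to look up the endpoints, organisations and start date for the resource.
* organisation data - the organisation.csv file passed to the pipeline run.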


In [47]:
# Specification first, as it tells us which collection the dataset belongs to
specification_dir = data_dir / 'specification'
specification_dir.mkdir(parents=True, exist_ok=True)
Specification.download(specification_dir)

spec = Specification(specification_dir)
collection = spec.dataset[dataset]['collection']

if not collection:
    raise ValueError(f'Dataset {dataset} does not have a collection defined in the specification')

# download the configuration files for that collection
pipeline_dir = data_dir / 'pipeline'
pipeline_dir.mkdir(parents=True, exist_ok=True)

Config.download_pipeline_files(path=pipeline_dir, collection=collection)

# download the resource, unless a copy is already present locally
data_collection_url = 'https://files.planning.data.gov.uk/'
resource_url = f'{data_collection_url}{collection}-collection/collection/resource/{resource_hash}'
resource_path = data_dir / 'resource' / f'{resource_hash}'
resource_path.parent.mkdir(parents=True, exist_ok=True)
if resource_path.exists():
    print(f'Using existing file {resource_path}')
else:
    print(f'Downloading {resource_url} to {resource_path}')
    # Download to a temporary sibling file and only move it into place once the
    # transfer has completed. Writing straight to resource_path would leave a
    # truncated file behind if the download is interrupted, and the exists()
    # check above would then reuse that broken file on every later run.
    partial_path = resource_path.with_name(f'{resource_path.name}.part')
    try:
        urllib.request.urlretrieve(resource_url, partial_path)
        partial_path.replace(resource_path)
    finally:
        # after a successful replace() the partial file no longer exists;
        # after a failure this removes the incomplete download
        if partial_path.exists():
            partial_path.unlink()

# we need to know the endpoint hash for the resource so will need to download the logs
collection_dir = data_dir / 'collection'
collection_dir.mkdir(parents=True, exist_ok=True)
Collection.download(path=collection_dir, collection=collection)

# download organisation data
cache_dir = data_dir / 'cache'
cache_dir.mkdir(parents=True, exist_ok=True)
org_path = cache_dir / 'organisation.csv'
Organisation.download(path=org_path)




Using existing file data/debug_resource_transformation/resource/1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836


#### Process resource

Now that we have the resource downloaded, we can run the code to process it. This requires some manipulation of the collection logs to get the relevant details.

In [92]:

def _ensure_dir(path):
    """Create ``path`` (including missing parents) if it does not exist and return it."""
    path.mkdir(parents=True, exist_ok=True)
    return path


# files handed to pipeline_run as output_path and converted_path
output_path = data_dir / 'transformed' / dataset / f'{resource_hash}.csv'
_ensure_dir(output_path.parent)
converted_path = data_dir / 'converted' / dataset / f'{resource_hash}.csv'
_ensure_dir(converted_path.parent)

# pipeline object built from the downloaded pipeline configuration files
pipeline = Pipeline(pipeline_dir, dataset)

# directories for issues, logs and the per-dataset caches
issue_dir = _ensure_dir(data_dir / 'issues' / dataset)
operational_issue_dir = _ensure_dir(data_dir / 'performance' / 'operational_issues')
column_field_dir = _ensure_dir(cache_dir / 'column_field' / dataset)
dataset_resource_dir = _ensure_dir(cache_dir / 'dataset_resource' / dataset)
converted_resource_dir = _ensure_dir(cache_dir / 'converted_resource' / dataset)
output_log_dir = _ensure_dir(data_dir / 'log')

# get endpoints from the collection TODO include redirects
# NOTE: this rebinds `collection`, which until now held the collection name
# string taken from the specification, to a loaded Collection object.
collection = Collection(directory=collection_dir)
collection.load()
endpoints = collection.resource_endpoints(resource_hash)
organisations = collection.resource_organisations(resource_hash)
entry_date = collection.resource_start_date(resource_hash)

# build config from downloaded files: every config table is loaded from the pipeline directory
config_path = cache_dir / 'config.sqlite3'
config = Config(path=config_path, specification=spec)
config.create()
tables = {table_name: pipeline.path for table_name in config.tables.keys()}
config.load(tables)

pipeline_run(
    # what to process
    dataset=dataset,
    resource=resource_hash,
    pipeline=pipeline,
    specification=spec,
    config_path=config_path,
    # details looked up from the collection logs
    endpoints=endpoints,
    organisations=organisations,
    entry_date=entry_date,
    collection_dir=collection_dir,  # TBD: remove, replaced by endpoints, organisations and entry_date
    # inputs
    input_path=resource_path,
    organisation_path=org_path,
    null_path=None,  # TBD: remove this
    # outputs
    output_path=output_path,
    converted_path=converted_path,
    save_harmonised=False,
    issue_dir=issue_dir,
    operational_issue_dir=operational_issue_dir,
    output_log_dir=output_log_dir,
    #  TBD save all logs in  a log directory, this will mean only one path passed in.
    column_field_dir=column_field_dir,
    dataset_resource_dir=dataset_resource_dir,
    converted_resource_dir=converted_resource_dir,
    cache_dir=cache_dir,
)

print(f'resource {resource_hash} for dataset {dataset} transformed to {output_path}')

resource 1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836 for dataset article-4-direction-area transformed to data/debug_resource_transformation/transformed/article-4-direction-area/1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836.csv
