# Pure Python Pipeline - Initial sketch

The intention here is to sketch out the functions, classes and interactions required to run a pure Python data ingestion pipeline.

An important design consideration is to enforce distinct decoupling and separation of concerns between the pipeline itself (the sequence of operations that makes up the pipeline) and the runner (the thing running the pipeline), so that the runner can be freely changed as needed.

## Concepts

It's worth understanding a few high-level concepts before reading the code in this notebook:


### store

All versions of this pipeline run against a directory of files or (in the case of buckets) an abstract representation of this.

So our "store" is a class for interacting with a directory of files in such a way that the code doesn't care whether it's an AWS bucket, a local directory, Google Cloud Storage, GitHub etc. under the hood. This means all interactions are standard regardless of the storage backend, so you can freely swap that backend out (for example, while developing or when running tests) without altering the pipeline code.

It _also_ means that since the store class is really just _references to files in a place_, it's _JSON serialisable_, something that is potentially of use with certain runners (Airflow, for example, can only pass variables between tasks if they can be JSON serialised).
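
As a rough illustration of the serialisation point, a store that only holds a reference to a location can round-trip through JSON. The `StoreReference` class and its `backend`/`location` fields below are hypothetical stand-ins for this example, not the real store classes.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class StoreReference:
    """Hypothetical stand-in: a store reduced to 'which backend, which directory'."""

    backend: str   # e.g. "s3" or "local" (assumed labels)
    location: str  # e.g. a bucket prefix or a local directory path

    def to_json(self) -> str:
        # Only plain strings are held, so the reference serialises cleanly.
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload: str) -> "StoreReference":
        return cls(**json.loads(payload))


# One task serialises the reference, the next task rebuilds it.
original = StoreReference("s3", "s3://some-aws-bucket/dataset-cpih/22-01-2024T00:00:00/initial")
restored = StoreReference.from_json(original.to_json())
```

Because no file contents travel with the reference, the payload stays small no matter how large the submitted files are.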

The abstract base for this class (the place that enforces that all variations of the store behave the same) is `BaseWritableSingleDirectoryStore`, but the easier way to understand it is to look at `FakeS3DirectoryStore` in `./deps/store/s3fake.py`; all the methods should have sensible descriptions of what they do.

I'd **strongly** recommend looking at that file first, the rest of this example will make far more sense then.
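
To make the idea concrete without opening that file, here is a minimal sketch of the pattern. The method names `has_lone_file_matching` and `get_lone_matching_json_as_dict` are borrowed from the code later in this notebook; everything else (the class names, `get_file_names`, `read_text`, and treating patterns as shell-style globs) is an assumption made for the example and may not match the real store.

```python
import json
from abc import ABC, abstractmethod
from fnmatch import fnmatch
from typing import Dict, List


class SketchDirectoryStore(ABC):
    """Hypothetical, cut-down stand-in for the real abstract store."""

    @abstractmethod
    def get_file_names(self) -> List[str]:
        """Return the names of every file in the directory."""

    @abstractmethod
    def read_text(self, name: str) -> str:
        """Return the contents of one named file."""

    def _matching(self, pattern: str) -> List[str]:
        # Assumption: patterns are shell-style globs such as "*.sdmx".
        return [n for n in self.get_file_names() if fnmatch(n, pattern)]

    def has_lone_file_matching(self, pattern: str) -> bool:
        # "Lone" is read here as "exactly one match".
        return len(self._matching(pattern)) == 1

    def get_lone_matching_json_as_dict(self, pattern: str) -> dict:
        matches = self._matching(pattern)
        if len(matches) != 1:
            raise ValueError(
                f"Expected exactly one file matching {pattern!r}, found {len(matches)}"
            )
        return json.loads(self.read_text(matches[0]))


class InMemoryDirectoryStore(SketchDirectoryStore):
    """A fake backend for tests: the 'directory' is just a dict."""

    def __init__(self, files: Dict[str, str]):
        self._files = dict(files)

    def get_file_names(self) -> List[str]:
        return list(self._files)

    def read_text(self, name: str) -> str:
        return self._files[name]


store = InMemoryDirectoryStore({
    "pipeline-config.json": '{"pipeline": "sdmx.default"}',
    "cpih.sdmx": "<sdmx/>",
})
```

The shared logic lives in the base class, so a new backend only has to say how to list and read files.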

### deps.other

There are other bits of reusable functionality we'll need, but I'm not sure yet where they'd go, so all associated functions are (for the sake of this example) just imported from here.

As with the fake store, they're just empty or have hard-coded returns.

Again, worth a read to understand the intention; I've tried to add decent comments.

### notify

We know that these pipelines need to notify various parties upon various events (in particular when something goes wrong).

The "notifier" is intended to handle that in one place in a convenient fashion, i.e. `notify.data_engineering("Something DEs need to know")`.

The idea is that _how_ it does that happens in this one central place, so if we want to change the notifying logic (such as to send a message to MS Teams rather than Slack) we only need to change the code in one place.
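
A minimal sketch of that idea, assuming the notifier is handed a "transport" callable that does the actual sending. The `Notifier` class, the `Transport` alias and the channel names are all hypothetical; only the `data_engineering` and `data_submitter` method names come from this notebook.

```python
from typing import Callable, List, Tuple

# A transport is anything that can deliver (channel, text); Slack, MS Teams and
# email senders would each be one such callable.
Transport = Callable[[str, str], None]


class Notifier:
    """Hypothetical notifier: one method per audience, one swappable transport."""

    def __init__(self, transport: Transport):
        self._transport = transport

    def data_engineering(self, text: str) -> None:
        self._transport("data-engineering", text)

    def data_submitter(self, text: str) -> None:
        self._transport("data-submitter", text)


# A recording transport is handy in tests: nothing leaves the process.
sent: List[Tuple[str, str]] = []
notify = Notifier(lambda channel, text: sent.append((channel, text)))
notify.data_engineering("Something DEs need to know")
```

Swapping Slack for MS Teams then means passing a different transport; none of the calling code changes.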

### messages

Given we're sending many messages, it made sense to pull them out and assemble them via functions so we can manage them separately from the code.

Doesn't matter much right now, but when you start thinking about formatting complex emails and the like, it'd be nice to have all that separated out so the pipeline code only deals with running the pipeline.
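
As a sketch of what one of these message functions might look like: the name `cant_create_config` is used later in this notebook, but the argument types and the wording below are assumptions made for the example.

```python
def cant_create_config(pipeline_identifier: str, source: str, err: Exception) -> str:
    """Hypothetical message builder; the wording and arguments are assumptions."""
    return (
        f"Pipeline '{pipeline_identifier}' could not create a pipeline-config.json "
        f"for the submission at '{source}'.\n"
        f"Error: {type(err).__name__}: {err}"
    )


text = cant_create_config(
    "cpih-22-01-2024",
    "s3://some-aws-bucket/dataset-cpih/22-01-2024T00:00:00/initial",
    ValueError("no files"),
)
```

Keeping the formatting in one function means the pipeline code never needs to know what a message looks like.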

# Overarching Design

This is intended to be pipelines in the form of **decoupled and simple python functions**.

_Where_ we happen to run this code is not a consideration at this point, and every effort should be made to avoid bringing logic from the runner (airflow, lambdas, whatever) into scope.

The ideal is that we build something that could literally just be the following functions called via a script; we're then free to use that process on _any_ runner.
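
To show how little a "script" runner needs to do, here is a sketch that simply calls each step in turn. The `run_steps` helper and the two stand-in steps are hypothetical; the assumption is that every step accepts and returns a `(pipeline_name, store)` pair.

```python
from typing import Any, Callable, Sequence, Tuple

# Each step takes (pipeline_name, store) and hands the same pair on.
Step = Callable[[str, Any], Tuple[str, Any]]


def run_steps(steps: Sequence[Step], pipeline_name: str, store: Any) -> Tuple[str, Any]:
    """Call each step in order; this loop is the entire 'script' runner."""
    for step in steps:
        pipeline_name, store = step(pipeline_name, store)
    return pipeline_name, store


# Stand-in steps (hypothetical) that only record that they ran.
def check(pipeline_name: str, store: list) -> Tuple[str, list]:
    return pipeline_name, store + ["checked"]


def convert(pipeline_name: str, store: list) -> Tuple[str, list]:
    return pipeline_name, store + ["converted"]


name, final_store = run_steps([check, convert], "cpih-22-01-2024", [])
```

Any other runner would wrap the same step functions in its own task type, leaving the steps themselves untouched.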


# Function/Step 1 - Confirm Configuration

I'm envisioning this as the very first step, triggered after some number of files appear in some directory-like structure.

It will return the pipeline name and store object for use by the next function.

## Assumptions

- A unique identifier for the pipeline is passed to the function.
- Some files appear in something that functions like a directory (s3 bucket, actual directory).
- That directory is passed to this function.

Therefore some example inputs would/could be:

- `cpih-22-02-2024T01:01:01` or similar (some kind of unique but useful convention).
- `s3://my-aws-bucket/dataset-cpih/23-01-2024T01:01:00/initial` or `/my-files/localdata/dataset-cpih/23-01-2024T01:01:00/initial`.
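
One way a path-like source could be mapped to a storage backend is by looking at its URL scheme. The `backend_for` helper below is hypothetical and only illustrates the dispatch; it is not how `directory_store_from_pathlike_source` is necessarily implemented.

```python
from urllib.parse import urlparse


def backend_for(pathlike_source: str) -> str:
    """Guess the storage backend from a path-like string (hypothetical helper)."""
    scheme = urlparse(pathlike_source).scheme
    if scheme == "s3":
        return "s3"
    if scheme == "":
        # No scheme at all: treat it as a path on the local filesystem.
        return "local"
    raise ValueError(f"Unsupported source scheme {scheme!r} in {pathlike_source!r}")
```

Raising on an unknown scheme keeps a mistyped source from silently being treated as a local directory.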


In [6]:
import json
from typing import Tuple

from jsonschema import ValidationError

from deps.store.base import BaseWritableSingleDirectoryStore
from deps import notify
from deps import message
from deps.other import (
    heuristically_create_pipeline_config,
    schema_path_from_config,
    directory_store_from_pathlike_source,
    validate_json,
    get_supplementary_distribution_patterns,
    get_required_file_patterns
)

def confirm_configuration(pipeline_identifier: str, pathlike_source: str) -> Tuple[str, BaseWritableSingleDirectoryStore]:
    """
    The purpose of this function is to: 

    - (a) Confirm the submission directory contains a file "pipeline-config.json"
    - (b) Create one where it does not using the files that were submitted
    - (c) Validate the "pipeline-config.json"

    Returns the pipeline name and the store ready for use by the next function.
    """

    store: BaseWritableSingleDirectoryStore = directory_store_from_pathlike_source(pathlike_source)
    
    # If this submission does not have a pipeline-config.json, try to create one.
    we_created_config = False
    if not store.has_lone_file_matching("pipeline-config.json"):
        try:
            heuristically_create_pipeline_config(store)
            store.add_file("pipeline-config.json")
            we_created_config = True
        except Exception as err:
            notify.data_engineering(message.cant_create_config(pipeline_identifier, store, err))
            raise err


    config_dict: dict = store.get_lone_matching_json_as_dict("pipeline-config.json")

    try:
        config_schema_path = schema_path_from_config(config_dict)
    except Exception as err:
        notify.data_engineering(message.cant_find_schema(config_dict, err))
        raise err

    
    # Now we have a config and the schema for that config we validate it.
    try:
        validate_json(config_schema_path, data_dict=config_dict, msg="Some helpful message", indent=2)
    except ValidationError as verr:
        # If the pipeline config fails validation, tell whoever provided it (either us or the external submitter).
        if we_created_config:
            notify.data_engineering(message.invalid_heuristically_created_config(config_dict))
        else:
            notify.data_submitter(message.invalid_config, verr)
            notify.data_engineering(
                message.submitter_notified(
                    "Submitter notified of schema error in pipeline-config.json",
                    config_dict,
                    verr)
                )
        raise verr
    except Exception as err:
        # We got an unexpected error while trying to validate, let the DEs know.
        notify.data_engineering(
            message.unexpected_error(
                "Error encountered when trying to validate pipeline-config.json",
                err)
            )
        raise err

    # Confirm that the files required for this pipeline are included in
    # the submitted files.
    for required_file_pattern in get_required_file_patterns(config_dict):
        if not store.has_lone_file_matching(required_file_pattern):
            msg = message.expected_file_submission_missing(
                f"Unable to find required file matching pattern {required_file_pattern} in submission",
                config_dict, store, pipeline_name=pipeline_identifier)
            notify.data_submitter(msg)
            notify.data_engineering(
                message.submitter_notified(
                    "Submitter notified of missing required file in submission",
                    config_dict,
                    msg)
                )
            # A required file is missing, so the pipeline cannot continue.
            raise ValueError(msg)
    
    # Where one or more supplementary distributions have been specified, confirm those
    # files exist in the directory of submitted files.
    for supplementary_distribution_pattern in get_supplementary_distribution_patterns(config_dict):
        if not store.has_lone_file_matching(supplementary_distribution_pattern):
            msg = message.expected_file_submission_missing(
                f"Unable to find supplementary distribution file matching pattern {supplementary_distribution_pattern} in submission",
                config_dict, store, pipeline_name=pipeline_identifier)
            notify.data_submitter(msg)
            notify.data_engineering(
                message.submitter_notified(
                    "Submitter notified of missing supplementary distribution file in submission",
                    config_dict,
                    msg)
                )
            raise ValueError(msg)

    return pipeline_identifier, store
            

# Key Concept - Pipeline Details

In all cases the only things that are subject to change in these pipelines are:

- (a) the inputs and how they are sanity checked.
- (b) the transform (if any) that's being used by the pipeline and takes these inputs.

The intention here is to capture these variations via a simple dictionary `all_pipeline_details` so the pipeline code can just be reused.

The keys in this dictionary ("sdmx.default" in the example) will be taken from the pipeline config that's in play whenever a pipeline is run, but otherwise the pipeline code itself remains static.


In [7]:
from deps.transform.sdmx import sdmx_default_v1, sdmx_sanity_check_v1

all_pipeline_details = {
    "sdmx.default": {
        "transform": sdmx_default_v1,
        "transform_inputs": {
            "*.sdmx": sdmx_sanity_check_v1
        },
        "transform_kwargs": {}
    }
}

The intention here is to capture the steps of a transform with a generic pattern so creating a new transform is just a matter of adding a new entry to the `all_pipeline_details` dictionary rather than recoding or creating bespoke pipelines.

The above is effectively saying:

- if the "pipeline" specified in pipeline-config.json is `sdmx.default`
- we should use the transform function `sdmx_default_v1`
- we have one input, which is the file from the directory store that matches the pattern `*.sdmx` (if a transform required multiple inputs, there'd be multiple key-value pairs here).
- the inputs should be validated by the associated "sanity_check" function, i.e. the sdmx in the example should be sanity checked with the `sdmx_sanity_check_v1` function.
- this transform takes no keyword arguments (future proofing: it's conceivable DEs might need to do this in some scenarios to increase reusability, e.g. `{"fillna": True}` or some such).
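
To illustrate the multiple-input and keyword-argument cases, here is a self-contained sketch of an entry with two inputs and one keyword argument, plus the generic lookup-and-call logic. Every name in it (`example.two_inputs`, `join_inputs`, `sanity_check_has_content`, `run_transform`) is hypothetical, and the input files are passed in as a plain dict rather than resolved from a store.

```python
from typing import Dict, List


# Stand-in functions (hypothetical); real ones would live under deps.transform.
def sanity_check_has_content(path: str) -> None:
    if not path:
        raise ValueError("empty input path")


def join_inputs(*paths: str, separator: str = "+") -> str:
    return separator.join(paths)


example_pipeline_details: Dict[str, dict] = {
    "example.two_inputs": {
        "transform": join_inputs,
        "transform_inputs": {
            "*.csv": sanity_check_has_content,
            "*.json": sanity_check_has_content,
        },
        "transform_kwargs": {"separator": "|"},
    }
}


def run_transform(pipeline: str, files_by_pattern: Dict[str, str]) -> str:
    """Look up the entry, sanity check each input, then call the transform."""
    details = example_pipeline_details[pipeline]
    args: List[str] = []
    for pattern, sanity_checker in details["transform_inputs"].items():
        path = files_by_pattern[pattern]  # the store would resolve this in practice
        sanity_checker(path)
        args.append(path)
    return details["transform"](*args, **details["transform_kwargs"])


result = run_transform("example.two_inputs", {"*.csv": "obs.csv", "*.json": "meta.json"})
```

The inputs reach the transform in the order the patterns appear in `transform_inputs`, so that order effectively defines the transform's positional arguments.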


# Function/Step 2 - Transform

The following is a generic transform step that makes use of "pipeline details" provided via this structure.

This will create a new csv and metadata file and write to the current working directory.

It will return the pipeline name and store object for use by the next function.


In [8]:
import json
from pathlib import Path
from typing import Tuple

from jsonschema import ValidationError

from deps.store.base import BaseWritableSingleDirectoryStore
from deps import notify
from deps import message
from deps.other import (
    get_pipeline_identifier_from_config,
    validate_json,
    UploadServiceClient,
    get_supplementary_distribution_patterns,
    validate_csv
)

def transform(pipeline_name, store: BaseWritableSingleDirectoryStore) -> Tuple[str, BaseWritableSingleDirectoryStore]:
	
    config_dict: dict = store.get_lone_matching_json_as_dict("pipeline-config.json")
    pipeline_identifier = get_pipeline_identifier_from_config(config_dict)
    
    # Use that to get the details we need to run this pipeline
    pipeline_details = all_pipeline_details.get(pipeline_identifier, None)
    
    if pipeline_details is None:
        msg = message.unknown_pipeline(pipeline_identifier, config_dict)
        notify.data_engineering(msg)
        raise ValueError(msg)
    
    args = []
    for match, sanity_checker in pipeline_details["transform_inputs"].items():
        try:
            input_file_path: Path = store.save_lone_file_matching(match)
        except Exception as err:
            notify.data_engineering(message.pipeline_input_exception(pipeline_details, store, err))
            raise err

        try:
            sanity_checker(input_file_path)
        except Exception as err:
            notify.data_engineering(message.pipeline_input_sanity_check_exception(pipeline_details, store, err))
            raise err
        
        args.append(input_file_path)

    kwargs = pipeline_details["transform_kwargs"]
    transform_function = pipeline_details["transform"]
    
    try:
        csv_path, metadata_path = transform_function(*args, **kwargs)
    except Exception as err:
        # Something has gone wrong in the transform, let DE team know.
        notify.data_engineering(message.error_in_transform(config_dict, store, err))
        raise err

    # Validate the metadata
    metadata_schema = "" # We get this from somewhere, no idea where yet
    try:
        validate_json(metadata_schema, data_path=metadata_path, msg="Some helpful message", indent=2)
    except ValidationError as verr:
        # THINK - if the artifacts are transient we probably need to store them so an engineer
        # can see precisely what input caused the problem
        notify.data_engineering(message.metadata_validation_error(metadata_path, verr))
        raise verr
    except Exception as err:
        # We got an unexpected error while trying to validate, let the DEs know.
        # THINK - if the artifacts are transient we probably need to store them so an engineer
        # can see precisely what input caused the problem
        notify.data_engineering(
            message.unexpected_error(
                "Error encountered when trying to validate dataset metadata.",
                err)
            )
        raise err

    # Validate the csv
    try:
        validate_csv(csv_path, metadata_path)
    except Exception as err:
        notify.data_engineering(
            message.unexpected_error(
                "Error encountered when trying to validate csv.",
                err)
            )
        # THINK - if the artifacts are transient we probably need to store them so an engineer
        # can see precisely what input caused the problem
        raise err
        
    return pipeline_name, store
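
The "notify someone, then re-raise" pattern recurs throughout these steps. As one possible tidy-up, it could be factored into a context manager. The `notify_on_error` helper below is a hypothetical sketch rather than part of `deps`, and the cases that treat `ValidationError` differently from other exceptions would still need their own handling.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List


@contextmanager
def notify_on_error(
    notifier: Callable[[str], None],
    describe: Callable[[Exception], str],
) -> Iterator[None]:
    """Notify someone about any exception raised in the block, then re-raise it."""
    try:
        yield
    except Exception as err:
        notifier(describe(err))
        raise


# Usage with a recording notifier standing in for notify.data_engineering.
received: List[str] = []

try:
    with notify_on_error(received.append, lambda err: f"Error in transform: {err}"):
        raise RuntimeError("bad input")
except RuntimeError:
    pass  # the original exception still propagates to the caller
```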


# Function/Step 3 - Upload

The intention here is to upload the metadata and data to the appropriate website backend services.

## Assumptions

- a csv file exists in the working directory as `data.csv`.
- a json metadata file exists in the working directory as `metadata.json`.
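
These two assumptions can be checked up front in one go. The `missing_files` helper below is a hypothetical sketch of such a check, taking the directory as an argument so it can be pointed at a temporary directory in tests.

```python
from pathlib import Path
from typing import Iterable, List


def missing_files(directory: Path, expected_names: Iterable[str]) -> List[str]:
    """Return the expected file names that are absent from the directory."""
    return [name for name in expected_names if not (directory / name).exists()]
```

Calling `missing_files(Path.cwd(), ["data.csv", "metadata.json"])` would report every missing file at once, rather than stopping at the first.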


In [9]:
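from pathlib import Path

from deps.store.base import BaseWritableSingleDirectoryStore
from deps import notify
from deps import message
from deps.other import (
    DatasetApiClient,
    UploadServiceClient,
    get_supplementary_distribution_patterns
)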

def upload(pipeline_name, store: BaseWritableSingleDirectoryStore):
    """
    Upload the data files to the dp-upload-service and the metadata to the dp-dataset-api.
    """

    config_dict: dict = store.get_lone_matching_json_as_dict("pipeline-config.json")

    csv_path = Path("data.csv")
    if not csv_path.exists():
        msg = f'The expected file "{csv_path}" does not exist'
        notify.data_engineering(
            message.expected_local_file_missing(
                msg,
                csv_path,
                pipeline_name
            )
        )
        raise ValueError(msg)

    metadata_path = Path("metadata.json")
    if not metadata_path.exists():
        msg = f'The expected file "{metadata_path}" does not exist'
        notify.data_engineering(
            message.expected_local_file_missing(
                msg,
                metadata_path,
                pipeline_name
            )
        )
        raise ValueError(msg)
        
    # Upload files that are just files
    files_to_upload = [csv_path]

    for supplementary_distribution in get_supplementary_distribution_patterns(config_dict):
        # Download the supplementary file
        file_to_upload: Path = store.save_lone_file_matching(supplementary_distribution)
        files_to_upload.append(file_to_upload)

    upload_service_client = UploadServiceClient()
    for file_to_upload in files_to_upload:
        try:
            upload_service_client.upload(file_to_upload)
        except Exception as err:
            notify.software_engineering(
                message.unexpected_error(
                    f'Error encountered when trying to upload "{file_to_upload}" to the upload service.',
                    err)
                )
            # Re-raise so a failed upload is never reported as ready.
            raise err
        
    # Upload the metadata to the dp-dataset-api
    dataset_api_client = DatasetApiClient()
    try:
        new_dataset_url_or_identifier = dataset_api_client.upload_metadata(metadata_path)
    except Exception as err:
        notify.software_engineering(
            message.unexpected_error(
                "Error encountered when trying to upload metadata to the dataset api.",
                err)
            )
        # Re-raise so a failed upload is never reported as ready.
        raise err

    new_dataset_url_or_identifier = "TODO - create me somehow, probably in the metadata"
    
    notify.publishing_support(f"The dataset {new_dataset_url_or_identifier} is ready for use in the publication process")


# Putting it all together

To use this (if all the functions and classes were implemented) you'd do something like the following:


In [10]:
from deps.store.s3fake import FakeS3DirectoryStore

pipeline_name, store = confirm_configuration(
        "cpih-22-01-2024",
        "s3://some-aws-bucket/dataset-cpih/22-01-2024T00:00:00/initial")

pipeline_name, store = transform(pipeline_name, store)

upload(pipeline_name,
       store)