# OTE-API

*Connect anything - build a processing pipeline - run from anywhere!*

The Ontology-based Translation Environment Application Programming Interface is
a framework for connecting heterogeneous data resources, semantic interoperability frameworks, open simulation platforms and standalone simulation tools. The OTE-API allows for building complex use-case representations by combining a set of simple reusable functions.



![./oteapi-Strategies.drawio.png](./oteapi-Strategies.drawio.png)

The OTEAPI provides a unified way to manage complex software
processing. By combining reusable elements for accessing or
downloading data, parsing information, transforming information and
performing business logic, the OTEAPI is highly adaptable to different
business cases.

OTEAPI is an implementation of the pipe and filter architectural
pattern. This simply describes a set of connected components that
process a stream of data from an input source to a receiver such as a
data sink. Once a pipeline is constructed, it is possible to execute
data processing as if it were a single component. The pipeline(s) can
further be embedded in a larger workflow system.
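The core idea can be illustrated in plain Python, without any OTEAPI code: each filter is a callable, and the pipes simply forward the output of one filter to the next, so the composed pipeline behaves like one function. This is a minimal sketch; `build_pipeline` and the three example filters are hypothetical names used only for illustration.

```python
from functools import reduce
from typing import Any, Callable, Iterable, List

# A "filter" here is any callable that takes upstream data and returns new data.
Filter = Callable[[Any], Any]


def build_pipeline(filters: Iterable[Filter]) -> Filter:
    """Compose filters left to right into a single callable."""
    chain: List[Filter] = list(filters)

    def pipeline(data: Any) -> Any:
        # The "pipe": each filter's output becomes the next filter's input.
        return reduce(lambda acc, f: f(acc), chain, data)

    return pipeline


# Hypothetical filters: trim the input, split it into fields, keep numeric fields.
def strip(text: str) -> str:
    return text.strip()


def split(text: str) -> List[str]:
    return text.split(",")


def to_numbers(fields: List[str]) -> List[float]:
    return [float(x) for x in fields if x.replace(".", "", 1).isdigit()]


pipeline = build_pipeline([strip, split, to_numbers])
print(pipeline("  1,2.5,abc,4\n"))  # [1.0, 2.5, 4.0]
```

Once built, `pipeline` is called exactly like a single filter, which is what allows it to be embedded in a larger workflow.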

A complex processing scenario is defined by constructing a
"pipeline" from reusable and interchangeable parts. OTEAPI supports six
main categories of tasks:

* Data access
* Data filtering (Data subset extraction)
* Syntactic analysis (Parsing)
* Ontological mapping (Semantic Data Modelling)
* Synchronous Information Processing (Functions)
* Asynchronous Processing (Transformations running in the background)



## Data access (Information Retrieval)

Accessing data will typically involve authorization, transportation
protocols and query languages. A simple data access module will be
able to initiate a transport protocol (for instance HTTP) and access
an artifact given a unique URL.
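A minimal data access step can lean on the Python standard library, where `urllib.request.urlopen` picks the transport from the URL scheme. This is a sketch rather than an OTEAPI resource strategy; the function name `access` is hypothetical, and authorization and query languages are left out.

```python
from urllib.request import urlopen


def access(url: str, timeout: float = 10.0) -> bytes:
    """Fetch the artifact identified by `url` and return its raw bytes.

    urllib selects the transport (http, https, ftp, file or data)
    from the URL scheme.
    """
    with urlopen(url, timeout=timeout) as response:
        return response.read()


# A data: URL needs no network access, which makes it handy for a quick check.
print(access("data:text/plain,hello%20world"))  # b'hello world'
```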

## Data Filtering

Data filtering allows for extracting a subset of the available data,
by defining a view or a size limitation. The data filter is usually
very closely related to the data accessor, and may include data
source specific query languages.
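As an illustration of a view plus a size limit, the sketch below uses an in-memory SQLite database as the data source: the SQL `query` defines the view and `limit` caps the number of returned items, loosely mirroring the `query` and `limit` properties of the filter configuration. The table, its columns and the `filter_rows` helper are hypothetical.

```python
import sqlite3
from typing import List, Optional, Tuple


def filter_rows(
    conn: sqlite3.Connection, query: str, limit: Optional[int] = None
) -> List[Tuple]:
    """Apply a source-specific query (a view) and an optional size limit."""
    cursor = conn.execute(query)
    return cursor.fetchall() if limit is None else cursor.fetchmany(limit)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE samples (name TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO samples VALUES (?, ?)",
    [("a", 290.0), ("b", 310.5), ("c", 305.0), ("d", 280.0)],
)

# Keep only the warm samples, and at most two of them.
rows = filter_rows(
    conn,
    "SELECT name, temperature FROM samples WHERE temperature > 300 ORDER BY name",
    limit=2,
)
print(rows)  # [('b', 310.5), ('c', 305.0)]
```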

## Syntactic Analysis

Heterogeneous data sources have different syntactic ways of defining
their structure and contents. Some data sources provide schemas and
metadata (SQL, HDF5), others are documented file formats (CSV, XLSX,
etc.). In order to manage the rich variety of formats, a parser or
syntactic analyser is needed for further processing of the data.
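For a documented file format such as CSV, the parsing step can be as small as the sketch below, which turns raw bytes into records keyed by the header row. The `parse_csv` name is hypothetical. Note that the parser only recovers structure: every value is still a string, and its meaning is not yet known.

```python
import csv
import io
from typing import Dict, List


def parse_csv(content: bytes, encoding: str = "utf-8") -> List[Dict[str, str]]:
    """Turn raw CSV bytes into a list of dicts keyed by the header row."""
    text = io.StringIO(content.decode(encoding), newline="")
    return list(csv.DictReader(text))


records = parse_csv(b"name,temperature\na,290.0\nb,310.5\n")
print(records)
# [{'name': 'a', 'temperature': '290.0'}, {'name': 'b', 'temperature': '310.5'}]
```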

## Semantic Mapping

A data stream associated with a semantic data model allows for semantic
interoperability in the sense that the data can be correctly
interpreted from a common language or domain definition (ontology).

Training of a machine learning model is a practical example where
semantic mapping becomes very useful. The machine learning software is
agnostic to the meaning of the training data; however, the data model
representing the input data can be dynamically mapped to appropriate
concepts.
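The sketch below shows the idea with prefixes and (subject, predicate, object) triples, shaped like the `prefixes` and `triples` fields of the mapping configuration: each data model column is linked to an ontology concept, and a parsed record is re-keyed by concept IRI. The `example.org` IRIs, the `map:mapsTo` predicate and the helper functions are all hypothetical placeholders, not a real ontology or the OTEAPI mapping strategy.

```python
from typing import Dict, List, Tuple

# Hypothetical prefixes and triples; the example.org IRIs are placeholders.
PREFIXES: Dict[str, str] = {
    "dm": "http://example.org/datamodel#",
    "onto": "http://example.org/ontology#",
    "map": "http://example.org/mapping#",
}
TRIPLES: List[Tuple[str, str, str]] = [
    ("dm:name", "map:mapsTo", "onto:SampleIdentifier"),
    ("dm:temperature", "map:mapsTo", "onto:Temperature"),
]


def expand(curie: str, prefixes: Dict[str, str]) -> str:
    """Expand 'prefix:local' into a full IRI; leave unknown prefixes untouched."""
    prefix, sep, local = curie.partition(":")
    return prefixes[prefix] + local if sep and prefix in prefixes else curie


def annotate(
    record: Dict[str, str],
    triples: List[Tuple[str, str, str]],
    prefixes: Dict[str, str],
) -> Dict[str, str]:
    """Re-key a parsed record by ontology concept instead of column name."""
    concept_of = {
        expand(s, prefixes): expand(o, prefixes)
        for s, p, o in triples
        if p == "map:mapsTo"
    }
    annotated = {}
    for column, value in record.items():
        column_iri = expand(f"dm:{column}", prefixes)
        if column_iri in concept_of:  # unmapped columns are dropped
            annotated[concept_of[column_iri]] = value
    return annotated


print(annotate({"name": "a", "temperature": "290.0"}, TRIPLES, PREFIXES))
# {'http://example.org/ontology#SampleIdentifier': 'a',
#  'http://example.org/ontology#Temperature': '290.0'}
```

A consumer such as a training script can then ask for `onto:Temperature` without knowing what the column was called in the source.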


## Synchronous Information Processing

OTEAPI defines two different methods of generating new information
from a stream of data. With synchronous information processing, an
operation or *function* is performed and completed before the next
step in the pipeline is called. This task is very generic and
supports a wide range of operations, from performing operations on
the data to serializing data.
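The two ends of that range can be sketched as ordinary blocking functions: one operates on the data and one serializes it, and each call returns its complete result before the next is made. The function names, the record layout and the assumption that temperatures are in Kelvin are hypothetical.

```python
import json
from typing import Dict, List

Record = Dict[str, object]


def kelvin_to_celsius(records: List[Record]) -> List[Record]:
    """An operation on the data: convert the (assumed Kelvin) temperature to Celsius."""
    return [
        {**r, "temperature": round(float(r["temperature"]) - 273.15, 2)}
        for r in records
    ]


def serialize(records: List[Record]) -> str:
    """A serialization step: render the records as a JSON string."""
    return json.dumps(records)


# Synchronous: serialize() only runs once kelvin_to_celsius() has returned.
converted = kelvin_to_celsius([{"name": "a", "temperature": "290.0"}])
print(serialize(converted))  # [{"name": "a", "temperature": 16.85}]
```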

## Asynchronous Processing

Asynchronous processing differs from synchronous processing in that
it starts a background task and does not wait for it to finish. Asynchronous
processing is useful, for instance, when starting long-running computations.
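A minimal sketch of this behaviour with `concurrent.futures` is shown below: `start` submits the work and returns an ID immediately, and `status` can be polled later. The job registry, the helper names and the status strings (which only loosely echo the `status` field of the transformation status model) are hypothetical; a real deployment might use a task queue instead of threads.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict

executor = ThreadPoolExecutor(max_workers=2)
jobs: Dict[str, Future] = {}  # hypothetical in-process job registry


def long_running_computation(n: int) -> int:
    time.sleep(0.1)  # stand-in for a lengthy simulation
    return sum(i * i for i in range(n))


def start(n: int) -> str:
    """Submit the job in the background and return an ID without waiting."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = executor.submit(long_running_computation, n)
    return job_id


def status(job_id: str) -> str:
    """Report the state of a submitted job without blocking."""
    future = jobs[job_id]
    if future.cancelled():
        return "cancelled"
    if future.done():
        return "failed" if future.exception() is not None else "finished"
    return "running" if future.running() else "pending"


job = start(1000)            # returns immediately
print(jobs[job].result())    # blocks only here; 332833500
print(status(job))           # finished
```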

# Pipe and Filter

![./context-filter-session-interaction.png](./context-filter-session-interaction.png)

The context-filter-session interaction diagram illustrates the communication between a context class, a filter and the session. The session is simply an in-memory key-value store that is used as temporary memory when different filter types communicate. The session data is first fetched into the context class and then sent as an argument to the filter's initialize method. Note that initialize is a pure function, since the filter strategy is stateless; the only purpose of the initialize method is to create an object that will be added to or updated in the current session.

In a composition that includes several pipe and filter connections, the initialize() method is called sequentially going upstream, from the last filter in the pipeline to the first. This allows the last filter to send a message to the upstream filters (via the session), which is needed when constraints cannot be determined at configuration time. See Filter Composition Interaction.
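The sketch below models this with plain functions: the session is a dict, each `initialize` is a pure function that receives a copy of the session and returns an update, and the updates are merged while walking from the last filter to the first. The filter names and the `rows_needed` key are hypothetical; the point is that the first filter can react to a constraint posted by the last one.

```python
from typing import Any, Callable, Dict, List

Session = Dict[str, Any]
# An initialize function reads the session and returns an update; it never mutates.
Initialize = Callable[[Session], Session]


def initialize_upstream(initializers: List[Initialize]) -> Session:
    """Call initialize() from the last filter to the first, merging each update."""
    session: Session = {}
    for initialize in reversed(initializers):
        session.update(initialize(dict(session)))  # pass a copy to keep it pure
    return session


# Hypothetical filters A -> B -> C. Only C knows how many rows it will need.
def init_a(session: Session) -> Session:
    limit = session.get("rows_needed", 100)
    return {"a_query": f"SELECT * FROM samples LIMIT {limit}"}


def init_b(session: Session) -> Session:
    return {}


def init_c(session: Session) -> Session:
    return {"rows_needed": 2}


session = initialize_upstream([init_a, init_b, init_c])
print(session["a_query"])  # SELECT * FROM samples LIMIT 2
```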

# Filter Composition Interaction
![./pipe-and-filter-connection.png](./pipe-and-filter-connection.png)



![./pipe-filter-interaction.drawio.png](./pipe-filter-interaction.drawio.png)

# High Level Overview - Execution order

When a pipeline is composed of a set of *pipes* and *filters*, the pipeline can be executed as if it were a single component. Calling the `.get()` method on the pipeline executes a sequence of operations on the different strategies.
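To make the "single component" idea concrete, the sketch below wraps three filters in a hypothetical `Pipeline` class whose `get()` first calls `initialize()` upstream (last filter first, as described for filter composition) and then calls each filter's `get()` downstream. The downstream order of the `get()` calls is an assumption of this sketch, not taken from the OTEAPI implementation, and `EchoFilter` only records calls so the order can be inspected.

```python
from typing import Any, Dict, List

Session = Dict[str, Any]


class EchoFilter:
    """Hypothetical filter strategy that records when its methods are called."""

    def __init__(self, name: str, log: List[str]):
        self.name, self.log = name, log

    def initialize(self, session: Session) -> Session:
        self.log.append(f"{self.name}.initialize")
        return {}

    def get(self, session: Session) -> Session:
        self.log.append(f"{self.name}.get")
        return {"last": self.name}


class Pipeline:
    """Runs a chain of filters as if it were one component."""

    def __init__(self, filters: List[Any]):
        self.filters = filters

    def get(self) -> Session:
        session: Session = {}
        # initialize() upstream: last filter first.
        for f in reversed(self.filters):
            session.update(f.initialize(dict(session)))
        # get() downstream: first filter first (assumed order).
        for f in self.filters:
            session.update(f.get(dict(session)))
        return session


log: List[str] = []
pipeline = Pipeline([EchoFilter(n, log) for n in ("FilterA", "FilterB", "FilterC")])
print(pipeline.get())  # {'last': 'FilterC'}
print(log)
# ['FilterC.initialize', 'FilterB.initialize', 'FilterA.initialize',
#  'FilterA.get', 'FilterB.get', 'FilterC.get']
```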

![./pipeline.png](./pipeline.png)
A pipeline composed of three filters (FilterA, FilterB and FilterC) and two pipes.

![./pipeline-get.png](./pipeline-get.png)

## Execution order

![./execution-order.png](./execution-order.png)



## Strategies everywhere!


A simplification of common materials modelling use cases is as follows: data is read from a data source, filtered and converted into a suitable representation, and used as input in a simulation. Output from the simulation is used in analysis or post-processed for further simulation. One pattern that emerges is that we have a stream of data and successive transformation steps applied to it. This allows us to generalize the use cases into the following:

`data source --(pipe)--> filter --(pipe)--> filter --(pipe)--> data target`

A pipe is simply a source of data that is consumed by an input *filter*.
A "filter" can be any transformation of, or operation on, the data that receives the stream of data from an input pipe and delivers data as a stream to an output pipe. There can be many different types of filters, but in general they share the same generic interfaces and can in principle be connected to any pipe.

Key advantages of the pipe and filter pattern are that it allows for loose and flexible coupling of interchangeable filters, that the filters are reusable, and that a set of pipelines can be run in parallel. Another important factor is that filters can be "anything" as long as they honour the interface specification; they are treated as "black boxes" by the system.

A common design pattern for managing a set of (run-time) interchangeable modules with different implementations is the *strategy pattern*. A module will call the same *named* function, but the instance of the object that is called changes depending on the context.
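A small registry-based sketch of the strategy pattern follows. The caller always invokes `parse()`, while the concrete class is chosen at run time from a type string (here a media type, in the same spirit as strategy type strings such as `filter/sql`). The registry, decorator and strategy classes are hypothetical and are not the OTEAPI plugin mechanism.

```python
import json
from typing import Any, Callable, Dict, List, Type


class ParseStrategy:
    """Common interface: every strategy exposes the same named method."""

    def parse(self, content: bytes) -> Any:
        raise NotImplementedError


# Hypothetical registry from a strategy type string to its implementation.
REGISTRY: Dict[str, Type[ParseStrategy]] = {}


def register(strategy_type: str) -> Callable[[Type[ParseStrategy]], Type[ParseStrategy]]:
    def decorator(cls: Type[ParseStrategy]) -> Type[ParseStrategy]:
        REGISTRY[strategy_type] = cls
        return cls
    return decorator


@register("text/csv")
class CSVParseStrategy(ParseStrategy):
    def parse(self, content: bytes) -> List[List[str]]:
        return [line.split(",") for line in content.decode().splitlines()]


@register("application/json")
class JSONParseStrategy(ParseStrategy):
    def parse(self, content: bytes) -> Any:
        return json.loads(content)


def parse(media_type: str, content: bytes) -> Any:
    """Choose the strategy from the context (the media type), then call parse()."""
    try:
        strategy = REGISTRY[media_type]()
    except KeyError:
        raise ValueError(f"No parse strategy registered for {media_type!r}") from None
    return strategy.parse(content)


print(parse("text/csv", b"a,1\nb,2"))          # [['a', '1'], ['b', '2']]
print(parse("application/json", b'{"a": 1}'))  # {'a': 1}
```

New formats are supported by registering another class; the code that calls `parse()` does not change.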

In the scope of the Ontology-based Translation Environment there are four main categories of operations:

* Resource operations administer data sources and data targets. This includes downloading data using various protocols, parsing specific file formats, interacting with web services or database management systems, generating target data and uploading information
* Mapping between source-specific metadata and general or domain-specific semantic representations
* Defining filtering operations that specify a subset of information
* Transformations that operate on data and produce new information
    
    

## Pipeline execution order


### Resource operations

| Property    | Type   | Description                                                                  |
|-------------|--------|------------------------------------------------------------------------------|
| downloadUrl | string | Definition: The URL of the downloadable file in a given format. E.g. CSV file or RDF file. Usage: `downloadURL` *SHOULD* be used for the URL at which this distribution is available directly, typically through a HTTPS GET request or SFTP. |
| mediaType | string | The media type of the distribution as defined by IANA [[IANA-MEDIA-TYPES](https://www.w3.org/TR/vocab-dcat-2/#bib-iana-media-types)]. Usage: This property *SHOULD* be used when the media type of the distribution is defined in IANA [[IANA-MEDIA-TYPES](https://www.w3.org/TR/vocab-dcat-2/#bib-iana-media-types)]. |
| accessUrl | string | A URL of the resource that gives access to a distribution of the dataset. E.g. landing page, feed, SPARQL endpoint. Usage: `accessURL` *SHOULD* be used for the URL of a service or location that can provide access to this distribution, typically through a Web form, query or API call. `downloadURL` is preferred for direct links to downloadable resources. |
| accessService | string | A data service that gives access to the distribution of the dataset. |
| license | string | A legal document under which the distribution is made available. |
| accessRights | string | A rights statement that concerns how the distribution is accessed. |
| description | string | A free-text account of the distribution. |
| publisher | string | The entity responsible for making the resource/item available. |
| configuration | object | Resource-specific configuration options given as key/value-pairs. |


The configuration can include a reference to the DataCache. The DataCache is an internal storage mechanism that temporarily stores artifacts for an active period, avoiding the need to access the same resource multiple times over the network.

| Property    | Type   | Description                                                                  |
|-------------|--------|------------------------------------------------------------------------------|
| cacheDir | string | Cache directory. |
| accessKey | string | Key with which the downloaded content can be accessed. Should preferably be the hash (corresponding to `hashType`) of the content if it is known. |
| hashType | string | Hash algorithm to use for creating hash keys for stored data. Can be any algorithm supported by hashlib. |
| expireTime | integer | Number of seconds before the cache entry expires. Zero means no expiration. Default is two weeks. |
| tag | string | Tag assigned to the downloaded content, typically identifying a session. Used with the `evict()` method to clean up all cache entries with a given tag. |
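The behaviour described by these options can be sketched as a small in-memory cache: content is keyed by its hash (`hashType`) unless a key is given, entries expire after `expireTime` seconds (zero meaning never), and `evict(tag)` removes every entry carrying a tag. `MiniCache` is a hypothetical stand-in that keeps data in a dict; it does not implement `cacheDir` and is not the OTEAPI DataCache.

```python
import hashlib
import time
from typing import Dict, Optional, Tuple


class MiniCache:
    """Hypothetical in-memory stand-in illustrating the DataCache options."""

    def __init__(self, hash_type: str = "md5", expire_time: int = 3600 * 24 * 14):
        self.hash_type = hash_type
        self.expire_time = expire_time  # seconds; 0 means no expiration
        self._store: Dict[str, Tuple[bytes, float, Optional[str]]] = {}

    def add(self, value: bytes, key: Optional[str] = None, tag: Optional[str] = None) -> str:
        # Default key: the content hash, as recommended for `accessKey`.
        key = key or hashlib.new(self.hash_type, value).hexdigest()
        expires = time.time() + self.expire_time if self.expire_time else float("inf")
        self._store[key] = (value, expires, tag)
        return key

    def get(self, key: str) -> bytes:
        value, expires, _ = self._store[key]
        if time.time() > expires:
            del self._store[key]  # drop stale entries lazily
            raise KeyError(key)
        return value

    def evict(self, tag: str) -> None:
        """Remove all entries carrying `tag`."""
        for key in [k for k, (_, _, t) in self._store.items() if t == tag]:
            del self._store[key]


cache = MiniCache()
key = cache.add(b"hello", tag="session-1")
print(key)             # 5d41402abc4b2a76b9719d911017c592
print(cache.get(key))  # b'hello'
cache.evict("session-1")
```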

### Mapping operations

| Property    | Type   | Description                                                                  |
|-------------|--------|------------------------------------------------------------------------------|
| mappingType | string | Type of registered mapping strategy. E.g., `mapping/demo`. |
| prefixes | object | Mapping of short names that expand to an IRI, given as local value/IRI-expansion pairs. |
| triples | array | List of semantic triples given as (subject, predicate, object). |
| configuration | object | Mapping-specific configuration options given as key/value-pairs. |

### Filtering operations

| Property    | Type   | Description                                                                  |
|-------------|--------|------------------------------------------------------------------------------|
| filterType | string | Type of registered filter strategy. E.g., `filter/sql`. |
| query | string | Define a query operation. |
| condition | string | Logical statement indicating when a filter should be applied. |
| limit | integer | Number of items remaining after a filter expression. |
| configuration | object | Filter-specific configuration options given as key/value-pairs. |

### Transformations

| Property    | Type   | Description                                                                  |
|-------------|--------|------------------------------------------------------------------------------|
| transformation_type | string | Type of registered transformation strategy. E.g., `celery/remote`. |
| name | string | Human-readable name of the transformation strategy. |
| description | string | A free-text account of the transformation. |
| due | string | Optional field to indicate a due date/time for when a transformation should finish. |
| priority | string | Define the process priority of the transformation execution. One of `Low`, `Medium` (default) or `High`. |
| secret | string | Authorization secret given when running a transformation. |
| configuration | object | Transformation-specific configuration options given as key/value-pairs. |


#### Transformation status

| Property    | Type   | Description                                                                  |
|-------------|--------|------------------------------------------------------------------------------|
| id | string | ID for the given transformation process. |
| status | string | Status for the transformation process. |
| messages | array | Messages related to the transformation process. |
| created | string | Time of creation for the transformation process. Given in UTC. |
| startTime | string | Time when the transformation process started. Given in UTC. |
| finishTime | string | Time when the transformation process finished. Given in UTC. |

```python
# These models use the pydantic v1 API (`conlist(min_items=...)`,
# `Model.schema()`); pydantic v2 renamed both.
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from pydantic import AnyUrl, BaseModel, Field, conlist

# A semantic triple is exactly three strings: (subject, predicate, object).
SemanticTriple = conlist(str, min_items=3, max_items=3)


class FilterConfig(BaseModel):
    """Filter Strategy Data Configuration."""

    filterType: str = Field(
        ..., description="Type of registered filter strategy. E.g., `filter/sql`."
    )
    query: Optional[str] = Field(None, description="Define a query operation.")
    condition: Optional[str] = Field(
        None,
        description="Logical statement indicating when a filter should be applied.",
    )
    limit: Optional[int] = Field(
        None, description="Number of items remaining after a filter expression."
    )
    configuration: Optional[Dict] = Field(
        None,
        description="Filter-specific configuration options given as key/value-pairs.",
    )


class MappingConfig(BaseModel):
    """Mapping Strategy Data Configuration."""

    mappingType: str = Field(
        ..., description="Type of registered mapping strategy. E.g., `mapping/demo`."
    )
    prefixes: Optional[Dict[str, str]] = Field(
        None,
        description=(
            "Mapping of short names that expand to an IRI, "
            "given as local value/IRI-expansion pairs."
        ),
    )
    triples: Optional[List[SemanticTriple]] = Field(  # type: ignore[valid-type]
        None,
        description="List of semantic triples given as (subject, predicate, object).",
    )
    configuration: Optional[Dict] = Field(
        None,
        description="Mapping-specific configuration options given as key/value-pairs.",
    )


class DataCacheConfig(BaseModel):
    """DataCache Configuration."""

    cacheDir: Path = Field("oteapi", description="Cache directory.")
    accessKey: Optional[str] = Field(
        None,
        description="Key with which the downloaded content can be accessed. "
        "Should preferably be the hash (corresponding to `hashType`) of the "
        "content if it is known.",
    )
    hashType: str = Field(
        "md5",
        description="Hash algorithm to use for creating hash keys for stored "
        "data. Can be any algorithm supported by hashlib.",
    )
    expireTime: int = Field(
        3600 * 24 * 14,
        description="Number of seconds before the cache entry expires. "
        "Zero means no expiration. Default is two weeks.",
    )
    tag: Optional[str] = Field(
        None,
        description="Tag assigned to the downloaded content, typically "
        "identifying a session. Used with the `evict()` method to clean up "
        "all cache entries with a given tag.",
    )


class ResourceConfig(BaseModel):
    """Resource Strategy Data Configuration.

    Important:
        Either of the pairs of attributes `downloadUrl`/`mediaType` or
        `accessUrl`/`accessService` MUST be specified.
    """

    downloadUrl: Optional[AnyUrl] = Field(
        None,
        description=(
            "Definition: The URL of the downloadable file in a given format. E.g. CSV "
            "file or RDF file.\n\nUsage: `downloadURL` *SHOULD* be used for the URL at"
            " which this distribution is available directly, typically through a HTTPS"
            " GET request or SFTP."
        ),
    )
    mediaType: Optional[str] = Field(
        None,
        description=(
            "The media type of the distribution as defined by IANA "
            "[[IANA-MEDIA-TYPES](https://www.w3.org/TR/vocab-dcat-2/#bib-iana-media-types)]"
            ".\n\nUsage: This property *SHOULD* be used when the media"
            " type of the distribution is defined in IANA "
            "[[IANA-MEDIA-TYPES](https://www.w3.org/TR/vocab-dcat-2/#bib-iana-media-types)]."
        ),
    )
    accessUrl: Optional[AnyUrl] = Field(
        None,
        description=(
            "A URL of the resource that gives access to a distribution of "
            "the dataset. E.g. landing page, feed, SPARQL endpoint.\n\nUsage: "
            "`accessURL` *SHOULD* be used for the URL of a service or location that "
            "can provide access to this distribution, typically through a Web form, "
            "query or API call.\n`downloadURL` is preferred for direct links to "
            "downloadable resources."
        ),
    )
    accessService: Optional[str] = Field(
        None,
        description=(
            "A data service that gives access to the distribution of the dataset."
        ),
    )
    license: Optional[str] = Field(
        None,
        description=(
            "A legal document under which the distribution is made available."
        ),
    )
    accessRights: Optional[str] = Field(
        None,
        description=(
            "A rights statement that concerns how the distribution is accessed."
        ),
    )
    description: Optional[str] = Field(
        None, description="A free-text account of the distribution."
    )
    publisher: Optional[str] = Field(
        None,
        description="The entity responsible for making the resource/item available.",
    )
    configuration: Union[DataCacheConfig, Dict[str, Any]] = Field(
        {},
        description="Resource-specific configuration options given as key/value-pairs.",
    )


class PriorityEnum(str, Enum):
    """Defining process priority enumerators.

    Process priorities:
    - Low
    - Medium
    - High
    """

    LOW = "Low"
    MEDIUM = "Medium"
    HIGH = "High"


class TransformationConfig(BaseModel):
    """Transformation Strategy Data Configuration."""

    transformation_type: str = Field(
        ...,
        description=(
            "Type of registered transformation strategy. E.g., `celery/remote`."
        ),
    )
    name: Optional[str] = Field(
        None, description="Human-readable name of the transformation strategy."
    )
    description: Optional[str] = Field(
        None, description="A free-text account of the transformation."
    )
    due: Optional[datetime] = Field(
        None,
        description=(
            "Optional field to indicate a due date/time for when a transformation "
            "should finish."
        ),
    )
    priority: Optional[PriorityEnum] = Field(
        PriorityEnum.MEDIUM,
        description="Define the process priority of the transformation execution.",
    )
    secret: Optional[str] = Field(
        None,
        description="Authorization secret given when running a transformation.",
    )
    configuration: Optional[Dict] = Field(
        None,
        description=(
            "Transformation-specific configuration options given as key/value-pairs."
        ),
    )


class TransformationStatus(BaseModel):
    """Return from transformation status."""

    id: str = Field(..., description="ID for the given transformation process.")
    status: Optional[str] = Field(
        None, description="Status for the transformation process."
    )
    messages: Optional[List[str]] = Field(
        None, description="Messages related to the transformation process."
    )
    created: Optional[datetime] = Field(
        None,
        description="Time of creation for the transformation process. Given in UTC.",
    )
    startTime: Optional[datetime] = Field(
        None, description="Time when the transformation process started. Given in UTC."
    )
    finishTime: Optional[datetime] = Field(
        None, description="Time when the transformation process finished. Given in UTC."
    )
```

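```python
def markdown_table(model) -> str:
    """Render a pydantic (v1) model's fields as a Markdown property table."""
    properties = model.schema()["properties"]
    lines = [
        "| Property    | Type   | Description                                                                  |",
        "|-------------|--------|------------------------------------------------------------------------------|",
    ]
    for name, prop in properties.items():
        # Enum and Union fields are emitted as allOf/anyOf without a plain "type".
        typ = prop.get("type", "")
        description = prop.get("description", "").strip()
        lines.append(f"| {name} | {typ} | {description} |")
    return "\n".join(lines)


print(markdown_table(TransformationStatus))
```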


