In [None]:
import os
import re
import json
import datetime
import pandas as pd

from typing import Optional, List, Dict, Union
from enum import Enum
from uuid import UUID, uuid4
from keyword import iskeyword

from pydantic import BaseModel, Field, ConstrainedStr

In [None]:
class TREncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes UUIDs and datetimes.

    UUIDs become their canonical string form; ``datetime.datetime`` values are
    rendered through ``pandas.Timestamp.isoformat``. Everything else is
    delegated to the standard encoder, which raises ``TypeError`` for
    unsupported objects.
    """

    def default(self, obj):
        # UUID and datetime are unrelated types, so the order of the two
        # checks does not matter.
        if isinstance(obj, datetime.datetime):
            return pd.Timestamp(obj).isoformat()
        if isinstance(obj, UUID):
            return str(obj)
        return super().default(obj)

In [None]:
class DataType(str, Enum):
    """Hetida designer data types.

    These are the types available for component/workflow inputs/outputs.
    Members also subclass ``str``, so each member compares equal to its
    string value (e.g. ``DataType.Integer == "INT"``).
    """

    Integer = "INT"
    Float = "FLOAT"
    String = "STRING"
    DataFrame = "DATAFRAME"
    Series = "SERIES"
    Boolean = "BOOLEAN"
    Any = "ANY"
    PlotlyJson = "PLOTLYJSON"

In [None]:
# Allow only some special characters for category, description, name and version tag.
# The value is the body of a regex character class (it is interpolated between
# "[" and "]" by the constrained string types): word characters (\w), space,
# comma, period, hyphen, parentheses, equals sign and forward slash.
ALLOWED_CHARS_RAW_STRING = (
    r"\w ,\.\-\(\)=/"  # pylint: disable=anomalous-backslash-in-string
)
# The special sequence \w matches unicode word characters;
# this includes most characters that can be part of a word in any language, as well as numbers
# and the underscore. If the ASCII flag is used, only [a-zA-Z0-9_] is matched.

class NonEmptyValidStr(ConstrainedStr):
    """String of 1 to 60 characters drawn only from ALLOWED_CHARS_RAW_STRING."""

    min_length = 1
    max_length = 60
    # NOTE(review): in Python's `re`, `$` also matches just before a trailing
    # newline - confirm whether values ending in "\n" are meant to be accepted.
    regex = re.compile(rf"^[{ALLOWED_CHARS_RAW_STRING}]+$")


class ShortNonEmptyValidStr(ConstrainedStr):
    """String of 1 to 20 characters drawn only from ALLOWED_CHARS_RAW_STRING."""

    min_length = 1
    max_length = 20
    regex = re.compile(rf"^[{ALLOWED_CHARS_RAW_STRING}]+$")


class ValidStr(ConstrainedStr):
    """Possibly empty string drawn only from ALLOWED_CHARS_RAW_STRING.

    No length limits are set here; the ``*`` quantifier lets the empty string pass.
    """

    regex = re.compile(rf"^[{ALLOWED_CHARS_RAW_STRING}]*$")

class ComponentInfo(BaseModel):
    """Descriptive information about a component.

    Holds the data types of inputs and outputs keyed by name, the ids, and the
    character-restricted name, category, description and version tag.
    """

    input_types_by_name: Dict[str, DataType]
    output_types_by_name: Dict[str, DataType]
    # Both ids default to a freshly generated random UUID when not supplied.
    id: UUID = Field(default_factory=uuid4)
    revision_group_id: UUID = Field(default_factory=uuid4)
    name: NonEmptyValidStr
    category: NonEmptyValidStr
    description: ValidStr  # may be empty
    version_tag: ShortNonEmptyValidStr

In [None]:
class State(str, Enum):
    """State of a component/workflow revision.

    Members also subclass ``str`` and compare equal to their string values.
    """

    DRAFT = "DRAFT"
    RELEASED = "RELEASED"
    # Per the TransformationRevision docstring, the frontend shows this state as "DEPRECATED".
    DISABLED = "DISABLED"


class Type(str, Enum):
    """Kind of a transformation revision: component or workflow."""

    COMPONENT = "COMPONENT"
    WORKFLOW = "WORKFLOW"
    

class IO(BaseModel):
    """A single input or output: an id, an optional name and a data type."""

    # Defaults to a freshly generated random UUID when not supplied.
    id: UUID = Field(default_factory=uuid4)
    # The name is optional at model level; nothing in this class enforces the
    # "valid python identifier" requirement stated in the description.
    name: Optional[str] = Field(
        None,
        description="Must be a valid python identifier because it will be used for computation",
    )
    data_type: DataType


class IOInterface(BaseModel):
    """Represents combination of inputs and outputs.

    Note: The names in the list of inputs and outputs must be unique, respectively.
    This uniqueness is not validated by this class itself.
    """

    # Both lists default to empty.
    inputs: List[IO] = []
    outputs: List[IO] = []
        

class RefIdType(str, Enum):
    """Reference id type as required for some adapters (notably generic rest adapter).

    Used by the wiring models as the type of their ``ref_id_type`` field.
    """

    SOURCE = "SOURCE"
    SINK = "SINK"
    THINGNODE = "THINGNODE"

    
class ExternalType(str, Enum):
    """Type-of-data identifiers accepted by the ``type`` field of the wiring models.

    Values are grouped into metadata, timeseries and series variants (each
    with int/float/str/bool/any) plus a single dataframe value.
    """

    # metadata(...) variants
    METADATA_INT = "metadata(int)"
    METADATA_FLOAT = "metadata(float)"
    METADATA_STR = "metadata(str)"
    METADATA_BOOLEAN = "metadata(bool)"
    METADATA_ANY = "metadata(any)"

    # timeseries(...) variants
    TIMESERIES_INT = "timeseries(int)"
    TIMESERIES_FLOAT = "timeseries(float)"
    TIMESERIES_STR = "timeseries(str)"
    TIMESERIES_BOOLEAN = "timeseries(bool)"
    TIMESERIES_ANY = "timeseries(any)"

    # series(...) variants
    SERIES_INT = "series(int)"
    SERIES_FLOAT = "series(float)"
    SERIES_STR = "series(str)"
    SERIES_BOOLEAN = "series(bool)"
    SERIES_ANY = "series(any)"

    DATAFRAME = "dataframe"
    

class InputWiring(BaseModel):
    """Wiring of one workflow input to an adapter.

    ``workflow_input_name`` and ``adapter_id`` are required; the remaining
    fields are optional and described by their ``Field`` descriptions.
    """

    workflow_input_name: str = Field(..., alias="workflow_input_name")
    adapter_id: Union[int, str] = Field(..., alias="adapter_id")

    ref_id: Optional[str] = Field(
        None,
        description=(
            "Id referencing the source in external systems."
            " Not necessary for direct provisioning."
        ),
    )
    ref_id_type: Optional[RefIdType] = Field(
        None,
        # The explicit "+" matters: without it the adjacent string literals are
        # concatenated first and the whole sentence becomes the join separator.
        description="Required if type is specified and is a metadata type. "
        "Then describes to what kind of object in the tree the metadatum is attached. "
        "Must then be one of "
        + ", ".join(['"' + x.value + '"' for x in list(RefIdType)]),
    )
    ref_key: Optional[str] = None
    type: Optional[ExternalType] = Field(
        None,
        description="Type of data. If present then must be one of "
        + ", ".join(['"' + x.value + '"' for x in list(ExternalType)]),
    )
    # Defaults to an empty dict.
    filters: dict = {}
        
        
class OutputWiring(BaseModel):
    """Wiring of one workflow output to an adapter.

    ``workflow_output_name`` and ``adapter_id`` are required; the remaining
    fields are optional and described by their ``Field`` descriptions.
    """

    workflow_output_name: str = Field(..., alias="workflow_output_name")
    adapter_id: Union[int, str] = Field(..., alias="adapter_id")
    ref_id: Optional[str] = Field(
        None,
        description=(
            "Id referencing the sink in external systems."
            " Not necessary for direct provisioning."
        ),
    )
    ref_id_type: Optional[RefIdType] = Field(
        None,
        # The explicit "+" matters: without it the adjacent string literals are
        # concatenated first and the whole sentence becomes the join separator.
        description="Required if type is specified and is a metadata type. "
        "Then describes to what kind of object in the tree the metadatum is attached. "
        "Must then be one of "
        + ", ".join(['"' + x.value + '"' for x in list(RefIdType)]),
    )
    ref_key: Optional[str] = Field(
        None,
        description="Required if type is specified and is a metadata type. "
        "Then is the key of the metdatum.",
    )
    type: Optional[ExternalType] = Field(
        None,
        description="Type of data. If present then must be one of "
        + ", ".join(['"' + x.value + '"' for x in list(ExternalType)]),
    )
        
        
class WorkflowWiring(BaseModel):
    """Collection of input wirings and output wirings; both default to empty lists."""

    input_wirings: List[InputWiring] = []
    output_wirings: List[OutputWiring] = []
        

class TransformationRevision(BaseModel):
    """Either a component revision or a workflow revision

    Both can be instantiated as an operator in a workflow revision
    (yes, workflow in workflow in workflow... is possible) and are therefore
    able to transform input data to output result data.

    Note that there is no actual component or workflow entity, only revisions. Revisions are tied
    together via the group id, and otherwise do not need to have anything in common, i.e. their
    name and their interface etc. can differ completely.

    Revisions with state RELEASED are what makes execution reproducible - they cannot be edited any
    more and only they can be instantiated as operators.

    Additionally RELEASED revisions cannot be deleted, but their state can be changed to
    DISABLED. DISABLED revisions cannot be instantiated as new operators anymore but existing
    operators from them still work (for reproducibility). Note that in the Frontend the DISABLED
    state is called "DEPRECATED". The frontend then allows to replace deprecated operators by other
    (possibly newer) released revisions from the same revision group (i.e. same group id).
    """

    # Unlike ComponentInfo, both ids are required here (no default factory).
    id: UUID
    # Shared by all revisions that belong to the same revision group.
    revision_group_id: UUID
    # Plain `str` fields: the character/length restrictions of the
    # ConstrainedStr types are not applied by this model.
    name: str
    description: str = ""
    category: str = Field(
        "Other",
        description='Category in which this is classified, i.e. the "drawer" in the User Interface',
    )
    version_tag: str
    released_timestamp: Optional[datetime.datetime] = Field(
        None,
        description="If the revision is RELEASED then this should be release timestamp",
    )

    disabled_timestamp: Optional[datetime.datetime] = Field(
        None,
        description="If the revision is DISABLED then this should be disable/deprecation timestamp",
    )
    state: State = Field(
        ...,
        description="one of " + ", ".join(['"' + x.value + '"' for x in list(State)]),
    )
    type: Type = Field(
        ...,
        description="one of " + ", ".join(['"' + x.value + '"' for x in list(Type)]),
    )

    # Defaults to an empty markdown skeleton with the standard section headings.
    documentation: str = Field(
        (
            "\n"
            "# New Component/Workflow\n"
            "## Description\n"
            "## Inputs\n"
            "## Outputs\n"
            "## Details\n"
            "## Examples\n"
        ),
        description="Documentation in markdown format.",
    )
    # NOTE(review): the script at the end of this file treats `content` of
    # components as source code text - confirm what it holds for workflows.
    content: str

    io_interface: IOInterface = Field(
        ...,
        description=(
            "In case of type WORKFLOW determined from content. "
            "To change from state DRAFT to state RELEASED all inputs and outputs must have names."
        ),
    )

    test_wiring: WorkflowWiring = Field(
        ...,
        description=(
            "The input and output wirings must match "
            "the inputs and outputs of the io_interface"
        ),
    )

In [None]:
def load_json(path):
    """Load and parse the JSON file at ``path``.

    Args:
        path: Location of a UTF-8 encoded JSON file.

    Returns:
        The parsed JSON content, or ``None`` if the file does not exist (an
        error is logged in that case).

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    # Local import: the visible file neither imports `logging` nor defines a
    # module-level `logger`, so the original error branch raised NameError.
    import logging

    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        logging.getLogger(__name__).error(
            "Could not find json file at path %s", path
        )
        return None

In [None]:
# Gather the path of every file found anywhere below the components
# directory (no filtering by extension), echoing each path as it is found.
json_files = []
for root, _, files in os.walk("./transformations/components"):
    for file in files:
        found_path = os.path.join(root, file)
        print(found_path)
        json_files.append(found_path)

In [None]:
def io_match(component_ios, tr_ios):
    """Check whether two input/output descriptions agree on names and types.

    Args:
        component_ios: Sequence of mappings, each with a ``"name"`` and a
            ``"type"`` key.
        tr_ios: Sequence of objects, each with ``name`` and ``data_type``
            attributes.

    Returns:
        ``True`` if both sequences have the same length and every entry of
        ``component_ios`` has a same-named entry in ``tr_ios`` whose
        ``data_type`` equals its ``"type"``; ``False`` otherwise.
    """
    if len(component_ios) != len(tr_ios):
        return False

    tr_ios_by_name = {io.name: io for io in tr_ios}

    for cp_io in component_ios:
        # Single lookup; a missing name and a type mismatch both mean "no match".
        tr_io = tr_ios_by_name.get(cp_io["name"])
        if tr_io is None or cp_io["type"] != tr_io.data_type:
            return False

    return True

In [None]:
# Index every transformation revision file by revision name; names must be unique.
json_file_by_name = {}
tr_by_name = {}

for file in json_files:
    tr_json = load_json(file)
    tr = TransformationRevision(**tr_json)
    if tr.name in json_file_by_name:
        raise Exception("Name", tr.name, "is not unique!")
    json_file_by_name[tr.name] = file
    tr_by_name[tr.name] = tr

# For every revision whose code contains a doctest (">>>"), move the
# "Usage example" docstring from above the "DO NOT EDIT LINES ABOVE" marker to
# directly below it, then write the revision back to its JSON file.
for component_name, json_file in json_file_by_name.items():
    component_json = load_json(json_file)
    tr = TransformationRevision(**component_json)
    # The original read `tr_component.content`, but `tr_component` is never
    # defined; the revision just loaded from this file is the one to rewrite.
    old_code = tr.content
    if ">>>" in old_code:
        split_at_low_string = "    # ***** DO NOT EDIT LINES ABOVE *****\n"
        split_at_top_string = '    """ Usage example:\n'
        # Docstring text: from its opening line up to the marker line.
        docstring = '    """ Usage example:\n'+old_code.split(
            split_at_top_string
        )[1].split(split_at_low_string)[0]
        # Unpacking requires each split string to occur exactly once.
        split_code_top, split_code_low = tr.content.split(split_at_top_string)
        _, split_code_low = split_code_low.split(split_at_low_string)
        # Reassemble as: code above docstring, marker, docstring, remaining code.
        tr_content = split_code_top + split_at_low_string + docstring + split_code_low
        print(tr_content)
        tr.content = tr_content
        tr_json = json.dumps(tr.dict(exclude_unset=True), cls=TREncoder, indent=2, sort_keys=True)
        with open(json_file, "w", encoding="utf8") as f:
            json.dump(json.loads(tr_json), f, cls=TREncoder, indent=2, sort_keys=True)
    else:
        print(tr.name+": seems to have no doctest docstring")