In [None]:
import os
import io
import re
import sys
import json
import doctest
import datetime
import importlib
import unicodedata
import pandas as pd

from types import ModuleType
from typing import Optional, List, Dict, Union
from enum import Enum
from uuid import UUID, uuid4
from contextlib import redirect_stdout

from pydantic import BaseModel, Field, ConstrainedStr

In [None]:
class TREncoder(json.JSONEncoder):
    """JSON encoder that can additionally serialize UUIDs and datetimes.

    UUIDs are written as their string form, datetimes as ISO 8601 strings
    (produced via ``pandas.Timestamp``). Every other object is delegated to
    the standard encoder, which raises ``TypeError`` for unsupported types.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for ``obj``."""
        if isinstance(obj, datetime.datetime):
            timestamp = pd.Timestamp(obj)
            return timestamp.isoformat()
        if isinstance(obj, UUID):
            return str(obj)
        # Unknown type: let the base class raise the usual TypeError.
        return super().default(obj)

In [None]:
class DataType(str, Enum):
    """hetida designer data types

    These are the types available for component/workflow inputs/outputs.

    Members also subclass ``str``, so each member compares equal to its plain
    string value (e.g. ``DataType.Integer == "INT"``).
    """

    Integer = "INT"
    Float = "FLOAT"
    String = "STRING"
    DataFrame = "DATAFRAME"
    Series = "SERIES"
    Boolean = "BOOLEAN"
    Any = "ANY"
    PlotlyJson = "PLOTLYJSON"

In [None]:
# Characters permitted in category, description, name and version tag:
# word characters, the space, and the punctuation , . - ( ) = /
# The value is a fragment meant to be placed inside a regex character class ([...]).
ALLOWED_CHARS_RAW_STRING = (
    r"\w ,\.\-\(\)=/"  # pylint: disable=anomalous-backslash-in-string
)
# The special sequence \w matches unicode word characters;
# this includes most characters that can be part of a word in any language, as well as numbers
# and the underscore. If the ASCII flag is used, only [a-zA-Z0-9_] is matched.

class NonEmptyValidStr(ConstrainedStr):
    """String of 1 to 60 characters, all taken from ALLOWED_CHARS_RAW_STRING."""

    min_length = 1
    max_length = 60
    # Anchor the end with \Z rather than $: `$` also matches just before a
    # trailing newline, which would let values like "name\n" pass validation.
    regex = re.compile(rf"^[{ALLOWED_CHARS_RAW_STRING}]+\Z")


class ShortNonEmptyValidStr(ConstrainedStr):
    """String of 1 to 20 characters, all taken from ALLOWED_CHARS_RAW_STRING."""

    min_length = 1
    max_length = 20
    # Anchor the end with \Z rather than $: `$` also matches just before a
    # trailing newline, which would let values like "1.0\n" pass validation.
    regex = re.compile(rf"^[{ALLOWED_CHARS_RAW_STRING}]+\Z")


class ValidStr(ConstrainedStr):
    """Possibly empty string consisting only of ALLOWED_CHARS_RAW_STRING characters."""

    # Anchor the end with \Z rather than $: `$` also matches just before a
    # trailing newline, which would let values like "text\n" (or "\n") pass.
    regex = re.compile(rf"^[{ALLOWED_CHARS_RAW_STRING}]*\Z")


In [None]:
class State(str, Enum):
    """State of a component/workflow revision.

    Members also subclass ``str`` and therefore compare equal to their string value.
    """

    DRAFT = "DRAFT"
    RELEASED = "RELEASED"
    DISABLED = "DISABLED"


class Type(str, Enum):
    """Kind of transformation: either a component or a workflow."""

    COMPONENT = "COMPONENT"
    WORKFLOW = "WORKFLOW"
    

class IO(BaseModel):
    """A single input or output, described by an id, an optional name and a data type.

    A fresh random UUID is generated for ``id`` when none is supplied.
    """

    id: UUID = Field(default_factory=uuid4)
    # The name may be omitted (defaults to None). Nothing in this class checks
    # the identifier requirement stated in the description.
    name: Optional[str] = Field(
        None,
        description="Must be a valid python identifier because it will be used for computation",
    )
    data_type: DataType


class IOInterface(BaseModel):
    """Represents combination of inputs and outputs.

    Both lists default to empty.

    Note: The names in the list of inputs and outputs must be unique, respectively.
    (This uniqueness is not enforced by this class itself.)
    """

    inputs: List[IO] = []
    outputs: List[IO] = []
        

class RefIdType(str, Enum):
    """Reference Id type as required for some adapters (notably generic rest adapter)

    Used by the wiring models to state what kind of object a ``ref_id`` refers to.
    """

    SOURCE = "SOURCE"
    SINK = "SINK"
    THINGNODE = "THINGNODE"

    
class ExternalType(str, Enum):
    """Type of externally wired data, as used by the ``type`` field of the wirings.

    The values fall into the families metadata, timeseries and series (each
    parameterised by a value type) plus the standalone dataframe type.
    """

    # metadata(<value type>)
    METADATA_INT = "metadata(int)"
    METADATA_FLOAT = "metadata(float)"
    METADATA_STR = "metadata(str)"
    METADATA_BOOLEAN = "metadata(bool)"
    METADATA_ANY = "metadata(any)"

    # timeseries(<value type>)
    TIMESERIES_INT = "timeseries(int)"
    TIMESERIES_FLOAT = "timeseries(float)"
    TIMESERIES_STR = "timeseries(str)"
    TIMESERIES_BOOLEAN = "timeseries(bool)"
    TIMESERIES_ANY = "timeseries(any)"

    # series(<value type>)
    SERIES_INT = "series(int)"
    SERIES_FLOAT = "series(float)"
    SERIES_STR = "series(str)"
    SERIES_BOOLEAN = "series(bool)"
    SERIES_ANY = "series(any)"

    DATAFRAME = "dataframe"
    

class InputWiring(BaseModel):
    """Wiring of one workflow input (by name) to an adapter.

    ``workflow_input_name`` and ``adapter_id`` are required; all reference
    fields are optional and ``filters`` defaults to an empty dict.
    """

    workflow_input_name: str = Field(..., alias="workflow_input_name")
    adapter_id: Union[int, str] = Field(..., alias="adapter_id")

    ref_id: Optional[str] = Field(
        None,
        description=(
            "Id referencing the source in external systems."
            " Not necessary for direct provisioning."
        ),
    )
    ref_id_type: Optional[RefIdType] = Field(
        None,
        # The explicit "+" matters: adjacent literals are concatenated before the
        # method call is applied, so without it the whole sentence would be used
        # as the *separator* of join() instead of as a prefix of the list.
        description="Required if type is specified and is a metadata type. "
        "Then describes to what kind of object in the tree the metadatum is attached. "
        "Must then be one of "
        + ", ".join(['"' + x.value + '"' for x in list(RefIdType)]),
    )
    ref_key: Optional[str] = None
    type: Optional[ExternalType] = Field(
        None,
        description="Type of data. If present then must be one of "
        + ", ".join(['"' + x.value + '"' for x in list(ExternalType)]),
    )
    filters: dict = {}
        
        
class OutputWiring(BaseModel):
    """Wiring of one workflow output (by name) to an adapter.

    ``workflow_output_name`` and ``adapter_id`` are required; all reference
    fields are optional.
    """

    workflow_output_name: str = Field(..., alias="workflow_output_name")
    adapter_id: Union[int, str] = Field(..., alias="adapter_id")
    ref_id: Optional[str] = Field(
        None,
        description=(
            "Id referencing the sink in external systems."
            " Not necessary for direct provisioning."
        ),
    )
    ref_id_type: Optional[RefIdType] = Field(
        None,
        # The explicit "+" matters: adjacent literals are concatenated before the
        # method call is applied, so without it the whole sentence would be used
        # as the *separator* of join() instead of as a prefix of the list.
        description="Required if type is specified and is a metadata type. "
        "Then describes to what kind of object in the tree the metadatum is attached. "
        "Must then be one of "
        + ", ".join(['"' + x.value + '"' for x in list(RefIdType)]),
    )
    ref_key: Optional[str] = Field(
        None,
        description="Required if type is specified and is a metadata type. "
        "Then is the key of the metdatum.",
    )
    type: Optional[ExternalType] = Field(
        None,
        description="Type of data. If present then must be one of "
        + ", ".join(['"' + x.value + '"' for x in list(ExternalType)]),
    )
        
        
class WorkflowWiring(BaseModel):
    """Collection of input and output wirings; both lists default to empty."""

    input_wirings: List[InputWiring] = []
    output_wirings: List[OutputWiring] = []
        

class TransformationRevision(BaseModel):
    """Either a component revision or a workflow revision

    Both can be instantiated as an operator in a workflow revision
    (yes, workflow in workflow in workflow... is possible) and are therefore
    able to transform input data to output result data.

    Note that there is no actual component or workflow entity, only revisions. Revisions are tied
    together via the group id, and otherwise do not need to have anything in common, i.e. their
    name and their interface etc. can differ completely.

    Revisions with state RELEASED are what makes execution reproducible - they cannot be edited any
    more and only they can be instantiated as operators.

    Additionally RELEASED revisions cannot be deleted, but their state can be changed to
    DISABLED. DISABLED revisions cannot be instantiated as new operators anymore but existing
    operators from them still work (for reproducibility). Note that in the Frontend the DISABLED
    state is called "DEPRECATED". The frontend then allows to replace deprecated operators by other
    (possibly newer) released revisions from the same revision group (i.e. same group id).

    Note: the state rules described above are not enforced by this model itself; it only
    declares the fields.
    """

    id: UUID
    revision_group_id: UUID
    # name, description, category and version_tag are plain strings here; the character
    # restrictions of the constrained string types are not applied in this model.
    name: str
    description: str = ""
    category: str = Field(
        "Other",
        description='Category in which this is classified, i.e. the "drawer" in the User Interface',
    )
    version_tag: str
    released_timestamp: Optional[datetime.datetime] = Field(
        None,
        description="If the revision is RELEASED then this should be release timestamp",
    )

    disabled_timestamp: Optional[datetime.datetime] = Field(
        None,
        description="If the revision is DISABLED then this should be disable/deprecation timestamp",
    )
    state: State = Field(
        ...,
        description="one of " + ", ".join(['"' + x.value + '"' for x in list(State)]),
    )
    type: Type = Field(
        ...,
        description="one of " + ", ".join(['"' + x.value + '"' for x in list(Type)]),
    )

    # Default documentation is a markdown skeleton with empty sections.
    documentation: str = Field(
        (
            "\n"
            "# New Component/Workflow\n"
            "## Description\n"
            "## Inputs\n"
            "## Outputs\n"
            "## Details\n"
            "## Examples\n"
        ),
        description="Documentation in markdown format.",
    )
    # Presumably the source code (str) for components and a structured
    # definition (dict) for workflows - TODO confirm.
    content: Union[str,dict]

    io_interface: IOInterface = Field(
        ...,
        description=(
            "In case of type WORKFLOW determined from content. "
            "To change from state DRAFT to state RELEASED all inputs and outputs must have names."
        ),
    )

    test_wiring: WorkflowWiring = Field(
        ...,
        description=(
            "The input and output wirings must match "
            "the inputs and outputs of the io_interface"
        ),
    )
    

In [None]:
class ComponentInfo(BaseModel):
    """Provide meta-information about component.

    Used as input for code generation to include meta-information about the component in the code.

    This additional information makes it possible to recover the underlying transformation revision
    object from the code.

    ``id`` and ``revision_group_id`` default to fresh random UUIDs and
    ``is_coroutine`` defaults to False.
    """

    input_types_by_name: Dict[str, DataType]
    output_types_by_name: Dict[str, DataType]
    id: UUID = Field(default_factory=uuid4)
    revision_group_id: UUID = Field(default_factory=uuid4)
    name: NonEmptyValidStr
    category: NonEmptyValidStr
    description: ValidStr
    version_tag: ShortNonEmptyValidStr
    is_coroutine: bool = False

    @classmethod
    def from_tr(cls, tr: TransformationRevision) -> "ComponentInfo":
        """Build the component info from a transformation revision.

        Inputs and outputs are keyed by their name, so several IOs sharing a
        name (including several unnamed ones) collapse into a single entry.
        ``is_coroutine`` is not derived from the revision and keeps its default.

        Args:
            tr: transformation revision to take interface and meta data from.

        Returns:
            A new instance of ``cls`` (so subclasses get their own type back).
        """
        interface = tr.io_interface
        # "io_item" instead of "io": avoids shadowing the imported io module.
        return cls(
            input_types_by_name={
                io_item.name: io_item.data_type for io_item in interface.inputs
            },
            output_types_by_name={
                io_item.name: io_item.data_type for io_item in interface.outputs
            },
            id=tr.id,
            revision_group_id=tr.revision_group_id,
            name=tr.name,
            category=tr.category,
            description=tr.description,
            version_tag=tr.version_tag,
        )


In [None]:
# Placeholder import section put at the top of a freshly generated component module.
imports_template: str = """\
# add your imports here, e.g.
# import pandas as pd
# import numpy as np

"""

# Generated header of a component module. It is filled via str.format with the
# fields named in braces. The two "DO NOT EDIT" marker lines delimit the region
# that update_code searches for and replaces, so their text must stay in sync
# with the literals used there. The trailing backslash suppresses the final newline.
function_definition_template: str = """\
# ***** DO NOT EDIT LINES BELOW *****
# These lines may be overwritten if component details or inputs/outputs change.
component_exterior[{id}] = (
    # inputs:
    {input_dict_content},
    # outputs:
    {output_dict_content}, 
    # name:
    {name},
    # description:
    {description},
    # category:
    {category},
    # id:
    {id},
    # revision_group_id:
    {revision_group_id},
    # version_tag:
    {version_tag}
)
{main_func_declaration_start} main({params_list}):
    # entrypoint function for this component
    # ***** DO NOT EDIT LINES ABOVE *****\
"""

# Stub body appended below the generated header when no user code exists yet.
function_body_template: str = """\
    # write your function code here.
    pass\
"""


def generate_function_header(component_info: ComponentInfo) -> str:
    """Generate entrypoint function header from the inputs and their types

    Fills ``function_definition_template`` with the component meta data. Inputs
    and outputs are both rendered as dict literals mapping the IO name to the
    *value* of its data type (e.g. ``"INT"``), so that both dicts use the same
    representation.

    Args:
        component_info: meta data, IO types and coroutine flag of the component.

    Returns:
        The header text, from the "DO NOT EDIT LINES BELOW" marker up to and
        including the "DO NOT EDIT LINES ABOVE" marker (no trailing newline).
    """

    def _as_dict_literal(types_by_name) -> str:
        # Render {"name": "TYPE", ...} with double-quoted keys and values.
        entries = [
            '"' + io_name + '": "' + data_type_enum.value + '"'
            for io_name, data_type_enum in types_by_name.items()
        ]
        return "{" + ", ".join(entries) + "}"

    def _quoted(text: str) -> str:
        return '"' + text + '"'

    input_names = component_info.input_types_by_name.keys()
    # All inputs are keyword-only parameters; no "*" if there are no inputs.
    param_list_str = "*, " + ", ".join(input_names) if input_names else ""

    main_func_declaration_start = "async def" if component_info.is_coroutine else "def"

    return function_definition_template.format(
        input_dict_content=_as_dict_literal(component_info.input_types_by_name),
        output_dict_content=_as_dict_literal(component_info.output_types_by_name),
        name=_quoted(component_info.name),
        description=_quoted(component_info.description),
        category=_quoted(component_info.category),
        id=_quoted(str(component_info.id)),
        revision_group_id=_quoted(str(component_info.revision_group_id)),
        version_tag=_quoted(component_info.version_tag),
        params_list=param_list_str,
        main_func_declaration_start=main_func_declaration_start,
    )


def generate_complete_component_module(component_info: ComponentInfo) -> str:
    """Return a complete module stub: imports, generated header and empty body.

    The three sections are separated by a single newline each.
    """
    module_sections = (
        imports_template,
        generate_function_header(component_info),
        function_body_template,
    )
    return "\n".join(module_sections)


def update_code(
    existing_code: Optional[str],
    component_info: ComponentInfo,
) -> str:
    """Generate and update component code

    Replaces the generated function header inside existing_code with a new one
    built from component_info. Without existing code (None or empty string) a
    complete module stub including the import section is generated.

    The approach is deliberately naive: no syntax tree is parsed, the header is
    located purely by its two "DO NOT EDIT" marker lines. Consequently:

    * If the start marker is missing, header and stub body are appended to the
      existing code, which may add a second main entrypoint function.
    * If only the end marker is missing, everything from the start marker on is
      replaced, which deletes all user code below it.
    * If both markers are found, only the text between them is replaced. Whether
      the old entrypoint was declared with "async def" is detected and written to
      component_info.is_coroutine (the passed object is modified) before the new
      header is generated.
    """
    begin_marker = "# ***** DO NOT EDIT LINES BELOW *****"
    end_marker = "    # ***** DO NOT EDIT LINES ABOVE *****"

    if existing_code in (None, ""):
        return generate_complete_component_module(component_info)

    # Header used by the two fallback branches (coroutine flag as passed in).
    fallback_header = generate_function_header(component_info)

    start, begin_found, remaining = existing_code.partition(begin_marker)
    if not begin_found:
        # No generated header present: append one, assuming the necessary
        # imports are already there.
        return f"{existing_code}\n\n{fallback_header}\n{function_body_template}"

    old_func_def, end_found, end = remaining.partition(end_marker)
    if not end_found:
        # Header start without end: replace everything from the start marker on.
        return f"{start}{fallback_header}\n{function_body_template}"

    # Complete old header found. Its third line from the end is the line that
    # declares main; keep its async-ness.
    header_lines = old_func_def.split("\n")
    if len(header_lines) >= 3:
        component_info.is_coroutine = header_lines[-3].startswith("async def")
    else:
        component_info.is_coroutine = False

    return start + generate_function_header(component_info) + end

In [None]:
def write_code_files(source_path: str, temp_dir: str) -> None:
    """Extract the code of all component JSON files below source_path into .py files.

    Walks source_path recursively. Every ``*.json`` file that contains a JSON
    object with ``"type": "COMPONENT"`` gets its ``"content"`` written to
    ``<temp_dir>/<json file name without .json>.py``. Other JSON documents
    (workflows, objects without a type, non-objects) are skipped. All files
    end up directly in temp_dir, regardless of the sub directory they came from.

    Args:
        source_path: directory that is searched for JSON files.
        temp_dir: directory the .py files are written to; created if missing.
    """
    os.makedirs(temp_dir, exist_ok=True)
    for root, _, files in os.walk(source_path):
        for file in files:
            # splitext keeps dots inside the base name and only accepts a real
            # ".json" extension.
            base_name, extension = os.path.splitext(file)
            if extension != ".json":
                continue
            current_path = os.path.join(root, file)
            print(current_path)
            # Read with the same explicit encoding that is used for writing.
            with open(current_path, "r", encoding="utf8") as f:
                transformation_revision = json.load(f)
            if not isinstance(transformation_revision, dict):
                continue
            if transformation_revision.get("type") != "COMPONENT":
                continue
            code_file = os.path.join(temp_dir, base_name + ".py")
            with open(code_file, "w", encoding="utf8") as f:
                f.write(transformation_revision["content"])

In [None]:
# Write the code of every anomaly-detection component next to its JSON file
# (source and target directory are the same).
write_code_files(
    source_path="./transformations/components/anomaly-detection/",
    temp_dir="./transformations/components/anomaly-detection/",
)

In [None]:
# Round trip for a single component: take the (possibly edited) code from the .py
# file, regenerate its header from the JSON meta data and serialize the result.
file_without_extension = "simple-volatility-score_100_0c3c74d0-89b6-1948-fedd-753eaa47ca0e"
category_directory = "anomaly-detection"
component_directory = os.path.join("./transformations/components/", category_directory)

# Read both files as utf8, matching the encoding used when they are written.
py_file_path = os.path.join(component_directory, file_without_extension + ".py")
with open(py_file_path, "r", encoding="utf8") as f:
    code_from_py_file = f.read()

json_file_path = os.path.join(component_directory, file_without_extension + ".json")
with open(json_file_path, "r", encoding="utf8") as f:
    tr_dict = json.load(f)

tr = TransformationRevision(**tr_dict)
tr.content = update_code(code_from_py_file, ComponentInfo.from_tr(tr))
print(tr.content)

# Serialized revision (a JSON string); only fields that were explicitly set are included.
tr_json = json.dumps(tr.dict(exclude_unset=True), cls=TREncoder, indent=2, sort_keys=True)
# Writing back is intentionally disabled; uncomment to overwrite the JSON file:
# with open(json_file_path, "w", encoding="utf8") as f:
#     json.dump(json.loads(tr_json), f, cls=TREncoder, indent=2, sort_keys=True)