In [None]:
from eComply.eComply import eComply
from ParksGIS.ParksGIS import (
    GISFactory,
    LayerServerGen,
    LayerQuery,
    LayerEdits,
    LayerDomainNames,
    Server,
)
# from Transformer import Transformer

Filters

In [None]:
from datetime import datetime
from inspect import currentframe
from json import dumps
from pandas import DataFrame, Series, merge
from typing import Any, Callable, Literal


def exception_handler(ex: Exception) -> None:
    # Get the current frame, then the previous frame (the caller)
    frame = currentframe().f_back
    # Get the name of the function from the frame
    raise Exception(
        {
            "function": frame.f_code.co_name,
            "exception": str(ex),
        }
    )


def to_json(obj: Any) -> str:
    if isinstance(obj, DataFrame):
        return obj.to_json(orient="records", date_format="iso")
    else:
        return dumps(obj)


def apply_edits(context: dict) -> dict | None:
    if context["edits"] is not None and 0 < len(context["edits"]):
        try:
            result = context["repo"].apply_edits(context["edits"])
            print(f"Edits Applied: {len(result)}")
        except Exception as e:
            exception_handler(e)

    return context


def extract_changes(
    server: Server,
    layer_id: int,
    server_gen: int,
    out_fields: list[str],
    where: str = "",
) -> dict[str, Any]:
    try:
        changes = server.extract_changes(
            [
                LayerServerGen(
                    layer_id,
                    server_gen,
                )
            ]
        )

        result = {
            "changes": None,
            "server_gen": changes["layerServerGens"][0]["serverGen"] // 1000,
        }

        object_ids: list[Any] = [
            *changes["edits"][0]["objectIds"]["adds"],
            *changes["edits"][0]["objectIds"]["updates"],
        ]
        if 0 == len(object_ids):
            return result

        result["changes"] = server.query(
            [
                LayerQuery(
                    layer_id,
                    out_fields,
                    (
                        f"OBJECTID IN ({join(object_ids)})" + ""
                        if where == ""
                        else f" AND {where}"
                    ),
                )
            ]
        )
        return result

    except Exception as e:
        raise Exception(f"extract_changes: {e}")


def query_server_gens(context: dict) -> dict | None:
    import time

    seconds = 60 * 60 * 24
    epoch = time.time() - seconds
    context["server_gens"] = DataFrame(
        {
            "Contract": [epoch],
            "WorkOrder": [epoch],
        }
    )

    # try:
    #     layer_id = 3
    #     context["server_gens"] = context["repo"].query(
    #         [LayerQuery(layer_id, ["*"])],
    #     )[layer_id]
    # except Exception as e:
    #     exception_handler(e)

    return context


def get_edits(context: dict) -> DataFrame:
    return context["edits"][context["layerId"]]


def set_edits(context: dict, data: DataFrame) -> None:
    context["edits"][context["layerId"]] = data


def join(
    list: Series | list,
    guid: bool = False,
) -> str:
    if guid:
        return "'" + "','".join(str(i) for i in list) + "'"
    else:
        return ",".join(str(i) for i in list)


def pipeline(
    context: dict[str, Any],
    *funcs: Callable[
        [dict[str, Any]],
        dict[str, Any] | None,
    ],
) -> None | dict[str, Any]:
    if not funcs:
        raise ValueError("At least one function must be provided.")

    result = context

    for i, func in enumerate(funcs):
        if result is None:
            print(f"**Pipeline ended at step {i}**")
            break

        if not isinstance(func, Callable):
            raise TypeError(f"Expected a callable for step {i}")

        try:
            result = func(result)

        except Exception as e:
            print(f"Pipeline Exception: {e}")
            break

    return result


#######################################################################################


# Sending
def apply_server_gens_edit(context: dict) -> dict | None:
    layer_id = 3
    context["edits"] = [LayerEdits(layer_id, updates=context["server_gens"])]
    print(context["server_gens"])
    return apply_edits(context)


def contract_extract_changes(context: dict) -> dict | None:
    layer_id = 1
    context["layerId"] = layer_id

    try:
        result = extract_changes(
            context["repo"],
            layer_id,
            context["server_gens"].at[0, "Contract"],
            ["*"],
            "EcomplyContract = 1",
        )
    except Exception as e:
        exception_handler(e)

    if result["changes"] is None:
        print("No Contract changes.")
        return None

    print(f"Contracts Extracted: {len(result['changes'])}")
    # print(to_json(changes[:1]))

    context["edits"] = result["changes"]
    context["server_gens"].at[0, "Contract"] = result["server_gen"]
    print(context["server_gens"])
    return context


def contract_send_edits(context: dict) -> dict:
    edits = get_edits(context)
    # print(to_json(edits))

    try:
        response = context["service"].post_contracts(edits)
    except Exception as e:
        exception_handler(e)

    print(f"Contracts result: {response}")
    return context


def work_order_extract_changes(context: dict) -> dict | None:
    layer_id = 0
    context["layerId"] = layer_id

    try:
        result = extract_changes(
            context["repo"],
            layer_id,
            context["server_gens"].at[0, "WorkOrder"],
            [
                "InspectionGlobalID",
                "Type",
                "Status",
                "LocationDetails",
                "ActualFinishDate",
                "Comments",
                "Contract",
                "CancelReason",
                "GlobalID",
                "ClosedDate",
                "ClosedByERN",
                "ClosedByName",
                "CancelDate",
                "CancelByERN",
                "CancelByName",
                "CreatedDate",
                "CreatedBYERN",
                "CreatedByName",
                "UpdatedDate",
                "UpdatedByERN",
                "UpdatedByName",
                "WOEntity",
                "PROJSTARTDATE",
                "Project",
                "RecommendedSpecies",
                "ClosedBySystem",
                "OBJECTID",
            ],
        )
    except Exception as e:
        exception_handler(e)

    if result["changes"] is None:
        print("No Work Order changes.")
        return None

    print(f"Work Orders Extracted: {len(result['changes'])}")
    # print(to_json(changes[:1]))

    context["edits"] = result["changes"]
    context["server_gens"].at[0, "WorkOrder"] = result["server_gen"]
    return context


def wo_get_associated_planting_space_globalid(context: dict) -> dict | None:
    layerId = 4
    key = "InspectionGlobalID"

    edits = get_edits(context)
    try:
        inspections = (
            context["repo"]
            .query(
                [
                    LayerQuery(
                        layerId,
                        ["PlantingSpaceGlobalID", "GlobalID"],
                        f"GlobalID IN ({join(edits[~edits[key].isna()][key], True)})",
                    )
                ]
            )[layerId]
            .rename(columns={"GlobalID": key})
        )
    except Exception as e:
        exception_handler(e)

    edits = merge(edits, inspections, on=key, how="left")
    set_edits(context, edits)

    print(f"Planting Space Ids found: {len(inspections)}")
    # print(to_json(edits[:1]))

    return context


def wo_get_associated_planting_space(context: dict) -> dict | None:
    layerId = 2
    key = "PlantingSpaceGlobalID"
    edits = get_edits(context)

    try:
        plantingSpaces = (
            context["repo"]
            .query(
                [
                    LayerQuery(
                        layerId,
                        [
                            "ParkName",
                            "ParkZone",
                            "Borough",
                            "CommunityBoard",
                            "BuildingNumber",
                            "StreetName",
                            "CityCouncil",
                            "StateAssembly",
                            "GISPROPNUM",
                            "CrossStreet1",
                            "CrossStreet2",
                            "PlantingSpaceOnStreet",
                            "ObjectID",
                            "GlobalID",
                        ],
                        f"GlobalID IN ({join(edits[~edits[key].isna()][key], True)})",
                    )
                ]
            )[layerId]
            .rename(
                columns={
                    "GlobalID": "PlantingSpaceGlobalID",
                    "OBJECTID": "PlantingSpaceId",
                    "PlantingSpaceOnStreet": "OnStreetSite",
                }
            )
        )
    except Exception as e:
        exception_handler(e)

    edits = merge(edits, plantingSpaces, on=key, how="left")
    set_edits(context, edits)

    print(f"Planting Spaces hydrated: {len(plantingSpaces)}")
    # print(to_json(edits[:1]))

    return context


def work_order_send_edits(context: dict) -> dict | None:
    edits = get_edits(context)
    # print(to_json(edits))

    try:
        response = context["service"].post_work_orders(edits)
    except Exception as e:
        exception_handler(e)

    print(f"Work Orders result: {response}")
    return context


# Receiving
def work_order_get_changes(context: dict) -> dict | None:
    layerId = 0
    fromDateTime = datetime.fromtimestamp(context["server_gens"].at[0, "WorkOrder"])

    try:
        changes = context["service"].get_work_orders(fromDateTime)
    except Exception as e:
        exception_handler(e)

    if 0 == len(changes):
        return None

    context["edits"] = [LayerEdits(layerId, updates=changes)]
    print(f"WorkOrders Recieved: {len(changes)}")
    return context


def wo_update_associated_inspection(context: dict) -> dict | None:
    layerId = 4
    key = "WorkOrderGlobalID"
    edits = get_edits(context)

    try:
        inspections = context["repo"].query(
            [
                LayerQuery(
                    layerId,
                    [
                        "InspectionGlobalID",
                        "HasActiveWorkOrder",
                    ],
                    f"{key} IN ({join(edits[key], True)})",
                )
            ]
        )[layerId]
    except Exception as e:
        exception_handler(e)

    edits = merge(inspections, edits, on=key, how="left")
    edits.loc[
        edits["Status"] == Literal["Closed", "Canceled"], "HasActiveWorkOrder"
    ] = 0

    context["edits"].append(
        LayerEdits(layerId, updates=edits["InspectionGlobalID", "HasActiveWorkOrder"])
    )

    print(f"Inspections To Update: {len(inspections)}")

    return context


def wo_update_associated_platingSpace(context: dict) -> dict:
    layerId = 2
    key = "PlantingSpaceGlobalId"
    edits = get_edits(context)

    try:
        plantingSpaces = context["repo"].query(
            [
                LayerQuery(
                    layerId,
                    [
                        "GlobalID",
                        "BuildingNumber",
                        "StreetName",
                        "CrossStreet1",
                        "CrossStreet2",
                    ],
                    f"GlobalID IN ({join(edits[key], True)})",
                )
            ]
        )[layerId]
    except Exception as e:
        exception_handler(e)

    # Transformer.update(
    #     plantingSpaces,
    #     edits,
    #     key,
    #     {

    #         "BuildingNumber": {"Source": "BuildingNumber"},
    #         "StreetName": {"Source": "StreetName"},
    #         "CrossStreet1": {"Source": "CrossStreet1"},
    #         "CrossStreet2": {"Source": "CrossStreet2"},
    #     },
    # )

    context["edits"].append(LayerEdits(layerId, updates=plantingSpaces))

    print(f"Planting Spaces To Update: {len(plantingSpaces)}")

    return context


def work_order_line_item_get_changes(context: dict) -> dict | None:
    layerId = 2
    fromDateTime = datetime.fromtimestamp(context["server_gens"].at[0, "WorkOrder"])

    try:
        changes = context["service"].get_work_order_line_items(fromDateTime)
    except Exception as e:
        exception_handler(e)

    if 0 == len(changes):
        return None

    context["edits"] = [LayerEdits(layerId, updates=changes)]
    print(f"Line Items Recieved: {len(changes)}")
    return context


def contract_get_changes(context: dict) -> dict | None:
    layerId = 1
    fromDateTime = datetime.fromtimestamp(
        context["server_gens"].at[0, "Contract"],
    )

    try:
        changes = context["service"].get_contracts(fromDateTime)
    except Exception as e:
        exception_handler(e)

    if 0 == len(changes):
        return None

    context["edits"] = [LayerEdits(layerId, updates=changes)]
    print(f"Contracts Recieved: {len(changes)}")
    return context


def query_domains(context: dict) -> dict | None:
    try:
        context["domainValues"] = context["repo"].query_domains(
            [
                LayerDomainNames(
                    context["layerId"],
                    context["domainNames"],
                )
            ]
        )
        return context
    except Exception as e:
        exception_handler(e)


def send_domains(context: dict) -> dict | None:
    values = [
        {
            "domainName": domain["name"],
            "code": str(value["code"]),
            "value": value["name"],
        }
        for domain in context["domainValues"]
        for value in domain["codedValues"]
    ]

    try:
        context["service"].post_domain_values(values)
    except Exception as e:
        exception_handler(e)

    print(f"Domain values send: {len(values)}")
    return context

Pipeline

In [None]:
proxy = "@bcpxy.nycnet:8080"

import os

# Set proxy incase environment not set
os.environ["HTTP_PROXY"] = proxy
os.environ["HTTPS_PROXY"] = proxy
# bypass proxy on parks domains
os.environ["NO_PROXY"] = ".parks.nycnet"

# arc_gis = "https://formsgisportal.parks.nycnet"
# arc_gis = 'https://stg-formsgisportal.parks.nycnet'
arc_gis = "https://dev-formsgisportal.parks.nycnet"

factory = GISFactory(
    url=arc_gis + "/portal/home",
    username="forms.python_user",
    password="formsPython24*",
)
e_comply_repo = factory.create_feature(
    arc_gis + "/server/rest/services/eComply/eComplyContract/FeatureServer"
)
data_push_repo = factory.create_feature(
    arc_gis + "/server/rest/services/DataPush/ForMSDataPush/FeatureServer"
)

e_comply = eComply(
    url="https://nycparks-stage.ecomply.us/WebAPI",
    username="ff@ecomply.us",
    password="!test123",
)

server_gens = query_server_gens(
    {"repo": e_comply_repo},
)["server_gens"]
print(server_gens)

print("\nStarting Contract Pipelines...")
send_contract_domains_values = pipeline(
    {
        "repo": e_comply_repo,
        "layerId": 1,
        "domainNames": [
            "eComplyContractFundingSource",
            "eComplyContractType",
            "eComplyContractStatus",
            "eComplyContractBorough",
        ],
        "service": e_comply,
    },
    query_domains,
    send_domains,
)
send_contracts = pipeline(
    {
        "service": e_comply,
        "repo": e_comply_repo,
        "server_gens": server_gens.copy(),
    },
    contract_extract_changes,
    contract_send_edits,
    apply_server_gens_edit,
)
get_contact_changes = pipeline(
    {
        "service": e_comply,
        "repo": e_comply_repo,
        "server_gens": server_gens.copy(),
    },
    contract_get_changes,
    apply_edits,
)

print("\nStarting Work Order Pipelines...")
send_work_order_domain_values = pipeline(
    {
        "repo": data_push_repo,
        "layerId": 0,
        "domainNames": [
            "WOContract",
            "WOProject",
            "WOEntity",
            "WOStatus",
            "WOCategory",
            "WOType",
            "GenusSpecies",
        ],
        "service": e_comply,
    },
    query_domains,
    send_domains,
)
send_work_orders = pipeline(
    {
        "service": e_comply,
        "repo": data_push_repo,
        "server_gens": server_gens.copy(),
    },
    work_order_extract_changes,
    wo_get_associated_planting_space_globalid,
    wo_get_associated_planting_space,
    work_order_send_edits,
    apply_server_gens_edit,
)
get_work_order_changes = pipeline(
    {
        "service": e_comply,
        "repo": data_push_repo,
        "server_gens": server_gens.copy(),
    },
    work_order_get_changes,
    wo_update_associated_inspection,
    wo_update_associated_platingSpace,
    apply_edits,
)
get_work_order_line_items = pipeline(
    {
        "service": e_comply,
        "repo": e_comply_repo,
        "server_gens": server_gens.copy(),
    },
    work_order_line_item_get_changes,
    apply_edits,
)