# Transform Data

This notebook transforms the extracted data and stores the result in parquet files.

For a detailed description, see the [documentation](../docs/iot-time-series-data.md).

In [None]:
import os

# Remember the directory the kernel was started in.
current_dir = os.getcwd()
print(f"current work directory: {current_dir}")

# Unless the working directory already ends with "notebooks", step up one
# level so that the project packages imported below can be resolved.
# NOTE(review): the intended repository layout is not visible here - confirm
# that the parent directory is the right import root.
if not current_dir.endswith("notebooks"):
    workspace_root = os.path.abspath(os.path.join(current_dir, os.pardir))
    os.chdir(workspace_root)
    print(f"changed root work directory to: {workspace_root}")

from modules.util.database import SQLAlchemyClient  # noqa: E402

# ------------------------------------------------------------------------------ #
# Configuration
# ------------------------------------------------------------------------------ #

# Identifier of the configuration handed to the database client.
CONFIG_ID = "CUSTOM_TEST"

# Database setup: create the client and let it create all tables.
db = SQLAlchemyClient(CONFIG_ID)
db.table_create_all()

# Transform Data

We'll iterate over all CSV files in the download folder and process them one after another.
We expect each file to contain a single property set type or indicator group, as this was the
key used when downloading the file.

Next, we need to determine the APM and eIOT IDs for each Thing / Indicator from PAI. To avoid
unnecessary API calls, we also store the mapping in a dedicated DB table for later lookup. So, if
the mapping has already been determined, we can return the values from the DB. Otherwise, we need
to perform the following steps:
- determine the modelId and modelType from PAI
- use the external ID API to determine the technical object number in the source S4 system
- once we have the SSID, number and type of the technical object, we can get the
metadata for this TO from the eIOT Metadata API. It returns the required information, such as the
managedObjectId and the measuringNodeIds of all assigned indicators.
- it is also important to check that the metadata sync has completed (status "synced")
- to map the "old" PAI indicator to the new indicator in APM, we use our own database view
V_POST_LOAD_INDICATORS, which holds the information about the newly created indicators in APM.
- if the PAI indicator was created in APM, we save the indicator data from eIOT in our internal
mapping table.

After the mappings for the technical objects and indicators have been derived, we create a dataset
for each measurement with the assigned _time, managedObjectId, measuringNodeId, characteristic and
value. As this dataset is loaded into a dataframe, we can easily pivot the data to have all
characteristics in one row, based on the key fields _time, managedObjectId and measuringNodeId.

The schema of the dataset is set afterwards. All numeric or numeric flexible indicators
get the datatype `float`, and date indicators get the datatype `date`. Indicators of type
string can't be migrated.

Finally the dataframe will be written as a parquet file to the location you have configured in the
config file under `["transform"]["time-series"]["directory"]`.

In [None]:
import os
import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq



from custom.tools import equipment_mapper # noqa: E402
from modules.acf.external_id_api import ApiExternalId  # noqa: E402
from modules.util.config import get_config_by_id  # noqa: E402
from modules.util.helpers import (  # noqa: E402
    Logger,  # noqa: E402
    convert_unix_to_iso,  # noqa: E402
    get_parquet_schema,  # noqa: E402
)  # noqa: E402
from modules.acf.model_api import ApiModel  # noqa: E402
from modules.apm.eiot import EIoTApi  # noqa: E402

from modules.util.database import (  # noqa: E402
    EIotMapping,  # noqa: E402
    EIotMappingIndicators,  # noqa: E402
    SQLAlchemyClient,  # noqa: E402
    V_PostLoad_Indicators,  # noqa: E402
)  # noqa: E402

# ------------------------------------------------------------------------------ #
# Configuration
# ------------------------------------------------------------------------------ #

CONFIG_ID = "CUSTOM_TEST"
log = Logger.get_logger(CONFIG_ID)
config = get_config_by_id(CONFIG_ID)
api_external_id = ApiExternalId(CONFIG_ID)
api_eiot = EIoTApi(CONFIG_ID)
api_model = ApiModel(CONFIG_ID)
db = SQLAlchemyClient(CONFIG_ID)

# Both mapping tables are truncated at the start of every run of this cell,
# i.e. the mapping is rebuilt during the run.
db.truncate(EIotMapping)
db.truncate(model=EIotMappingIndicators)

# In-memory cache of resolved indicator mappings (key: indicator id without
# the leading "I_"); it is also handed to get_parquet_schema() later on.
indicator_mapping = {}

# TODO: move to configuration and create folders if they do not exist (use pathlib)
DOWNLOAD_FOLDER = config["extract"]["time-series"]["directory"]
TRANSFORMED_FOLDER = config["transform"]["time-series"]["directory"]

# CSV columns that are never indicators and must not be sent through the
# indicator lookup. "THING_ID" and "_ThingType" are the key columns of the
# thing-model flavor; leaving them out caused a needless lookup and a
# misleading "Could not find indicator mapping" error for each of them.
COLUMN_IGNORE_LIST = [
    "_TIME",
    "equipmentId",
    "indicatorGroupId",
    "modelId",
    "templateId",
    "nPST",
    "THING_ID",
    "_ThingType",
]

# dictionary to store the external ids (technical objects)
# external_ids = dict() # key: thing_id, value: { "ain_id": "1234", "erp_id": "1234" }
SSID = api_eiot.get_ssid()

def get_indicator_mapping(db_session: SQLAlchemyClient, indicator_id: str, acf_id:str, thing_id: str, indicator_mapping: dict) -> "EIotMappingIndicators | None":
    """Resolve the stored eIOT indicator mapping for one CSV indicator column.

    To avoid querying the APM API for every value, the mapping is read from
    the local mapping tables and additionally cached in ``indicator_mapping``.

    Args:
        db_session (SQLAlchemyClient): database client used for the lookup.
        indicator_id (str): indicator id as found in the CSV header; a leading
            "I_" is stripped before the lookup.
        acf_id (str): ACF object id of the technical object; takes precedence
            over ``thing_id`` when both are given.
        thing_id (str): thing id, used only when ``acf_id`` is empty.
        indicator_mapping (dict): cache that is read and updated in place,
            keyed by the stripped indicator id.

    Returns:
        The matching ``EIotMappingIndicators`` row with ``managedObjectId``
        copied over from its parent mapping, or ``None`` if no mapping could
        be determined.
    """
    # if the indicator_id starts with leading I_ we need to remove it
    if indicator_id.startswith("I_"):
        indicator_id = indicator_id[2:]

    # NOTE(review): the cache key is the indicator id only. If the same
    # indicator id occurs for several technical objects, the entry of the
    # first object (incl. its managedObjectId) is returned for all of them -
    # confirm whether indicator ids are unique per object.
    if indicator_id in indicator_mapping:
        return indicator_mapping[indicator_id]

    # Both ids are matched against the same column (acfId).
    # NOTE(review): confirm that matching thing_id against acfId is intended.
    object_id = acf_id or thing_id
    if not object_id:
        # Previously this case ran into a NameError further down.
        log.error(
            f"Neither acf_id nor thing_id given for indicator id {indicator_id}"
        )
        return None

    indicator_mapping_db = db_session.select(
        model=EIotMapping,
        where=[
            EIotMapping.acfId == f"{object_id}",
        ],
        return_dict=False,
    )

    if len(indicator_mapping_db) == 0:
        log.error(f"Could not find indicator mapping for indicator id {indicator_id}")
        return None

    if len(indicator_mapping_db) > 1:
        # An ambiguous header used to be dropped silently; make it visible.
        log.error(
            f"Unknown length when getting indicator mapping: EIOT_MAPPING {len(indicator_mapping_db)}"
        )
        return None

    # Ensure the parent instance is bound to a session so that the
    # `indicators` relationship can be loaded.
    from sqlalchemy.orm import sessionmaker

    Session = sessionmaker(bind=db_session.engine)
    with Session() as session:
        parent_instance = session.query(EIotMapping).get(indicator_mapping_db[0].id)

        # find the indicator mapping
        for indi in parent_instance.indicators:
            if indi.indicatorIdAcf == indicator_id:
                # the callers read managedObjectId from the indicator row
                indi.managedObjectId = parent_instance.managedObjectId
                indicator_mapping[indicator_id] = indi
                return indi

    # Remember the miss so the database is not queried again for this id.
    log.error(
        f"Could not find indicator mapping for indicator id {indicator_id}"
    )
    indicator_mapping[indicator_id] = None
    return None


def check_eiot_mapping(db_session: SQLAlchemyClient, acf_id:str, model_id: str) -> bool:
    """Make sure an eIOT mapping exists for ``acf_id`` and report if it is usable.

    If no mapping header is stored yet, it is created via
    ``add_eiot_mapping``. Otherwise the number of stored indicators decides.

    Args:
        db_session (SQLAlchemyClient): database client used for all queries.
        acf_id (str): ACF object id of the technical object.
        model_id (str): model id, only needed when the mapping is created.

    Returns:
        bool: True if the mapping has at least one indicator, else False.
    """
    eiot_mapping = db_session.select_one(
        model=EIotMapping,
        where=[EIotMapping.acfId == f"{acf_id}"],
    )

    if eiot_mapping is None:
        # No header yet: create it. Use the session handed in by the caller
        # instead of the module-level `db`.
        return add_eiot_mapping(db_session=db_session, model_id=model_id, acf_id=acf_id)

    # A header exists; it is only usable if indicators are attached to it.
    indicator_count = db_session.select_count(
        model=EIotMappingIndicators,
        where=[EIotMappingIndicators.parent_id == f"{eiot_mapping['id']}"],
    )
    return indicator_count > 0

def get_eiot_mapping_count(db_session: SQLAlchemyClient, acf_id:str):
    """Classify the stored eIOT mapping state for ``acf_id``.

    Returns:
        0 if no mapping header (or more than one) is stored,
        -1 if exactly one header exists but it has no indicators,
        1 if exactly one header with at least one indicator exists.
    """
    headers = db_session.select(
        model=EIotMapping,
        where=[EIotMapping.acfId == f"{acf_id}"],
    )
    header_total = len(headers)

    # nothing stored yet
    if header_total == 0:
        return 0

    # more than one header for the same id is unexpected
    if header_total != 1:
        log.error(f"Unknown length when getting eiot mapping: EIOT_MAPPING {header_total}")
        return 0

    header = headers[0]
    indicator_total = db_session.select_count(
        model=EIotMappingIndicators,
        where=[EIotMappingIndicators.parent_id == f"{header['id']}"],
    )

    if indicator_total == 0:
        log.error(f"No indicators found for {header['number']}({header['type']})")
        return -1
    if indicator_total > 0:
        return 1
    return None


def get_output_file(indicator_group_id: str, file_name: str = None) -> str:
    """Build the parquet output path for a CSV file and create its folder.

    The file is placed in ``TRANSFORMED_FOLDER/<indicator_group_id>/`` and
    keeps the CSV's base name with the extension replaced by ``.parquet``.

    Args:
        indicator_group_id (str): indicator group / property set type id used
            as sub-folder name. Non-string values (e.g. numeric ids read from
            a CSV) are converted with ``str``.
        file_name (str, optional): name of the source CSV file. When omitted,
            the module-level loop variable ``file`` set by the processing
            loop is used, which keeps existing calls working.

    Returns:
        str: path of the parquet file to write.
    """
    if file_name is None:
        # legacy behavior: rely on the loop variable of the processing loop
        file_name = file  # noqa: F821

    output_folder = os.path.join(TRANSFORMED_FOLDER, str(indicator_group_id))
    os.makedirs(output_folder, exist_ok=True)

    # swap only the extension, not every ".csv" occurrence within the name
    base_name = os.path.splitext(file_name)[0]
    return os.path.join(output_folder, f"{base_name}.parquet")

def add_eiot_mapping(db_session: SQLAlchemyClient, model_id: str, acf_id: str = None) -> bool:
    """Create and store the eIOT mapping header (and indicators) for one object.

    Steps:
    1. read the model header to obtain the model type,
    2. look up the external (ERP) id of the object via the external id API,
    3. read the eIOT sync status of the technical object,
    4. for every indicator reported by eIOT, look up the matching row in
       ``V_PostLoad_Indicators`` and attach it to the header.

    The header is inserted in every case, also when one of the steps fails,
    so a later ``check_eiot_mapping`` finds it and does not call this function
    again for the same id.

    Args:
        db_session (SQLAlchemyClient): database client used for all queries
            and inserts.
        model_id (str): model id of the object.
        acf_id (str, optional): ACF object id of the technical object.

    Returns:
        bool: True if the header was stored with at least one indicator,
        False in all other cases.
    """
    eiot_mapping_header = EIotMapping()

    # the model type is needed to distinguish equipment and functional location
    model_data = api_model.get_model_header(model_id=model_id)
    model_type = model_data["modelType"]

    eiot_mapping_header.modelId = model_id
    eiot_mapping_header.type = model_type
    eiot_mapping_header.acfId = acf_id
    eiot_mapping_header.SSID = SSID

    api_external_id_res = api_external_id.get_external_data(
        filter_str=f"objectType eq '{model_type}' and systemType eq 'SAP ERP' and ainObjectId eq '{acf_id}'"
    )

    if len(api_external_id_res) == 0:
        log.error(f"Could not find external id {acf_id} in ACF")
        # add without eiot mapping and without indicators, but return False
        db_session.insert_one(obj=eiot_mapping_header, commit=True)
        return False

    eiot_mapping_header.number = equipment_mapper(api_external_id_res[0]["externalId"])

    try:
        eiot_sync_status = api_eiot.get_eiot_sync_status_by_to(
            number=eiot_mapping_header.number,
            ssid=eiot_mapping_header.SSID,
            to_type=eiot_mapping_header.type,
        )
    except Exception as e:
        # Not every exception carries status_code/response; reading them
        # unconditionally would raise an AttributeError inside the handler.
        if hasattr(e, "status_code") and hasattr(e, "response"):
            error_detail = f"{e.status_code}:{e.response}"
        else:
            error_detail = f"{e}"
        log.error(
            f"Error when getting eiot sync status for {eiot_mapping_header.number}({eiot_mapping_header.type}): {error_detail}"
        )
        db_session.insert_one(obj=eiot_mapping_header, commit=True)
        return False

    if eiot_sync_status['technicalObjectSyncStatus'] == 'NOT_SYNCED' or eiot_sync_status['eIotSyncTime'] is None:
        log.error(f"Technical object {eiot_mapping_header.number}({eiot_mapping_header.type}) is not synced")
        db_session.insert_one(obj=eiot_mapping_header, commit=True)
        return False

    eiot_mapping_header.managedObjectId = eiot_sync_status["managedObjectId"]

    for indicator in eiot_sync_status["indicators"]:
        log.debug(f"check indicator {indicator['indicatorId']}")
        # read the matching indicator (use the session handed in by the
        # caller, not the module-level `db`)
        apm_indicator = db_session.select(
            model=V_PostLoad_Indicators,
            distinct=True,
            where=[
                V_PostLoad_Indicators.apm_indicatorId
                == f"{indicator['indicatorId']}",
                V_PostLoad_Indicators.apm_positionId
                == f"{indicator['positionDetailsId']}",
                V_PostLoad_Indicators.APMIndicatorCategory
                == f"{indicator['categoryName']}",
                V_PostLoad_Indicators.CharcInternalID
                == f"{indicator['characteristicsInternalId']}",
                V_PostLoad_Indicators.externalId
                == f"{eiot_sync_status['number']}",
                V_PostLoad_Indicators.technicalObject_type
                == f"{eiot_sync_status['type']}",
                V_PostLoad_Indicators.ssid
                == f"{eiot_sync_status['SSID']}",
            ],
        )

        if len(apm_indicator) == 0:
            log.debug(
                f"no synced indicator {indicator['indicatorId']} posId:{indicator['positionDetailsId']}"
            )
        elif len(apm_indicator) == 1:
            # add an item to eiot_mapping_header
            post_load_row = apm_indicator[0]
            eiot_mapping_item = EIotMappingIndicators(
                tenantid=post_load_row["tenantid"],
                indicatorIdAcf=post_load_row["indicators_id"],
                indicatorIdApm=post_load_row["apm_indicatorId"],
                categoryName=post_load_row["APMIndicatorCategory"],
                characteristicsInternalId=post_load_row["CharcInternalID"],
                positionDetailsId=post_load_row["apm_positionId"],
                dataType=indicator["dataType"],
                unitOfMeasure=indicator["unitOfMeasure"],
                charcLength=indicator["charcLength"],
                charcDecimals=indicator["charcDecimals"],
                measuringNodeId=indicator["measuringNodeId"],
                technicalGroupId=indicator["technicalGroupId"],
            )
            eiot_mapping_header.indicators.append(eiot_mapping_item)
        else:
            log.error(
                f"Unknown length when getting indicator mapping: V_POST_LOAD_INDICATORS {len(apm_indicator)}"
            )

    # due to lazy loading in sqlalchemy we need to save everything that is
    # still needed after the insert before committing
    indicator_count = len(eiot_mapping_header.indicators)
    header_number = eiot_mapping_header.number
    header_type = eiot_mapping_header.type

    # insert mapping to db
    db_session.insert_one(obj=eiot_mapping_header, commit=True)

    if indicator_count == 0:
        log.error(f"No indicators found for {header_number}({header_type})")
        return False
    return True


# if V_PostLoad_Indicators is empty, we can skip the rest of the script
if db.select_count(model=V_PostLoad_Indicators) == 0:
    log.error("No indicators found in V_PostLoad_Indicators")
    raise Exception("No indicators found in V_PostLoad_Indicators")

# find all files with ending .csv in the download folder and their sub-folders
for root, dirs, files in os.walk(DOWNLOAD_FOLDER):
    # NOTE: the loop variable has to stay named `file`, get_output_file()
    # reads it as a module-level name.
    for file in files:
        if not file.endswith(".csv"):
            continue

        start_time = time.time()
        data = []
        csv_path = os.path.join(root, file)
        log.debug(csv_path)

        # read the csv file into a pandas dataframe
        df = pd.read_csv(csv_path, sep=",", header=0)
        log.debug(f"Read CSV file in {time.time() - start_time:.2f} seconds")

        # An empty file has no indicator group to derive the output path from.
        if df.empty:
            log.debug("No data to store")
            continue

        # we have different flavors of csv files, so we need to check which
        # key columns are present (filtering rows below never changes columns)
        has_thing_id = "THING_ID" in df.columns
        has_equipment_id = "equipmentId" in df.columns

        if has_thing_id:
            indicator_groups = df["nPST"].unique()
            if len(indicator_groups) > 1:
                log.error("Indicator group id is not unique")
                # TODO: if this becomes true we need to use the
                # indicator group extracted from the folder name
                raise Exception("Indicator group id is not unique")
            output_file = get_output_file(indicator_group_id=indicator_groups[0])

            log.debug("CSV data is based on thing model")

            # Resolve the model once per file, before any rows are removed.
            # Looking it up inside the loop on the shrinking dataframe raised
            # an IndexError as soon as all rows had been filtered out.
            # NOTE(review): assumes a single thing type per file - confirm.
            model_mapping_iot = api_external_id.get_acf_model_id_by_thing_type(
                thing_type=df["_ThingType"].unique()[0]
            )
            if not model_mapping_iot:
                log.error(f"Could not find model for thing type in {csv_path}")
                continue

            for thing_id in df["THING_ID"].unique():
                thing_id_mapping = api_external_id.get_acf_object_by_thing_id(external_id=thing_id)

                if not thing_id_mapping:
                    df = df[df["THING_ID"] != thing_id]
                    continue

                eiot_result = check_eiot_mapping(
                    db_session=db,
                    acf_id=thing_id_mapping["ainObjectID"],
                    model_id=model_mapping_iot["ainObjectID"],
                )
                if not eiot_result:
                    # drop all rows of things without a usable mapping
                    df = df[df["THING_ID"] != thing_id]

        elif has_equipment_id:
            indicator_groups = df["indicatorGroupId"].unique()
            if len(indicator_groups) > 1:
                log.error("Indicator group id is not unique")
                raise Exception("Indicator group id is not unique")
            output_file = get_output_file(indicator_group_id=indicator_groups[0])

            log.debug("CSV data is based on abstract model")

            for equi_id in df["equipmentId"].unique():
                eiot_result = check_eiot_mapping(
                    db_session=db,
                    acf_id=equi_id,
                    model_id=df.loc[df["equipmentId"] == equi_id].iloc[0]["modelId"],
                )
                if not eiot_result:
                    # drop all rows of equipment without a usable mapping
                    df = df[df["equipmentId"] != equi_id]

        else:
            # Without a key column neither the output file nor the object ids
            # are defined; previously this ended in a NameError or silently
            # reused values of the previous file.
            log.error(f"Unknown CSV layout in {csv_path}: neither THING_ID nor equipmentId column found")
            continue

        log.debug(f"Processed equipment/thing IDs in {time.time() - start_time:.2f} seconds")

        for index, row in df.iterrows():
            iso_time = convert_unix_to_iso(row["_TIME"])

            # equipmentId takes precedence when both key columns exist
            if has_equipment_id:
                acf_id = row["equipmentId"]
                thing_id = None
            else:
                acf_id = None
                thing_id = row["THING_ID"]

            for key, value in row.items():
                if key in COLUMN_IGNORE_LIST:
                    continue
                if pd.isna(value):
                    continue
                ind_mapping = get_indicator_mapping(db_session=db, indicator_id=key, acf_id=acf_id, thing_id=thing_id, indicator_mapping=indicator_mapping)
                if ind_mapping is None:
                    continue

                # booleans are stored as 0/1
                if isinstance(value, bool):
                    value = 1 if value else 0
                data.append(
                    {
                        "_time": iso_time,
                        "value": value,
                        "managedObjectId": ind_mapping.managedObjectId,
                        "characteristic": f"C_{ind_mapping.characteristicsInternalId}",
                        "measuringNodeId": ind_mapping.measuringNodeId,
                    }
                )

        log.debug(f"Processed rows in {time.time() - start_time:.2f} seconds")

        # store data as parquet file
        if len(data) == 0:
            log.debug("No data to store")
            continue

        df_long = pd.DataFrame(data)

        # one row per (object, time, measuring node), one column per characteristic
        df_pivot = df_long.pivot_table(
            index=["managedObjectId", "_time", "measuringNodeId"],
            columns="characteristic",
            values="value",
            aggfunc="first",
        ).reset_index()

        # Ensure _time column is in datetime format and expressed in UTC
        df_pivot["_time"] = pd.to_datetime(df_pivot["_time"], errors='coerce')
        if df_pivot["_time"].dt.tz is None:
            df_pivot["_time"] = df_pivot["_time"].dt.tz_localize('UTC')
        else:
            df_pivot["_time"] = df_pivot["_time"].dt.tz_convert('UTC')

        # Ensure the schema correctly reflects the data types of the columns
        schema = get_parquet_schema(df_pivot, indicator_mapping, log)

        table = pa.Table.from_pandas(df_pivot, schema=schema)
        pq.write_table(table, output_file, compression='GZIP')
        log.debug(f"Stored parquet file in {time.time() - start_time:.2f} seconds")

print("start upload")