In [57]:
import kfp

from typing import NamedTuple
import kfp.components as comp
from kfp import compiler, dsl
from kfp import dsl
from kfp.components import InputPath, OutputPath
from kubernetes.client.models import V1EnvVar

In [None]:
def GetThresholds(output_thresholds_path: OutputPath(str)):
    """Write the per-asset anomaly thresholds to a JSON file.

    The file contains a dict keyed by measurement name (``heat_meter`` and
    ``electricity_meter``); each value maps an asset name to a two-item list
    ``[minimum, maximum]``.

    Args:
        output_thresholds_path: Path of the JSON file to create.
    """
    # Imports stay inside the body: the function is turned into a standalone
    # component with ``create_component_from_func``.
    import json

    # Every asset currently shares the same bounds for both measurements.
    default_bounds = (3, 5)
    measurement_names = ["heat_meter", "electricity_meter"]
    asset_names = [
        "building1",
        "building2",
        "datacenter1",
        "psnc_garden",
        "hvac",
        "office",
        "flat1",
        "eagle",
        "altair",
    ]

    thresholds = {
        measurement: {asset: list(default_bounds) for asset in asset_names}
        for measurement in measurement_names
    }

    with open(output_thresholds_path, "w") as file:
        json.dump(thresholds, file)
    


In [58]:
def GetData(measurement_name: str, output_data_forecast: OutputPath(str)):
    """Download the simulated readings of one measurement and store them as JSON.

    The Influx API is queried one day at a time from a fixed start date up to
    three days after "now". Only records whose ``typeData`` tag equals
    ``"simulated"`` are kept.

    Args:
        measurement_name: Measurement to request from the Influx API.
        output_data_forecast: Path of the JSON file to create. It holds a dict
            of parallel lists under the keys ``ds``, ``value``,
            ``time_registered`` and ``asset_name``.

    Raises:
        ValueError: If one of the date bounds cannot be parsed.
    """
    # Imports stay inside the body: the function is turned into a standalone
    # component with ``create_component_from_func``.
    import json
    from datetime import datetime

    import maya  # To REQUIREMENTS
    import pandas as pd  # To REQUIREMENTS
    import requests  # To REQUIREMENTS
    from tqdm import tqdm

    # Fixed schema so an empty download still yields a frame with these columns.
    asset_columns = ["ds", "asset_name", "direction", "type", "value", "time_registered"]

    def GetRequest(url, headers=None, payload=None):
        """GET ``url``; return the decoded JSON or a dict describing the failure."""
        response = requests.request("GET", url, headers=headers or {}, data=payload or {})
        try:
            return response.json()
        except ValueError:
            # The body was not valid JSON: report status and raw text instead.
            return {
                "status_code": response.status_code,
                "text": response.text
            }

    def DownloadAssetsData(measurement_name, bucket = "renergetic", min_date = "yesterday", max_date = "tomorrow"):
        """Collect the records of every day-long window between the two dates."""
        try:
            min_date_from = maya.when(min_date).datetime()
        except Exception as error:
            raise ValueError("Please introduce correct time format for MIN_DATE") from error

        try:
            max_date_from = maya.when(max_date).datetime()
        except Exception as error:
            raise ValueError("Please introduce correct time format for MAX_DATE") from error

        datelist = pd.date_range(min_date_from, max_date_from)

        data_ = []

        for i in tqdm(range(len(datelist)-1)):
            # Each request covers midnight of one day to midnight of the next.
            from_ = datetime.strftime(datelist[i], "%Y-%m-%d 00:00:00")
            to_ = datetime.strftime(datelist[i+1], "%Y-%m-%d 00:00:00")

            url = "http://influx-api-ren-prototype.apps.paas-dev.psnc.pl/api/measurement/data?measurements={measurement_name}&from={from_}&to={to_}"\
                .format(measurement_name = measurement_name, from_ = from_, to_= to_)

            info_ = GetRequest(url)

            if isinstance(info_, list):
                data_.extend(info_)
            elif isinstance(info_, dict):
                # A dict is the failure description built by GetRequest.
                print("Error")
                print(from_)
                print(to_)
        return data_

    def DataFrameAssests(list_data, name_field):
        """Flatten the API records into a DataFrame, skipping malformed ones."""
        dicts = []
        for data in list_data:
            try:
                dicts.append({
                    "ds": data["tags"]["time_prediction"],
                    "asset_name": data["tags"]["asset_name"],
                    "direction": data["tags"]["direction"],
                    "type": data["tags"]["typeData"],
                    "value": float(data["fields"][name_field]),
                    "time_registered": data["fields"]["time"]
                })
            except (KeyError, TypeError, ValueError):
                # Best effort: records lacking a field or a numeric value are dropped.
                continue
        return pd.DataFrame(dicts, columns = asset_columns)

    max_date = maya.now().add(days = 3).iso8601()
    list_ = DownloadAssetsData(measurement_name, min_date = "13 Nov 2022 13:15:00", max_date = max_date)
    data = DataFrameAssests(list_, "energy")
    data = data[data["type"] == "simulated"]

    data_output = {
        "ds": data["ds"].tolist(),
        "value": data["value"].tolist(),
        "time_registered": data["time_registered"].tolist(),
        "asset_name": data["asset_name"].tolist()
    }

    with open(output_data_forecast, "w") as file:
        json.dump(data_output, file)

In [59]:
def ForecastProcess(input_data_path: InputPath(str),input_thresholds_path : InputPath(str), measurement_name,
    path_minio,
    access_key,
    secret_key
    ):
    """Fit a Prophet model per asset, archive it in MinIO and flag anomalies.

    For every asset configured for ``measurement_name`` the function fits a
    Prophet model on the downloaded history, uploads the serialized model to
    the ``test`` MinIO bucket, forecasts 96 steps of 15 minutes and prints a
    message for each forecast value outside the asset's thresholds. A Discord
    notification is sent when the run finishes and when ``measurement_name``
    is not supported.

    Args:
        input_data_path: JSON file with the parallel lists ``asset_name``,
            ``time_registered`` and ``value`` (other keys are ignored here).
        input_thresholds_path: JSON file mapping measurement name -> asset
            name -> ``[minimum, maximum]``.
        measurement_name: ``"electricity_meter"`` or ``"heat_meter"``.
        path_minio: Host (and port) of the MinIO server.
        access_key: MinIO access key.
        secret_key: MinIO secret key.

    Raises:
        ValueError: If ``measurement_name`` is not one of the supported values.
    """
    # Imports stay inside the body: the function is turned into a standalone
    # component with ``create_component_from_func``.
    import json

    import maya
    import pandas as pd
    import requests  # only needed by the currently disabled POST below
    from discord_webhook import DiscordWebhook
    from icecream import ic
    from minio import Minio
    from prophet import Prophet
    from prophet.serialize import model_to_json

    # NOTE(review): this webhook URL embeds a secret token and is committed in
    # the notebook; it should be rotated and injected through configuration.
    url_disc = "https://discord.com/api/webhooks/1002537248622923816/_9XY9Hi_mjzh2LTVqnmSKXlIFJ5rgBO2b8xna5pynUrzALgtC4aXSFq89uMdlW_v-ZzT"

    # Bounds applied when an asset has no configured threshold.
    default_threshold_min = 0
    default_threshold_max = 10000

    def Notify(message):
        """Send ``message`` to the Discord channel."""
        webhook = DiscordWebhook(url = url_disc, content = message)
        webhook.execute()

    with open(input_thresholds_path) as file:
        thresholds = json.load(file)

    client = Minio(
        path_minio,
        access_key=access_key,
        secret_key=secret_key,
        secure = False
    )

    def ForecastData(data, asset_name, measurement_name):
        """Fit Prophet on one asset, upload the model and return the future rows."""
        data_ds = data[data.asset_name == asset_name][["time_registered", "value"]]
        data_ds.columns = ["ds", "y"]

        m = Prophet(daily_seasonality=True, weekly_seasonality=True)
        m.fit(data_ds)
        # "15min" is the non-deprecated spelling of the former "15T" alias.
        future = m.make_future_dataframe(periods=24*4, freq="15min")
        forecast = m.predict(future)

        with open("/tmp/model_prophet.json", 'w') as fout:
            fout.write(model_to_json(m))  # Save model

        date = maya.when("now").rfc2822()
        f_name = "model_{domain}_{asset}_{date}.json"\
            .format(domain = measurement_name, asset = asset_name, date = date)

        result = client.fput_object(
            "test", f_name, "/tmp/model_prophet.json"
        )

        print(
            "created {0} object; etag: {1}, version-id: {2}".format(
                result.object_name, result.etag, result.version_id,
            ),
        )

        # Keep only the rows that lie after the last observed timestamp.
        return forecast[forecast.ds > max(data_ds.ds)]

    if measurement_name == "electricity_meter":
        domain_ = "electricity"
    elif measurement_name == "heat_meter":
        domain_ = "heat"
    else:
        message = "There is a problem with the measurement_name, {measurement_name}".format(measurement_name = measurement_name)
        Notify(message)
        raise ValueError(message)

    asset_list_dict = {
        "electricity_meter": ["building1", "building2", "pv_panel_1", "wind_farm_1"],
        "heat_meter": ["building1", "building2", "solar_collector1"]
    }

    asset_list = asset_list_dict[measurement_name]

    with open(input_data_path) as file:
        data_str = json.load(file)

    data = pd.DataFrame(data_str)

    # Target of the (currently disabled) upload of forecast values.
    url = "http://influx-api-ren-prototype.apps.paas-dev.psnc.pl/api/measurement"
    headers = {
        "Content-Type": "application/json"
    }

    for asset_name in asset_list:

        ic(asset_name)

        # Fall back to the defaults when the asset has no configured bounds.
        try:
            threshold_min = thresholds[measurement_name][asset_name][0]
        except (KeyError, IndexError, TypeError):
            threshold_min = default_threshold_min

        try:
            threshold_max = thresholds[measurement_name][asset_name][1]
        except (KeyError, IndexError, TypeError):
            threshold_max = default_threshold_max

        # Direction and type depend only on the asset, not on the forecast row.
        if asset_name in ["building1", "building2"]:
            direction_energy = "in"
            type_ = "None"
        else:
            direction_energy = "out"
            type_ = "renewable"

        forecasted_data = ForecastData(data, asset_name, measurement_name)
        for index, row in forecasted_data.iterrows():
            time_ = str(row["ds"])
            value = row["yhat"]

            data_post = {
                    "bucket": "renergetic",
                    "measurement": measurement_name,
                    "fields":{
                        "energy": value,
                        "time": time_,
                    },
                    "tags":{
                        "domain": domain_,
                        "typeData": "forecasting",
                        "direction": direction_energy,
                        "prediction_window": "24h",
                        "asset_name": asset_name,
                        "measurement_type": type_,
                        "time_prediction": maya.when("now").epoch
                    }
                }

            # SEND NOTIFICATION

            if value < threshold_min:
                msg = "[{time_pred}]Anomaly of minimum forecast in domain: {domain} and asset_name {asset_name}."\
                    .format(time_pred = time_, domain = domain_, asset_name = asset_name)

                print(msg)

            if value > threshold_max:
                msg = "[{time_pred}]Anomaly of maximum forecast in domain: {domain} and asset_name {asset_name}."\
                    .format(time_pred = time_, domain = domain_, asset_name = asset_name)

                print(msg)

            try:
                # The upload is disabled here: the payload is built but not sent.
                # response = requests.request("POST", url, headers=headers, data=json.dumps(data_post))
                # status_code = response.status_code
                status_code = 200
            except Exception:
                message = "Error in updating value for measurement name: {measurement_name} in asset: {asset_name} in time {time_pred}"\
                    .format(measurement_name = measurement_name, asset_name = asset_name, time_pred = data_post["fields"]["time"])
                Notify(message)
                status_code = 500

    Notify("Forecasting done for {domain}".format(domain = domain_))


In [60]:
def ExportModelToMinio(input_model_path: InputPath(str),measurement_name, 
    path_minio = "minio.kubeflow-renergetic.svc:9000",
    access_key = "minio",
    secret_key = "DaTkKc45Hxr1YLR4LxR2xJP2"
    ):
    """Load a serialized model from JSON and open a MinIO client.

    The function currently stops after creating the client: nothing is
    uploaded yet and ``measurement_name`` is not used.

    Args:
        input_model_path: Path of the JSON file holding the serialized model.
        measurement_name: Measurement the model belongs to (unused so far).
        path_minio: Host (and port) of the MinIO server.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
    """
    # NOTE(review): the default secret_key is a credential committed in the
    # notebook; it should be rotated and supplied through configuration.
    from minio import Minio
    import json

    with open(input_model_path) as file:
        # json.load needs the open file handle; calling it without one raises TypeError.
        model_serialized = json.load(file)

    client = Minio(
        path_minio,
        access_key=access_key,
        secret_key=secret_key,
    )


In [61]:
def REN_Forecast_Test_Pipeline(url = "minio-kubeflow-renergetic.apps.dcw1-test.paas.psnc.pl",
    access_key="minio",
    secret_key="DaTkKc45Hxr1YLR4LxR2xJP2"):
    """Define the forecast test pipeline.

    One task writes the thresholds file; then, for each of
    ``electricity_meter`` and ``heat_meter``, a download task feeds a forecast
    task that also receives the thresholds file and the MinIO connection data.

    Args:
        url: Host of the MinIO server handed to the forecast component.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
    """
    # NOTE(review): the default secret_key is a credential committed in the
    # notebook; it should be rotated and supplied through configuration.
    env_var = V1EnvVar(name='HOME', value='/tmp')

    def with_resources(task):
        """Apply the shared HOME variable and CPU/memory settings to ``task``."""
        return (task.add_env_variable(env_var)
                    .set_memory_request('500M')
                    .set_memory_limit('1Gi')
                    .set_cpu_request('1')
                    .set_cpu_limit('2'))

    download_data_op = comp.create_component_from_func(
        GetData, packages_to_install = ["requests", "numpy", "maya","pandas", "icecream", "tqdm"], output_component_file = "download_data_op_component.yaml")
    get_thresholds_op = comp.create_component_from_func(
        GetThresholds, packages_to_install= ["requests"], output_component_file= "thresholds_component.yaml"
    )
    forecast_data_op = comp.create_component_from_func(
        ForecastProcess, packages_to_install = ["requests", "numpy", "maya","pandas", "icecream", "prophet", "discord-webhook", "tqdm", "minio"], output_component_file = "forecast_data_op_component.yaml")

    get_thresholds_task = get_thresholds_op()

    with dsl.ParallelFor(["electricity_meter", "heat_meter"]) as measurement:
        download_task = with_resources(download_data_op(measurement))
        # Arguments follow the ForecastProcess signature: data file, thresholds
        # file, measurement name, then the MinIO connection data.
        forecast_task = with_resources(
            forecast_data_op(
                download_task.output,
                get_thresholds_task.output,
                measurement,
                url,
                access_key,
                secret_key,
            )
        )
    
# Compile the pipeline function into the package that gets uploaded to Kubeflow.
compiler.Compiler().compile(
    pipeline_func=REN_Forecast_Test_Pipeline,
    package_path="Forecast_Data_Pipeline.yaml",
)
    

    
    