In [1]:
import time
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import OAuthClientCredentials, OAuthInteractive
from cognite.client.data_classes import TimeSeries
from statsmodels.nonparametric.smoothers_lowess import lowess
from statsmodels.tsa.seasonal import seasonal_decompose

In [2]:
# --- Authentication (interactive Azure AD login) -----------------------------
TENANT_ID = "3b7e4170-8348-4aa4-bfae-06a3e1867469"
CDF_CLUSTER = "api"
CLIENT_NAME = "akerbp"
CLIENT_ID = "779f2b3b-b599-401a-96aa-48bd29132a27"  # Cognite API User access - app registration
COGNITE_PROJECT = "akerbp"
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]

AUTHORITY_HOST_URI = "https://login.microsoftonline.com"
AUTHORITY_URI = f"{AUTHORITY_HOST_URI}/{TENANT_ID}"
PORT = 53000  # local redirect port used by the interactive browser login

creds = OAuthInteractive(
    client_id=CLIENT_ID,
    authority_url=AUTHORITY_URI,
    scopes=SCOPES,
    redirect_port=PORT,
)

client_cnf = ClientConfig(
    client_name=CLIENT_NAME,
    base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
    project=COGNITE_PROJECT,
    credentials=creds,
)
client = CogniteClient(client_cnf)

# Fail fast if the token cannot reach the configured project. Check actual
# project access rather than merely the presence of a `projects` attribute.
status = client.iam.token.inspect()
if not any(project.url_name == COGNITE_PROJECT for project in status.projects):
    raise RuntimeError(f"Token Error! No access to CDF project {COGNITE_PROJECT!r}")

# --- Analysis parameters ------------------------------------------------------
ts_input_name = "VAL_11-LT-95034A:X.Value"  # name searched for to find the tank-level series
ts_output_name = "VAL_11-LT-95034A:X.CDF.D.AVG.LeakValue"  # name/external id of the daily output series
tank_volume = 1400  # presumably litres (handle labels per-timestamp rates L/min) - TODO confirm
derivative_value_excl = 0.002  # level slopes above this [% of tank volume / s] are treated as filling
start_date = datetime(2023, 3, 21, 1, 0, 0)

In [3]:
def handle(client: "CogniteClient", data: dict) -> "tuple[pd.DataFrame, TimeSeries, None]":
    """Calculate the tank drainage rate per timestamp and per day.

    The tank level (% of tank volume) is smoothed with LOWESS and
    differentiated in time. Large positive derivatives (human interventions
    such as filling the tank) are zeroed out so they do not mask drainage.
    The resulting drainage rate helps to detect leakages. The daily mean is
    written to the output time series.

    [requirements]
    statsmodels
    [/requirements]

    Args:
        client (CogniteClient): authenticated client for the CDF project.
        data (dict): input payload with keys
            - 'tank_volume': tank volume; presumably litres, since the
              per-timestamp rate is labelled L/min.
            - 'derivative_value_excl': derivatives above this value
              [% of tank volume / s] are treated as filling and set to 0.
            - 'start_date': start of the window, as a datetime or an
              ISO-8601 string (function payloads are JSON).
            - 'tot_days': length of the window in days.
            - 'ts_input_name': name searched for to find the input series.
            - 'ts_output_name': name and external id of the output series.
            - 'dataset_id' (optional): data set id used when the output
              series has to be created; defaults to None.

    Returns:
        tuple: (new_df, ts_output, ts_inserted).
            - new_df: per-timestamp dataframe with the smoothed signal,
              derivatives, the daily mean column (named ts_output_name, in
              tank_volume units per second) and 'draining_rate [L/min]'.
            - ts_output: the output TimeSeries.
            - ts_inserted: the return value of insert_dataframe.

    Raises:
        ValueError: if no time series matches ts_input_name, or fewer than
            two aggregated datapoints are returned for the window.

    NOTE(review): when run as a deployed Cognite Function the return value
    must be JSON-serializable; this tuple of DataFrame/TimeSeries is not.
    TODO decide what the deployed function should return.
    """
    # With `function_handle` only this function's source is uploaded, so every
    # dependency must be imported here rather than at notebook level.
    from datetime import datetime, timedelta

    import numpy as np
    import pandas as pd
    from cognite.client.data_classes import TimeSeries
    from statsmodels.nonparametric.smoothers_lowess import lowess

    # STEP 0: Unfold data
    tank_volume = data['tank_volume']
    derivative_value_excl = data['derivative_value_excl']
    start_date = data['start_date']
    if isinstance(start_date, str):
        # Function payloads are JSON, so dates arrive as ISO-8601 strings.
        start_date = datetime.fromisoformat(start_date)
    end_date = start_date + timedelta(days=data['tot_days'])
    ts_input_name = data['ts_input_name']
    ts_output_name = data['ts_output_name']
    dataset_id = data.get('dataset_id')  # optional: None creates the series outside any data set

    # STEP 1: Load time series from name and aggregate to 1-minute averages
    ts_all = client.time_series.search(name=ts_input_name)
    if not ts_all:
        raise ValueError(f"No time series found matching name {ts_input_name!r}")
    cdf_ext_id = ts_all[0].external_id
    df_cdf = client.time_series.data.retrieve(
        external_id=cdf_ext_id,
        aggregates="average",
        granularity="1m",
        start=start_date,
        end=end_date,
    )

    df = df_cdf.to_pandas()
    if len(df) < 2:
        # np.gradient below needs at least two points.
        raise ValueError(
            f"Need at least two datapoints for {cdf_ext_id!r} between {start_date} and {end_date}, got {len(df)}"
        )
    df = df.rename(columns={cdf_ext_id + "|average": ts_input_name})

    # STEP 2: Filter signal
    # Seconds since the Unix epoch; assumes a tz-naive (UTC) index - TODO confirm.
    df['time_sec'] = (df.index - datetime(1970, 1, 1)).total_seconds()
    smooth = lowess(df[ts_input_name], df['time_sec'], is_sorted=True, frac=0.01, it=0)
    df_smooth = pd.DataFrame(smooth, columns=["time_sec", "smooth"])

    df = df.reset_index().rename(columns={'index': 'time_stamp'})
    df = pd.merge(df, df_smooth, on='time_sec')  # merge smoothed signal into the original frame
    df = df.set_index('time_stamp')

    # STEP 3: Reuse the output time series if it exists, otherwise create it
    # (creating an existing external id fails on every re-run).
    ts_output = client.time_series.retrieve(external_id=ts_output_name)
    if ts_output is None:
        ts_output = client.time_series.create(
            TimeSeries(name=ts_output_name, external_id=ts_output_name, data_set_id=dataset_id)
        )

    # STEP 4: Calculate daily average drainage rate
    df["derivative"] = np.gradient(df['smooth'], df["time_sec"])  # [% of tank volume / s]
    # Zero out large positive slopes (filling) and NaNs so they do not count as drainage.
    df['derivative_excl_filling'] = df["derivative"].where(df["derivative"] <= derivative_value_excl, 0)

    df = df.reset_index()
    df['Date'] = pd.to_datetime(df['time_stamp']).dt.date
    # Average drainage rate per DAY, in tank_volume units per second.
    mean_drainage_day = df.groupby('Date')['derivative_excl_filling'].mean() * tank_volume / 100
    mean_df = pd.DataFrame({ts_output_name: mean_drainage_day})  # column name = output external id

    new_df = pd.merge(df, mean_df, on="Date")
    # Per-timestamp rate: % / s -> % / min (x60), then % -> tank_volume units (/100 x volume).
    new_df["draining_rate [L/min]"] = new_df["derivative_excl_filling"] * 60 * tank_volume / 100

    # insert_dataframe expects a DatetimeIndex; the grouped index holds datetime.date objects.
    to_insert = mean_df.copy()
    to_insert.index = pd.to_datetime(to_insert.index)
    ts_inserted = client.time_series.data.insert_dataframe(to_insert)

    return new_df, ts_output, ts_inserted

In [None]:
# Create function: deploy `handle` as a Cognite Function (only its own source is uploaded).
func_drainage = client.functions.create(
    name="avg-drainage-rate",
    function_handle=handle,
)

# Deployment is asynchronous: poll until the function is ready (or has failed)
# so the next cell does not call a function that is still being built.
DEPLOY_TIMEOUT_S = 15 * 60
POLL_INTERVAL_S = 10
deploy_deadline = time.monotonic() + DEPLOY_TIMEOUT_S
while func_drainage.status not in ("Ready", "Failed"):
    if time.monotonic() > deploy_deadline:
        raise TimeoutError(
            f"Function {func_drainage.id} not ready after {DEPLOY_TIMEOUT_S} s (status: {func_drainage.status})"
        )
    time.sleep(POLL_INTERVAL_S)
    func_drainage = client.functions.retrieve(id=func_drainage.id)

if func_drainage.status == "Failed":
    raise RuntimeError(
        f"Deployment of function {func_drainage.id} failed: {getattr(func_drainage, 'error', None)}"
    )

In [None]:
# Call function.
# The payload is sent as JSON, so the start date is passed as an ISO-8601 string.
data_dict = {
    'start_date': start_date.isoformat(),
    'tot_days': 25,
    'ts_input_name': ts_input_name,
    'ts_output_name': ts_output_name,
    'derivative_value_excl': derivative_value_excl,
    'tank_volume': tank_volume,
    'dataset_id': None,  # set to a data set id to place a newly created output series in it
}

# Function.call only takes `data` (and `wait`); function_call_info is something the
# runtime can pass *into* handle, not an argument of call().
call_func_drainage = func_drainage.call(data=data_dict)