In [1]:
import pandas as pd
import numpy as np
import sys
import gc

import os
sys.path.append(os.path.abspath(".."))

In [2]:
import s3fs
from typing import List

from utils.common import *
from config.params import *
from preprocessing.transform import transform, tracking_transforming_input
from preprocessing.intervals import get_interval_from_transformed

In [3]:
from preprocessing.prepare_clustering_data import *

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [4]:
from training.models import *
from training.visualize import *

In [5]:
from pyarrow.dataset import field

In [6]:
import sagemaker
from sagemaker import get_execution_role

In [7]:
import matplotlib.pyplot as plt
import seaborn as sns

In [8]:
# get the latest saved data from the MLflow run
import mlflow
from mlflow.tracking import MlflowClient

In [9]:
from sklearn.cluster import KMeans
from datetime import datetime
from pathlib import Path

In [10]:
import sagemaker
import boto3
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

In [11]:
from io import StringIO

In [12]:
from sagemaker.model import Model
from sagemaker.transformer import Transformer

In [13]:
# import boto3

In [14]:
# NOTE(review): this client is created before `mlflow.set_tracking_uri(...)` is called
# further down, and nothing in the visible part of the notebook uses `client`.
# Presumably the tracking URI is resolved when the client is constructed — confirm it
# targets the intended tracking server before relying on it.
client = MlflowClient()

In [15]:
# Define session, role, and region so we can
# perform any SageMaker tasks we need.
# `sagemaker_session` and `role` are passed to the RCF `Model` in the scoring loop,
# and `region` selects the Random Cut Forest container image there.
sagemaker_session = sagemaker.Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name

In [16]:
# Provide the ARN of the tracking server that you want to track your training job with
# NOTE(review): the region and account ID are hard-coded in this ARN; update it when
# running against a different account or tracking server.
tracking_server_arn = 'arn:aws:sagemaker:ap-southeast-1:771463264346:mlflow-tracking-server/mlflow-RCF-server'

In [17]:
# Point the module-level MLflow API (get_run / start_run / log_param used below)
# at the tracking server identified by `tracking_server_arn`.
mlflow.set_tracking_uri(tracking_server_arn)

In [18]:
# Select the experiment for this notebook; the `mlflow.start_run(...)` calls in the
# scoring loop are made after this experiment has been set.
experiment_name = "5. Evaluate RCF Model"
mlflow.set_experiment(experiment_name)

<Experiment: artifact_location='s3://s3-assetcare-bucket/mlflow_server/21', creation_time=1746666809381, experiment_id='21', last_update_time=1746666809381, lifecycle_stage='active', name='5. Evaluate RCF Model', tags={}>

### functions

In [19]:
def get_value(run_id, keys):
    """Look up selected logged parameters of an MLflow run.

    Args:
        run_id: ID of the MLflow run to read.
        keys: Parameter names to fetch, in the order the values should be returned.

    Returns:
        A list with one entry per name in `keys`, in the same order. An entry is
        None when the run has no parameter with that name.
    """
    # All parameters logged on the run, keyed by parameter name.
    logged_params = mlflow.get_run(run_id).data.params
    return list(map(logged_params.get, keys))

### params

In [22]:
# MLflow run IDs whose logged parameters are collected into `df` below.
# Commented-out IDs are skipped in this pass.
lst_run_id = [
    "dcdfba4baf354faa8662f4304a5c67f6",
    # "aa414e38c3044706b146ec532310be99",
    # "cdf61f9465084ee398dcc6384132bb22",
    "bee2d40c5e5147f196b708eab655b850",
    "241a973b95b8426ab168e3a11207e43f",
    
    "921ad6e565d14c70846c791d5caecabb"
]

In [23]:
# Names of the MLflow run parameters to read for every run ID.
# They become the columns of `df` (after "run_id"), and the scoring loop reads
# each of them back by exactly these names.
keys = [
    "Analog Tag name",
    "Model Path",
    "Cluster Nr",
    "Dataset path: S3 Input Scored",
    "Dataset path: S3 Input Evaluation",
    "saved_result"
]

In [24]:
# S3 bucket used by the scoring loop: the dataset URIs are reduced to keys relative
# to this bucket, and the batch-transform output is written under it.
bucket = "s3-assetcare-bucket"

### running evaluation

In [25]:
# Gather the logged parameters of every run into one table:
# one row per run ID, one column per requested parameter name.
columns = ["run_id", *keys]
rows = [[run_id, *get_value(run_id, keys)] for run_id in lst_run_id]
df = pd.DataFrame(rows, columns=columns)

In [26]:
df

Unnamed: 0,run_id,Analog Tag name,Model Path,Cluster Nr,Dataset path: S3 Input Scored,Dataset path: S3 Input Evaluation,saved_result
0,dcdfba4baf354faa8662f4304a5c67f6,AUXILIARY_HPU_AI_PRESSURE_VALUE,,3,,,
1,bee2d40c5e5147f196b708eab655b850,DWA_DSU_DC_VOLTAGE,s3://s3-assetcare-bucket/features_store/models...,4,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/cluste...
2,241a973b95b8426ab168e3a11207e43f,DWA_DSU_DC_VOLTAGE,s3://s3-assetcare-bucket/features_store/models...,2,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/cluste...


##### If the filtered df below has no rows, the rest of this notebook cannot be run

In [27]:
# Keep only the runs that logged everything the scoring loop needs. The loop
# calls `.replace(...)` on both dataset paths, so a run with a model path but a
# missing dataset path would fail there instead of being filtered out here.
required_columns = [
    "Model Path",
    "Dataset path: S3 Input Scored",
    "Dataset path: S3 Input Evaluation",
]
filtered_df = df.dropna(subset=required_columns)
filtered_df.head()

Unnamed: 0,run_id,Analog Tag name,Model Path,Cluster Nr,Dataset path: S3 Input Scored,Dataset path: S3 Input Evaluation,saved_result
1,bee2d40c5e5147f196b708eab655b850,DWA_DSU_DC_VOLTAGE,s3://s3-assetcare-bucket/features_store/models...,4,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/cluste...
2,241a973b95b8426ab168e3a11207e43f,DWA_DSU_DC_VOLTAGE,s3://s3-assetcare-bucket/features_store/models...,2,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/cluste...


### Save the filtered df to a file so the run_id rows can be edited before computing anomaly scores

In [73]:
# Write the run table to disk (CSV content, index column omitted).
# NOTE(review): the file name has no extension and is repeated as `filepath` in the
# next cell — keep the two in sync.
filtered_df.to_csv("get_anomaly_scores_by_runID", index=False)
filtered_df.shape

(2, 7)

### read info from file

In [74]:
# Same file name that `filtered_df.to_csv(...)` wrote to in the previous cell.
filepath = "get_anomaly_scores_by_runID"

In [75]:
# Reload the run table from disk; this replaces the in-memory `filtered_df`
# that the scoring loop iterates over.
filtered_df = pd.read_csv(filepath)
filtered_df.head()

Unnamed: 0,run_id,Analog Tag name,Model Path,Cluster Nr,Dataset path: S3 Input Scored,Dataset path: S3 Input Evaluation,saved_result
0,bee2d40c5e5147f196b708eab655b850,DWA_DSU_DC_VOLTAGE,s3://s3-assetcare-bucket/features_store/models...,4,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/cluste...
1,241a973b95b8426ab168e3a11207e43f,DWA_DSU_DC_VOLTAGE,s3://s3-assetcare-bucket/features_store/models...,2,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/traini...,s3://s3-assetcare-bucket/features_store/cluste...


In [76]:
filtered_df.shape

(2, 7)

### get anomaly scores

In [71]:
def _s3_key_prefix(s3_uri, bucket_name):
    """Return the object-key part of ``s3://<bucket_name>/<key>``.

    Only the leading ``s3://<bucket_name>/`` is stripped, so a key that happens to
    contain the bucket name is left intact.

    Raises:
        ValueError: if `s3_uri` is not a string under that bucket (for example a
            NaN read back from an empty CSV cell).
    """
    root = f"s3://{bucket_name}/"
    if not isinstance(s3_uri, str) or not s3_uri.startswith(root):
        raise ValueError(f"Expected an S3 URI starting with {root!r}, got {s3_uri!r}")
    return s3_uri[len(root):]


# The Random Cut Forest image depends only on the region, so look it up once
# instead of once per run.
rcf_container = sagemaker.image_uris.retrieve("randomcutforest", region)

for i in range(len(filtered_df)):
    current_time = get_current_timestamp_string()
    run_row = filtered_df.iloc[i]

    # 1. Inputs recorded by the training run (each row is a run with a trained model)
    model_path = run_row["Model Path"]
    s3_input_scored = run_row["Dataset path: S3 Input Scored"]
    s3_input_evaluation = run_row["Dataset path: S3 Input Evaluation"]
    tag_analog = run_row["Analog Tag name"]
    cluster_nr = run_row["Cluster Nr"]
    saved_result = run_row["saved_result"]

    prefix = _s3_key_prefix(s3_input_scored, bucket)
    file_list = list_files_in_s3_folder(bucket=bucket, prefix=prefix, end_with_csv=True)
    if not file_list:
        raise ValueError(f"No CSV input files found under {s3_input_scored!r}")

    prefix_evaluation = _s3_key_prefix(s3_input_evaluation, bucket)
    file_list_evaluation = list_files_in_s3_folder(bucket=bucket, prefix=prefix_evaluation)

    # 2. Evaluation
    # Load the trained model artifact from S3 into the Random Cut Forest container.
    rcf_model = Model(
        image_uri=rcf_container,
        model_data=model_path,
        role=role,
        sagemaker_session=sagemaker_session
    )

    # One transformer per run: `rcf_model.transformer(...)` registers a SageMaker
    # model each time it is called, and its settings do not change between files.
    output_path = f"s3://{bucket}/{prefix}anomaly_scores/"
    transformer = rcf_model.transformer(
        instance_count=2,
        instance_type="ml.m5.xlarge",
        output_path=output_path,
        assemble_with="Line",
        accept="text/csv"
    )

    # Score every listed input file. The listed keys are used directly (they are
    # bucket-relative, as in the output listing below) rather than guessing
    # `part_NNNN.csv` names from the file count.
    for input_key in file_list:
        transformer.transform(
            f"s3://{bucket}/{input_key}",
            content_type="text/csv",
            split_type="Line",
            wait=True
        )

    # 3. Read the anomaly scores and merge them with the evaluation data
    output_file_list = list_files_in_s3_folder(bucket, prefix+"anomaly_scores/")
    # Kept as an alias in case a later cell reads this name; it used to be a
    # second, identical listing of the same prefix.
    evaluation_file_list = output_file_list
    df_all = load_csv_files([f"s3://{bucket}/{key}" for key in output_file_list])
    print("df_all shape", df_all.shape)

    df_origin = load_csv_files([f"s3://{bucket}/{key}" for key in file_list_evaluation])
    df_origin.columns = ["values", "time_utc"]

    # NOTE(review): this assumes the score files and the evaluation files list in
    # the same order and that `anomaly_scores/` holds no stale output — confirm.
    if len(df_all) != len(df_origin):
        raise ValueError(
            f"Row count mismatch for {tag_analog!r}: {len(df_all)} anomaly scores "
            f"vs {len(df_origin)} evaluation rows"
        )
    # Attach by position so the result does not depend on how the two frames are indexed.
    df_origin["anomaly_scores"] = df_all[0].to_numpy()
    df_origin["tag_name"] = tag_analog
    default_threshold = round(df_origin["anomaly_scores"].max())
    # No row is flagged at this stage: `prediction` starts as False everywhere.
    df_origin['prediction'] = False

    # Save the merged result to the S3 bucket
    s3_RCF_training_result = f"s3://{bucket}/features_store/evaluation/{tag_analog}_cluster-{cluster_nr}_{current_time}/"
    save_dataframe_to_s3_in_batches(df=df_origin,
                                    s3_path_prefix=s3_RCF_training_result,
                                    header=True)

    # Record what was evaluated and where the result was written.
    with mlflow.start_run(run_name=f"{tag_analog}_cluster-{cluster_nr}"):
        mlflow.log_param("Analog Tag name", tag_analog)
        mlflow.log_param("Cluster Nr", cluster_nr)
        mlflow.log_param("Model loading from", model_path)
        mlflow.log_param("Dataset path: S3 Input Scored", s3_input_scored)
        mlflow.log_param("Dataset path: S3 Input Evaluation", s3_input_evaluation)
        mlflow.log_param("Input File list", file_list)
        mlflow.log_param("Output File list", output_file_list)
        mlflow.log_param("Default Threshold", default_threshold)
        mlflow.log_param("Dataset: Output Training path", s3_RCF_training_result)


INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating model with name: randomcutforest-2025-05-11-15-26-40-129
INFO:sagemaker:Creating transform job with name: randomcutforest-2025-05-11-15-26-40-919


................................[34mDocker entrypoint called with argument(s): serve[0m
[34mRunning default environment configuration script[0m
  if num_device is 1 and 'dist' not in kvstore:[0m
  if cons['type'] is 'ineq':[0m
  if len(self.X_min) is not 0:[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] loaded entry point class algorithm.serve.server_config:config_api[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] loading entry points[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] Loaded iterator creator application/x-recordio-protobuf for content type ('application/x-recordio-protobuf', '1.0')[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] loaded request iterator application/json[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] loaded request iterator application/jsonlines[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] loaded request iterator application/x-recordio-protobuf[0m
[34m[05/11/2025 15:32:08 INFO 139656453080896] loaded request iterator te

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating model with name: randomcutforest-2025-05-11-15-32-36-541
INFO:sagemaker:Creating transform job with name: randomcutforest-2025-05-11-15-32-37-149


..................................[34mDocker entrypoint called with argument(s): serve[0m
[34mRunning default environment configuration script[0m
  if num_device is 1 and 'dist' not in kvstore:[0m
  if cons['type'] is 'ineq':[0m
  if len(self.X_min) is not 0:[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] loaded entry point class algorithm.serve.server_config:config_api[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] loading entry points[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] Loaded iterator creator application/x-recordio-protobuf for content type ('application/x-recordio-protobuf', '1.0')[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] loaded request iterator application/json[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] loaded request iterator application/jsonlines[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] loaded request iterator application/x-recordio-protobuf[0m
[34m[05/11/2025 15:38:21 INFO 140176880023360] loaded request iterator 