In [1]:
from base import LocalGateway, base_logger, PeriodicTrigger, BaseEventFabric, ExampleEventFabric

from influxdb_client import InfluxDBClient
from fastapi import FastAPI, Request
from datetime import datetime, timedelta
from minio import Minio
from io import BytesIO
import pandas as pd

# -------------------------- CONFIGURATION --------------------------

# InfluxDB credentials and database details
INFLUX_TOKEN = "http://192.168.1.132:8086"
INFLUX_ORG = "wise2024"
INFLUX_USER = "admin"
INFLUX_PASS = "secure_influx_iot_user"

VIZ_COMPONENT_URL = "http://192.168.1.132:9000"
SIF_SCHEDULER = ("SCH_SERVICE_NAME", "192.168.1.132:30032")

# Minio credentials and database details
MINIO_ENDPOINT = "192.168.1.132:9090"
MINIO_ACCESS_KEY = "peUyeVUBhKS7DvpFZgJu"
MINIO_SECRET_KEY = "J5VLWMfzNXBnhrm1kKHmO7DRbnU5XzqUO1iKWJfi"
MINIO_BUCKET = "models"

# Influx Buckets
BUCKETS = ["1_2_2", "1_2_7", "1_3_10", "1_3_11", "1_3_14", "1_4_12", "1_4_13"]  
BUCKETS_PIR = ["1_2_2", "1_3_11", "1_3_14", "1_4_13"]
BUCKET_CORRIDOR = "1_2_2"
BUCKET_BATHROOM = "1_3_11"
BUCKET_DOOR = "1_3_14"
BUCKET_BED = "1_4_13"

pir_buckets = {
        "1_2_2": "corridor",
        "1_3_11": "bathroom",
        "1_3_14": "door",
        "1_4_13": "bed",
    }

# ------------------------ INFLUXDB FUNCTIONS ------------------------

# Fetch data (inspired by the sif-viz-component fetch data structure)
def fetch_data(bucket, measurement, field):

    with InfluxDBClient(
        url=INFLUX_TOKEN, 
        org=INFLUX_ORG, 
        username=INFLUX_USER, 
        password=INFLUX_PASS, 
        verify_ssl=False) as client:
            p = {
                "_start": timedelta(days=-50),  # "fetch data starting from 7 days ago".
            }

            query_api = client.query_api()
            tables = query_api.query(f'''
                                    from(bucket: "{bucket}") |> range(start: _start)
                                    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
                                    |> filter(fn: (r) => r["_type"] == "{"sensor-value"}")
                                    |> filter(fn: (r) => r["_field"] == "{field}")
                                    ''', params=p)          
            obj = []
            
            base_logger.info(tables)
            for table in tables:
                for record in table.records:
                    val = {}
                    base_logger.info(record)
                    val["bucket"] = bucket
                    val["timestamp"] = record["_time"].timestamp() * 1000
                    val["value"] = record["_value"]
                    if len(val.keys()) != 0:
                        obj.append(val)

            return obj

In [2]:
def pandas_data():   #fetch_influx_data():
    influx_data = []  #all_fata
    for bucket in BUCKETS_PIR:
        if bucket == BUCKET_DOOR:
            data = fetch_data(bucket, "door", "roomID")
        else:
            data = fetch_data(bucket, "PIR", "roomID")
        influx_data.extend(data)
    
    df = pd.DataFrame(influx_data)
    base_logger.info(f"Original data shape: {df.shape}")

    df["timestamp"] = pd.to_datetime(df["timestamp"], unit='ms')
    df = df.sort_values('timestamp')

    return df

In [3]:
fetched = fetch_data(BUCKET_BATHROOM, "PIR", "roomID")
fetched2 = fetch_data(BUCKET_CORRIDOR, "PIR", "roomID")
fetched3 = fetch_data(BUCKET_BED, "PIR", "roomID")
doorfetch = fetch_data(BUCKET_DOOR, "door", "roomID2")

# Merge all fetched data
all_fetched = fetched + fetched2 + fetched3 + doorfetch

# Create a pandas dataframe
df = pd.DataFrame(all_fetched)



ConnectTimeoutError: (<urllib3.connection.HTTPConnection object at 0x000001423EF2FA40>, 'Connection to 131.159.85.238 timed out. (connect timeout=10.0)')

In [73]:
df

Unnamed: 0,bucket,timestamp,value
0,1_3_11,1.732448e+12,corridor
1,1_3_11,1.732448e+12,corridor
2,1_3_11,1.732449e+12,corridor
3,1_3_11,1.732449e+12,corridor
4,1_3_11,1.732449e+12,corridor
...,...,...,...
1265,1_3_14,1.732502e+12,entrance
1266,1_3_14,1.732502e+12,entrance
1267,1_3_14,1.732502e+12,entrance
1268,1_3_14,1.732502e+12,entrance


In [62]:
df_sorted = df.sort_values(by='timestamp')
df_sorted['human_readable_time'] = pd.to_datetime(df_sorted['timestamp'], unit='ms')
print(df_sorted.head(40))


KeyError: 'timestamp'

In [57]:
# Calculate the time differences between consecutive rows
df_sorted['time_diff'] = df_sorted['timestamp'].diff()

# Filter out rows where the value changes
df_filtered = df_sorted[df_sorted['value'] != df_sorted['value'].shift()]

# Group by the value column and calculate the mean and standard deviation of the time differences
time_stats = df_filtered.groupby('value')['time_diff'].agg(['mean', 'std'])

print(time_stats)


                  mean            std
value                                
corridor  37394.497021  734453.310073
entrance  17937.270019  114856.892581


In [44]:

def save_model_minio(model):
    # Minio Initialize Client
    client = Minio(
        endpoint=MINIO_ENDPOINT, 
        access_key=MINIO_ACCESS_KEY, 
        secret_key=MINIO_SECRET_KEY, 
        secure=False
    )
    base_logger.info("Initialized MinIO client successfully.")

    found = client.bucket_exists(MINIO_BUCKET)
    if not found:
        client.make_bucket(MINIO_BUCKET)
        base_logger.info(f"Bucket '{MINIO_BUCKET}' created.")
    else:
        base_logger.info(f"Bucket '{MINIO_BUCKET}' already exists.")

    # Timestamp object 
    current_time = datetime.now().strftime("%d-%m-%y_%H-%M-%S")
    data = pd.DataFrame.to_json(model).encode("utf-8")
    object_name = f"model_{current_time}.json"

    # Save model to Minio
    try:
        client.put_object(
            bucket_name=MINIO_BUCKET,
            object_name=object_name,
            data=BytesIO(data),
            length=len(data),
            content_type="application/json",
            metadata={'time': current_time}
        )
        base_logger.info(f"Model saved to MinIO as '{object_name}' in bucket '{MINIO_BUCKET}'.")

        # Save the version of the model to a text file
        version_file_name = "latest_model_version.txt"
        client.put_object(
            bucket_name=MINIO_BUCKET,
            object_name=version_file_name,
            data=BytesIO(object_name.encode("utf-8")),
            length=len(object_name),
            content_type="text/plain"
        )
        base_logger.info(f"Model version saved to MinIO as '{version_file_name}' in bucket '{MINIO_BUCKET}'.")

    except Exception as e:
        base_logger.error(f"Error storing model to Minio: {e}")
    base_logger.info(f"Stored model_{current_time}.json to Minio")

def load_latest_model_minio():
    # Minio Initialize Client
    client = Minio(
        endpoint=MINIO_ENDPOINT, 
        access_key=MINIO_ACCESS_KEY, 
        secret_key=MINIO_SECRET_KEY, 
        secure=False
    )
    base_logger.info("Initialized MinIO client successfully.")

    try:
        # Get the latest model version
        version_file_name = "latest_model_version.txt"
        response = client.get_object(MINIO_BUCKET, version_file_name)
        latest_model_name = response.data.decode("utf-8")
        response.close()
        response.release_conn()
        base_logger.info(f"Latest model version: {latest_model_name}")

        # Load the latest model
        response = client.get_object(MINIO_BUCKET, latest_model_name)
        model_data = response.data.decode("utf-8")
        response.close()
        response.release_conn()
        model = pd.read_json(BytesIO(model_data.encode("utf-8")))
        base_logger.info(f"Loaded model from MinIO: {latest_model_name}")

        return model

    except Exception as e:
        base_logger.error(f"Error loading model from Minio: {e}")
        return None
                   

In [58]:
save_model_minio(time_stats)

In [59]:
load_latest_model_minio()

Unnamed: 0,mean,std
corridor,37394.497021,734453.310073
entrance,17937.270019,114856.892581
