In [1]:
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WriteOptions
import stumpy
import numpy as np
import datetime as dt
import numpy.testing as npt
import requests
import paho.mqtt.client as mqtt
import json
import random
from datetime import datetime

# InfluxDB configuration
url = 'http://localhost:8880' 
token = 'gqFHvV2iH1VjEXIODhTneELHj8BWrGOsXWRVR7ARDAYUzjo3KQ9vQSnitxehJAONRq4MRLEklv7h8R0V71-l1Q=='
org = 'PT SATU'
bucket = 'IoTData'

# MQTT configuration for receiving messages
receive_broker_address = '103.175.218.182'
receive_port = 1883  # Default MQTT port
receive_topic = "/optmqtt/cycle/ESP32_Coba"
client_id = f'python-mqtt-{random.randint(0, 1000)}'

# MQTT configuration for sending messages
send_broker_address = 'mqtt.thingsboard.cloud'
send_port = 1883  # Default MQTT port
send_topic = "v1/devices/me/attributes"
acc_token = 'yR2psnIkjpXPP1tYY65P'
send_password = ''

# Referensi Data (Change here)
Q_df = pd.read_csv('ref_press2.csv')

In [4]:
def influxGather(start_time, end_time):
    client = InfluxDBClient(url=url, token=token, org=org)
    start = int(start_time)
    stop = int(end_time)
    
    query = f'from(bucket: "IoTData")\
      |> range(start: {start}, stop: {stop})\
      |> filter(fn: (r) => r["_measurement"] == "PanelSTR")\
      |> filter(fn: (r) => r["_field"] == "Timestamp" or r["_field"] == "Pressure5" or r["_field"] == "Pressure6" or r["_field"] == "Pressure7")'
    
    result = client.query_api().query(query, org=org)
    
    data = {"ts": [], "STR5": [], "STR6": [], "STR7": []}
    
    for table in result:
        for record in table.records:
            field_name = record.get_field()
            value = record.get_value()
            if field_name == "Timestamp":
                data["ts"].append(value)
            elif field_name == "Pressure5":
                data["STR5"].append(value)
            elif field_name == "Pressure6":
                data["STR6"].append(value)
            elif field_name == "Pressure7":
                data["STR7"].append(value)
    
  
    T_df = pd.DataFrame(data)
    return T_df

def cycleCount(Q_df, T_df):
    Q_df["Pressure (bar)"] = pd.to_numeric(Q_df["Pressure (bar)"], errors="coerce")
    T_df["STR5"] = pd.to_numeric(T_df["STR5"], errors="coerce")
    T_df["STR6"] = pd.to_numeric(T_df["STR6"], errors="coerce")
    T_df["STR7"] = pd.to_numeric(T_df["STR7"], errors="coerce")

    similarity_scores = {"STR5": [], "STR6": [], "STR7": []}

    Q_z_norm = stumpy.core.z_norm(Q_df["Pressure (bar)"].values)

    for field in ["STR5", "STR6", "STR7"]:
        matches = stumpy.match(Q_df["Pressure (bar)"], T_df[field])

        for match_distance, match_idx in matches:
            match_z_norm = stumpy.core.z_norm(T_df[field].values[match_idx:match_idx + len(Q_df)])
            cosine_similarity = np.dot(Q_z_norm, match_z_norm) / (np.linalg.norm(Q_z_norm) * np.linalg.norm(match_z_norm)) * 100
            match_idx = int(match_idx)
            cosine_similarity = float(cosine_similarity)
            cycle_start_ts = T_df.iloc[match_idx]["ts"]
            similarity_scores[field].append({"cycle": match_idx + 1, "ts": cycle_start_ts, "score": format(cosine_similarity, ".2f")})

    for field in similarity_scores:
        similarity_scores[field].sort(key=lambda x: x["cycle"])

        for i, score in enumerate(similarity_scores[field]):
            score["cycle"] = i + 1

    final_output = json.dumps(similarity_scores, indent=2)

    return final_output

def seqCalc(json_output):
    similarity_scores = json.loads(json_output)
    processed_data = []
    time_diff_list = []

    for field, scores in similarity_scores.items():
        for score in scores:
            processed_data.append({
                "Cycles": score["cycle"],
                "Score": score["score"],
                "ts": score["ts"],
                "Machine": field
            })

    processed_data.sort(key=lambda x: datetime.strptime(x["ts"], "%m/%d/%Y, %I:%M:%S %p"))

    for i in range(1, len(processed_data)):
        current_time = datetime.strptime(processed_data[i]["ts"], "%m/%d/%Y, %I:%M:%S %p")
        prev_time = datetime.strptime(processed_data[i - 1]["ts"], "%m/%d/%Y, %I:%M:%S %p")
        time_diff = (current_time - prev_time).total_seconds() / 60 
        time_diff = round(time_diff, 1)
        time_diff_list.append(time_diff)
        processed_data[i]["Sequence Time"] = time_diff

    if not np.isnan(time_diff_list).any():
        time_diff_list[-1] = np.nan

    mean_time = np.nanmean(time_diff_list)

    processed_data_json = json.dumps({"Callback": processed_data, "meanTime": mean_time})

    return processed_data_json

def on_message(client, userdata, msg):
    message = json.loads(msg.payload.decode())
    startTime = int(message["start"]) / 1000
    endTime = int(message["end"]) / 1000
    field_name = message["field"]
    print(message)
    data = influxGather(startTime, endTime)
    output = cycleCount(Q_df, data)
    fix_output = seqCalc(output)
    publish(fix_output)

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.subscribe(receive_topic)
    else:
        print(f"Failed to connect to MQTT broker A, return code: {rc}")

def on_publish(client, userdata, mid):
    pass

def on_disconnect(client, userdata, rc):
    pass

def publish(message):
    send_client = mqtt.Client("send")
    send_client.username_pw_set(acc_token)
    send_client.on_publish = on_publish
    
    # Define on_connect callback for send_client
    send_client.on_connect = on_connect_send

    # Attempt to connect and publish message
    send_client.connect(send_broker_address, send_port)

    # Define a function to handle sending the message once connected
    def send_message(client):
        client.publish(send_topic, message)
        client.disconnect()

    # Schedule the message to be sent once the client is connected
    send_client.on_connect = lambda client, userdata, flags, rc: send_message(client)

    # Start the loop to allow the on_connect_send callback to trigger
    send_client.loop_start()

def on_connect_send(client, userdata, flags, rc):
    pass  # This function is just a placeholder

client = mqtt.Client(client_id=f'python-mqtt-{random.randint(0, 1000)}')
client.on_message = on_message
client.on_connect = on_connect
client.connect(receive_broker_address, receive_port)

# Start loop to listen for messages from broker A
client.loop_forever()

{'start': '1709571600000', 'end': '1709599500000', 'field': 'Pressure5'}
{
  "STR5": [
    {
      "cycle": 1,
      "ts": "3/5/2024, 2:28:55 AM",
      "score": "96.76"
    },
    {
      "cycle": 2,
      "ts": "3/5/2024, 5:10:32 AM",
      "score": "97.81"
    }
  ],
  "STR6": [
    {
      "cycle": 1,
      "ts": "3/5/2024, 12:36:18 AM",
      "score": "94.95"
    },
    {
      "cycle": 2,
      "ts": "3/5/2024, 3:26:57 AM",
      "score": "97.63"
    }
  ],
  "STR7": [
    {
      "cycle": 1,
      "ts": "3/5/2024, 1:37:31 AM",
      "score": "95.52"
    },
    {
      "cycle": 2,
      "ts": "3/5/2024, 4:20:20 AM",
      "score": "96.41"
    }
  ]
}
{"Callback": [{"Cycles": 1, "Score": "94.95", "ts": "3/5/2024, 12:36:18 AM", "Machine": "STR6"}, {"Cycles": 1, "Score": "95.52", "ts": "3/5/2024, 1:37:31 AM", "Machine": "STR7", "Sequence Time": 61.2}, {"Cycles": 1, "Score": "96.76", "ts": "3/5/2024, 2:28:55 AM", "Machine": "STR5", "Sequence Time": 51.4}, {"Cycles": 2, "Score": "97.6

KeyboardInterrupt: 