In [1]:
from LSTM_model import LSTM
import numpy as np
import random
import torch
import time
from datetime import timedelta, datetime
from collections import deque
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import ASYNCHRONOUS
from scipy.stats import pearsonr

In [3]:
# Setting random seed for reproducibility
torch.manual_seed(140)
np.random.seed(140)
random.seed(140)

In [5]:
# Functions for RePAD2
def calculate_aare(actual, predicted):
    """
    Calculate the Absolute Relative Error (ARE) between an actual and predicted value.
    
    Parameters:
    actual (deque): The actual value.
    predicted (deque): The predicted value.
    
    Returns:
    float: The Absolute Relative Error.
    """
    # Adding a small value epsilon to avoid division by zero
    #epsilon = 1e-10
    aare_values = []
    
    for act, pred in zip(actual, predicted):
        AARE = abs(act - pred) / max(abs(act), 1)
        aare_values.append(AARE)

    mean_aare = np.mean(aare_values)

    return mean_aare


def calculate_threshold(aare_values):
    """
    Calculate the threshold value (Thd) based on a deque of AARE values.
    Thd is defined as the mean of the AARE values plus three times their standard deviation.

    Parameters:
    - aare_values (array-like): An array of AARE values.

    Returns:
    - float: The calculated threshold value (Thd).
    """
    # Calculate the mean and standard deviation of the AARE values
    mean_aare = np.mean(aare_values)
    std_aare = np.std(aare_values)
    
    # Calculate Thd
    thd = mean_aare + 3 * std_aare
    
    return thd

# Function for creating and training model
def train_model(train_events):
    tensor_y = torch.tensor(train_events, dtype=torch.float32).view(-1, 1, 1)
    tensor_x = torch.tensor([1, 2, 3], dtype=torch.float32).view(-1, 1, 1)
    # Create an instance of the LSTM model
    model = LSTM(tensor_x, tensor_y, input_size=1, hidden_size=10, num_layers=1, output_size=1, num_epochs=50, learning_rate=0.005)
    
    model.train_model() # Train the model

    return model

# Function for reporting anomalies to InfluxDB
def report_anomaly(T, timestamp, actual_value, predicted_value, write_api):
    """
    Sends an anomalous event back to InfluxDB, storing it in the "anomaly" measurement
    with both the same value and time as the original event.

    Parameters:
    - anomalous_event: The event data that was detected as an anomaly, including its value and timestamp.
    """

    point = Point("base_detection-C53")\
        .tag("host", "detector")\
        .field("T", float(T))\
        .field("actual_value", float(actual_value))\
        .field("predicted_value", float(predicted_value))\
        .time(timestamp, WritePrecision.NS)
    
    #write_api.write(bucket="anomalies", org="ORG", record=point)
    #print(f"Anomalous event sent to InfluxDB: Value={actual_value}, Time={timestamp}")

def write_result(timestamp, T, actual_value, predicted_value, AARE, Thd, write_api):
    """
    Sends an anomalous event back to InfluxDB, storing it in the "anomaly" measurement
    with both the same value and time as the original event.

    Parameters:
    - anomalous_event: The event data that was detected as an anomaly, including its value and timestamp.
    """

    point = Point("base_result-C53")\
        .tag("host", "detector")\
        .field("T", float(T))\
        .field("actual_value", float(actual_value))\
        .field("predicted_value", float(predicted_value))\
        .field("AARE", float(AARE))\
        .field("Thd", float(Thd))\
        .time(timestamp, WritePrecision.NS)
    
    write_api.write(bucket="anomalies", org="ORG", record=point)
    print(f'T: {T}, Real Value: {actual_value}, Prediction Value: {predicted_value}, AARE: {AARE}, Thd: {Thd}')

In [7]:
# Functions for RoLA
# ==================

def to_rfc3339(timestamp_str):
	# This function converts a timestamp to RFC3339 format
	dt = datetime.fromisoformat(timestamp_str)  # Parse input timestamp
	return dt.strftime('%Y-%m-%dT%H:%M:%SZ')	# Convert to RFC3339 format



def get_previous_values(bucket, measurement, timestamp, num_values, org, url, token, username, password):
	"""
	This function queries the last "num_values" of a single "measurement" before "timestamp" from 
	InfluxDB multi-dimensional dataset in order to compute the correlation coefficient.
	If values before the time stamp are less than "num_values" it gets all previous values.

	Parameters:
	===========
	- bucket (str): 		InfluxDB bucket name.
	- measurement (str):	The variable name to extract.
	- timestamp (str): 		The reference timestamp in RFC3339 format (e.g., "2024-03-20T00:00:00Z").
	- num_values (int): 	The number of values (p) to extract.
	- org (str): 			InfluxDB organization name.
	- url (str): 			InfluxDB server URL.
	- token (str): 			Authentication token.
	- username (str):		Authentication user name.
	- password (str):		Authentication password.

	Returns:
	- List of extracted values for the given variable.
	"""
	
	client = InfluxDBClient(url=url, token=token, org=org, username=username, password=password)
	query_api = client.query_api()
	formatted_timestamp = to_rfc3339(str(timestamp))

	# Construct the Flux query to extract one variable's values from a multi-dimensional dataset
	query = f'''
	from(bucket: "{bucket}")
	  |> range(start:  time(v:"2021-10-28T00:00:00Z")) // the earliest timestamp
	  |> filter(fn: (r) => r["_field"] == "{measurement}")
	  |> filter(fn: (r) => r["_time"] <= time(v: "{formatted_timestamp}"))  // Before the timestamp
	  |> sort(columns: ["_time"], desc: true)
	  |> limit(n: {num_values})  // Extract up to num_values
		'''

	# Execute query
	results = query_api.query(query=query, org=org)

	# Extract values
	values = [record.get_value() for table in results for record in table.records]

	#print(f"Extracted values of '{measurement}' before {timestamp}:")
	return values


def fetch_previous_values(var, timestamp, num_values):
	"""Fetches historical values for a given variable."""
	try:
		return get_previous_values(
			bucket=bucket, measurement=var, timestamp=timestamp, num_values=num_values,
			org=org, url=influxdb_url, token=token, username=username, password=password
		)
	except Exception as e:
		print(f"Error fetching values for {var}: {e}")
		return []

def fetch_events():
	"""Fetches time-series events from InfluxDB."""
	query = f'''
		from(bucket: "{bucket}")
		|> range(start: time(v: "{start_time}"))
		|> filter(fn: (r) => r["_measurement"] == "{measurement}")
		|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
	'''
	try:
		return list(query_api.query_stream(org=org, query=query))
	except Exception as e:
		print(f"Error querying InfluxDB: {e}")
		return []
      
def is_anomaly(T, variable_name, state):
	"""
	Optimized LDA-based anomaly detection function.
	"""
	# Extract state variables
	variable_state = state[variable_name]
	batch_events = variable_state["batch_events"]
	next_event = variable_state["next_event"]
	M = variable_state["M"]
	flag = variable_state["flag"]
	actual_value = variable_state["actual_value"]
	predicted_value = variable_state["predicted_value"]
	sliding_window_AARE = variable_state["sliding_window_AARE"]

	if T < 2:
		return False

	# Initialize LDA
	if 2 <= T < 5:
		if T == 2:
			M = train_model(list(batch_events))
		else:
			M = train_model(list(batch_events)[1:])
		
		variable_state["M"] = M
		pred_D_T_plus_1 = M.predict_next()

		actual_value.append(next_event)
		predicted_value.append(pred_D_T_plus_1)

		return False

	elif 5 <= T < 7:
		AARE_T = calculate_aare(actual_value, predicted_value)
		sliding_window_AARE.append(AARE_T)

		M = train_model(list(batch_events)[1:])
		pred_D_T_plus_1 = M.predict_next()

		actual_value.append(next_event)
		predicted_value.append(pred_D_T_plus_1)

		variable_state["M"] = M
		return False

	elif T >= 7:
		if flag:
			if T != 7:
				pred_D_T = M.predict_next()
				actual_value.append(batch_events[-1])
				predicted_value.append(pred_D_T)

			AARE_T = calculate_aare(actual_value, predicted_value)
			sliding_window_AARE.append(AARE_T)

			# Calculate the threshold only once
			Thd = calculate_threshold(sliding_window_AARE)

			if AARE_T > Thd:
				model = train_model(list(batch_events)[0:-1])
				pred_D_T = model.predict_next()

				actual_value.append(batch_events[-1])
				predicted_value.append(pred_D_T)

				AARE_T = calculate_aare(actual_value, predicted_value)
				sliding_window_AARE.append(AARE_T)

				Thd = calculate_threshold(sliding_window_AARE)

				if AARE_T > Thd:
					flag = False
				else:
					variable_state["M"] = model
					flag = True
		else:
			model = train_model(list(batch_events)[0:-1])
			pred_D_T = model.predict_next()
			actual_value.append(batch_events[-1])
			predicted_value.append(pred_D_T)

			AARE_T = calculate_aare(actual_value, predicted_value)
			sliding_window_AARE.append(AARE_T)

			Thd = calculate_threshold(sliding_window_AARE)

			if AARE_T > Thd:
				flag = False
			else:
				variable_state["M"] = model
				flag = True

		return not flag




In [9]:
### RoLA Algorithm ###
		
# Setting up the InfluxDB to consume data
influxdb_url = "http://localhost:8086"
token = "random_token"
username = "influx-admin"
password = "ThisIsNotThePasswordYouAreLookingFor"
org = "ORG"
bucket = "system_state"
measurement = "multivariate_dataset"

# Instantiate InfluxDB client
client = InfluxDBClient(url=influxdb_url, token=token, org=org, username=username, password=password)
write_api = client.write_api()
query_api = client.query_api()

# Time Series Parameters
T = 0
p = 2880  # Number of previous values for correlation
thd_pos, thd_neg = 0.95, -0.95
poll_interval = 1  # Polling frequency in seconds
time_increment = 1  # Time step increment
start_time = "2021-10-28T00:00:00Z"

# Variable Names
variables = [
	"SEB45Salinity", "SEB45Conductivity", "OptodeConcentration", "OptodeSaturation",
	"C3Temperature", "FlowTemperature", "OptodeTemperature", "C3Turbidity", "FlowFlow"
]

# State Initialization
state = {
	var: {
		"batch_events": deque(maxlen=4),
		"next_event": deque(maxlen=1),
		"actual_value": deque([0] * 3, maxlen=3),
		"predicted_value": deque([0] * 3, maxlen=3),
		"sliding_window_AARE": deque(maxlen=8064),
		"M": None,
		"flag": True
	}
	for var in variables
}

# List to store processing times
processing_times = []
anomaly_timestamps = []
anomaly_variable_sets = []
anomaly_datapoints_sets = []



while True:

	events = fetch_events()
	if len(events) < 3:
		print("No sufficient events found.")
		time.sleep(poll_interval)
		continue

	timestamp = 0

	for i, event in enumerate(events):
		iteration_start_time = time.time()  # Start time of this iteration
		anomalies = set()  # Reset anomalies for each data point
		timestamp = event["_time"]

		# Process each variable in the event
		for var, value in event.values.items():
			if var in ["result", "table", "_start", "_stop", "_time", "_measurement", "host"]:
				continue

			state[var]["batch_events"].append(value)

			# Set next event for early iterations (0 ≤ T < 7)
			if i < 7:
				state[var]["next_event"] = events[i + 1][var]

			# Run anomaly detection
			if is_anomaly(T, var, state):
				anomalies.add(var)

		# Correlation & Polling Process
		if anomalies:
			correlated_vars = {a: fetch_previous_values(a, timestamp, p) for a in anomalies}

			for a in anomalies:
				C_agree, C_disagree = 1, 0
				L_var, L_data = [a], [event[a]]

				for b in variables:
					if b == a:
						continue

					a_values = correlated_vars.get(a, [])
					b_values = fetch_previous_values(b, timestamp, p)

					if not a_values or not b_values:
						continue

					try:
						E_ab, _ = pearsonr(a_values, b_values)
					except Exception:
						continue  # Handle potential computation errors

					if thd_neg <= E_ab <= thd_pos:
						continue

					if b in anomalies:
						C_agree += 1
						L_data.append(fetch_previous_values(b, timestamp, 1))
						L_var.append(b)
					else:
						C_disagree += 1

				if C_agree > C_disagree and (C_agree + C_disagree) > 1:
					anomaly_timestamps.append(timestamp)
					#anomaly_variable_sets.append(L_var)
					#anomaly_datapoints_sets.append(L_data)
					#print(f"====> T: {T} ===>  {timestamp}\nVariables: {L_var}\nData: {L_data}")

		# Increment T for the next data point
		T += 1
		iteration_end_time = time.time()

		# Compute and store processing time

		processing_time = iteration_end_time - iteration_start_time
		processing_times.append(processing_time)

		#print(f"Processed timestamp: {timestamp}, Processing time: {processing_time:.4f} seconds")

	# Compute mean and standard deviation every 10 iterations
	#if len(processing_times) % 10 == 0:
	mean_time = np.mean(processing_times)
	std_time = np.std(processing_times)
	print(f"\n\nMean Processing Time: {mean_time:.4f} sec, Std Dev: {std_time:.4f} sec")
	print(f"\n\nanomaly timestaps: \n {set(anomaly_timestamps)}")
		
	# Update start time for the next iteration
	start_time = (timestamp + timedelta(seconds=time_increment)).isoformat()

	time.sleep(poll_interval)



Mean Processing Time: 0.0511 sec, Std Dev: 0.4061 sec


anomaly timestaps: 
 {'2021-10-28T12:33:00Z', '2021-10-28T12:36:00Z', '2021-10-28T10:10:00Z', '2021-10-28T12:21:00Z', '2021-10-28T12:22:00Z', '2021-10-28T12:35:00Z', '2021-10-28T12:16:00Z', '2021-10-28T12:37:00Z', '2021-10-28T10:07:00Z', '2021-10-28T12:34:00Z', '2021-10-28T12:19:00Z', '2021-10-28T12:26:00Z', '2021-10-28T12:25:00Z', '2021-10-28T12:24:00Z', '2021-10-28T12:32:00Z', '2021-10-28T12:28:00Z', '2021-10-28T12:30:00Z', '2021-10-28T10:06:00Z', '2021-10-28T12:18:00Z', '2021-10-28T12:38:00Z', '2021-10-28T10:05:00Z', '2021-10-28T12:27:00Z', '2021-10-28T12:17:00Z', '2021-10-28T12:23:00Z', '2021-10-28T12:31:00Z', '2021-10-28T12:29:00Z', '2021-10-28T10:09:00Z', '2021-10-28T12:20:00Z', '2021-10-28T12:39:00Z', '2021-10-28T12:40:00Z', '2021-10-28T10:08:00Z'}
No sufficient events found.
No sufficient events found.
No sufficient events found.
No sufficient events found.
No sufficient events found.
No sufficient events found.
No suffi

KeyboardInterrupt: 