### Fault Detection using Z-score

In [17]:
import numpy as np

def detect_faults(time_series_data, threshold=3):
    """
    Detect faults in time series data using Z-score anomaly detection.
    
    Args:
        time_series_data (list or numpy array): The time series data.
        threshold (float): The threshold value to determine anomalies.
    
    Returns:
        list: A list of tuples containing the index and value of detected anomalies.
    """
    # Convert data to numpy array
    data = np.array(time_series_data)
    
    # Calculate mean and standard deviation
    mean = np.mean(data)
    std = np.std(data)
    
    # Calculate Z-scores for each data point
    z_scores = (data - mean) / std
    
    # Find anomalies based on threshold
    anomalies = [(i, value) for i, value in enumerate(time_series_data) if abs(z_scores[i]) > threshold]
    
    return anomalies

# Example usage
time_series = [1, 2, 3, 4, 5, 50, 6, 7, 8, 9]
detected_faults = detect_faults(time_series, threshold=2)

if len(detected_faults) > 0:
    print("Detected faults:")
    for index, value in detected_faults:
        print(f"Index: {index}, Value: {value}")
else:
    print("No faults detected.")
print(detected_faults)

Detected faults:
Index: 5, Value: 50
[(5, 50)]


In [18]:
time_series_data = [1, 2, 3, 4, 5, 50, 6, 7, 8, 9]
threshold = 3
# Convert data to numpy array
data = np.array(time_series_data)
# Calculate mean and standard deviation
mean = np.mean(data)
std = np.std(data)

# Calculate Z-scores for each data point
z_scores = (data - mean) / std
# Find anomalies based on threshold
anomalies = [(i, value) for i, value in enumerate(time_series_data) if abs(z_scores[i]) > threshold]
print(f'The value of mean is {mean}')
print(f'The value of std is {std}')
print(z_scores)
print(anomalies)


The value of mean is 9.5
The value of std is 13.720422734012244
[-0.61951444 -0.54663039 -0.47374634 -0.40086228 -0.32797823  2.95180409
 -0.25509418 -0.18221013 -0.10932608 -0.03644203]
[]


### Fault detection using LSTM

In [19]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Load and preprocess the dataset
data = pd.read_csv('shampoo.csv', index_col=0, header=0, parse_dates=True)  # Replace with your dataset
values = data['value'].values.reshape(-1, 1)  # Assuming 'value' is the column containing data
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_values = scaler.fit_transform(values)

# Define sequence length and split into input/output sequences
sequence_length = 10
X = []
y = []
for i in range(len(scaled_values) - sequence_length):
    X.append(scaled_values[i:i+sequence_length])
    y.append(scaled_values[i+sequence_length])
X = np.array(X)
y = np.array(y)

# Split the data into training and testing sets
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Build the LSTM model
model = Sequential()
model.add(LSTM(64, input_shape=(sequence_length, 1)))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')

# Train the model
model.fit(X_train, y_train, epochs=10, batch_size=32)

# Evaluate the model
train_loss = model.evaluate(X_train, y_train, verbose=0)
test_loss = model.evaluate(X_test, y_test, verbose=0)
print(f'Train loss: {train_loss:.4f}')
print(f'Test loss: {test_loss:.4f}')

# Make predictions on new data
new_data = pd.read_csv('shampoo-test.csv', index_col=0, header=0, parse_dates=True)  # Replace with your new dataset
new_values = new_data['value'].values.reshape(-1, 1)
new_scaled_values = scaler.transform(new_values)
X_new = []
for i in range(len(new_scaled_values) - sequence_length):
    X_new.append(new_scaled_values[i:i+sequence_length])
X_new = np.array(X_new)
predictions = model.predict(X_new)

# Calculate prediction errors
errors = np.abs(predictions - X_new[:, -1])

# Define a threshold for fault detection
threshold = 0.1  # Adjust based on your requirements

# Classify data points as normal or faulty based on the threshold
classifications = ['Normal' if error <= threshold else 'Faulty' for error in errors]
print(len(predictions))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train loss: 0.0519
Test loss: 0.2071
2


In [20]:
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

# Simulated time series data with faults and anomalies
time_series_data = np.array([1.2, 1.3, 1.5, 1.2, 1.4, 100.0, 1.3, 1.2, 1.4, 1.3])

# Fault detection using Isolation Forest
isolation_forest = IsolationForest(contamination='auto')
isolation_forest.fit(time_series_data.reshape(-1, 1))
fault_predictions = isolation_forest.predict(time_series_data.reshape(-1, 1))

# Anomaly detection using One-Class SVM
# one_class_svm = OneClassSVM(nu='auto')
# one_class_svm.fit(time_series_data.reshape(-1, 1))
# anomaly_predictions = one_class_svm.predict(time_series_data.reshape(-1, 1))

# Print the fault and anomaly predictions
print("Fault Predictions:")
print(fault_predictions)
# print("Anomaly Predictions:")
# print(anomaly_predictions)


Fault Predictions:
[ 1  1 -1  1  1 -1  1  1  1  1]


In [21]:
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

# Simulated time series data with faults and anomalies
time_series_data = [1.2, 1.3, 1.5, 1.2, 1.4, 100.0, 1.3, 1.2, 1.4, 1.3]

# Convert time series data to a numpy array
time_series_data = np.array(time_series_data)

# Fault detection using Isolation Forest
isolation_forest = IsolationForest(contamination='auto')
isolation_forest.fit(time_series_data.reshape(-1, 1))
fault_predictions = isolation_forest.predict(time_series_data.reshape(-1, 1))

# # Anomaly detection using One-Class SVM
# one_class_svm = OneClassSVM(nu='auto')
# one_class_svm.fit(time_series_data.reshape(-1, 1))
# anomaly_scores = one_class_svm.score_samples(time_series_data.reshape(-1, 1))

# # Convert anomaly scores to binary predictions
# threshold = 0  # Adjust threshold as needed
# anomaly_predictions = np.where(anomaly_scores < threshold, -1, 1)

# Print the fault and anomaly predictions
print("Fault Predictions:")
print(fault_predictions)
print("Anomaly Predictions:")
# print(anomaly_predictions)


Fault Predictions:
[ 1  1 -1  1  1 -1  1  1  1  1]
Anomaly Predictions:


In [22]:
data1 = [1,2,3,4,5,6,7,8,9,10]
data2 = [2*i for i in data1]
print(data2)

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]


In [23]:
from statsmodels.tools.eval_measures import rmse
rmse_val = rmse(data1, data2)
print(rmse_val)

6.2048368229954285


In [24]:
def root_mse(x, y):
    if len(x) != len(y):
        return "Error: The two arguments must have the same length"
    mse = np.square(np.subtract(x, y)).mean()
    return np.sqrt(mse)

print(root_mse(data1, data2))

6.2048368229954285


In [25]:
bytes(4)

b'\x00\x00\x00\x00'

### CUSUM ALGORITHM

In [63]:
pd.Index(list(range(1,21)))

Int64Index([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
            20],
           dtype='int64')

In [57]:
import numpy as np

def cusum(data, target, threshold):
    cumulative_sum = np.zeros_like(data)  # Initialize cumulative sum array
    change_points = []  # List to store detected change points
    
    for i in range(1, len(data)):
#         deviation = data[i] - target  # Calculate deviation from the target value
        deviation = data[i] - data[i-1]  # Calculate deviation from the target value
        cumulative_sum[i] = max(0, cumulative_sum[i-1] + deviation - threshold)  # Update cumulative sum
        
        if cumulative_sum[i] >= threshold:
            change_points.append(i)  # Store the index of detected change point
    
    return change_points

# Example usage
data = [10, 9, 11, 12, 8, 9, 7, 13, 14, 9, 8, 7]
target = 10  # Reference value
threshold = 1  # Threshold for detecting significant deviations

change_points = cusum(data, target, threshold)
print("Change points detected at indices:", change_points)


Change points detected at indices: [2, 3, 7, 8]


In [27]:
dt = [10, 9, 11, 12, 8, 9, 7, 13, 14, 9, 8, 7]



[0 0 0 0 0]


In [29]:
len(data)

12

### Investigate normalisation of different dataset with the same mean and std

In [69]:
def normalise(data):
    means = np.mean(data, axis=0)
    stds = np.std(data, axis=0)
    normalised = (data - means)/stds
    return pd.DataFrame(normalised), means, stds

def normalise2(data, means, stds):
    return pd.DataFrame((data - means)/stds)

data1 = [1,2,3,4,5,6,7,8,9,10]
norm1 = normalise(data1)
norm1[0]

Unnamed: 0,0
0,-1.566699
1,-1.218544
2,-0.870388
3,-0.522233
4,-0.174078
5,0.174078
6,0.522233
7,0.870388
8,1.218544
9,1.566699


In [78]:
m = norm1[1]
s = norm1[2]
data2 = [1,2,3,4,5]
normalise2(data2, m, s)

Unnamed: 0,0
0,-1.566699
1,-1.218544
2,-0.870388
3,-0.522233
4,-0.174078


In [73]:
norm1[2]

2.8722813232690143

In [2]:
my_List = [10,20,30,40,50]
result = [val/2 for val in my_List]
print(result)

[5.0, 10.0, 15.0, 20.0, 25.0]


In [3]:
my_List + my_List

[10, 20, 30, 40, 50, 10, 20, 30, 40, 50]

In [5]:
ans = [2*i for i in zip(my_List, my_List)]
ans

[(10, 10, 10, 10),
 (20, 20, 20, 20),
 (30, 30, 30, 30),
 (40, 40, 40, 40),
 (50, 50, 50, 50)]