In [1]:
# imports
import numpy as np
import json

# modelling
from sklearn.ensemble import IsolationForest
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, precision_recall_curve, accuracy_score, average_precision_score, auc
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.utils import shuffle

# Tree Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# misc
import os

In [2]:
# function to load and preprocess quickdraw data
def load_quickdraw_data(dir, category_name, n_samples=1000):
    file = f'full_numpy_bitmap_{category_name}.npy'
    data = np.load(dir + file)
    if n_samples == -1:
        return data
    else:
        indices = np.random.choice(len(data), n_samples, replace=False)
        sampled_data = data[indices]
        return sampled_data

In [3]:
def data_generator(data, batch_size=512):
    n_samples = len(data)
    for i in range(0, n_samples, batch_size):
        batch = data[i:i + batch_size]
        yield batch

In [4]:
files = os.listdir(r'..\data\\')
categories = [file.split('_')[-1].split('.')[0] for file in files]

In [5]:
# Directory where data files are stored
dir = '../data/'

# Load and preprocess data
all_data = []
labels = []

In [6]:
for category_name in categories:
    category_data = load_quickdraw_data(dir, category_name, 100) # Change this to -1 for all data
    all_data.extend(category_data)
    labels.extend([category_name] * len(category_data))

In [7]:
# split the data into training and test
x_train, x_test, y_train, y_test = train_test_split(all_data, labels, test_size=0.2, random_state=42)

In [8]:
print(len(all_data))

34500


In [9]:
# normalize data
x_train = np.array(x_train).astype(np.float32) / 255
x_test = np.array(x_test).astype(np.float32) / 255

In [10]:
# Add Outliers
num_outliers = 500

# Generate random outliers
outliers = np.random.rand(num_outliers, 784)  # Assuming your data has 784 features

In [11]:
# Append outliers to your training data
x_train_with_outliers = np.vstack((x_train, outliers))

# Create labels for the outliers (e.g., label them as "outlier" or use a different category)
y_train_with_outliers = y_train + ['outlier'] * num_outliers

In [12]:
# Train Isolation Forest with outliers
model = IsolationForest(contamination=0.02, random_state=42)

train_data_generator = data_generator(x_train_with_outliers)

for batch in train_data_generator:
    model.fit(batch)

In [13]:
# Predict labels for x_train_with_outliers to check if outliers are catched
y_pred_train = np.array([])  # Initialize an empty array to store predictions

for batch in data_generator(x_train_with_outliers):
    batch_pred = model.predict(batch)
    y_pred_train = np.concatenate([y_pred_train, batch_pred])

# Map the predictions: 1 for outliers, 0 for inliers
y_pred_train[y_pred_train == 1] = 0  # Inliers
y_pred_train[y_pred_train == -1] = 1  # Outliers

y_train_numeric = [1 if label == 'outlier' else 0 for label in y_train_with_outliers]

In [14]:
# Evaluate the model's performance on training data
precision_train = precision_score(y_train_numeric, y_pred_train)
recall_train = recall_score(y_train_numeric, y_pred_train)
f1_train = f1_score(y_train_numeric, y_pred_train)

print("Training Data Classification Report:")
print(classification_report(y_train_numeric, y_pred_train, target_names=["inlier", "outlier"]))
print("Training Data Precision:", precision_train)
print("Training Data Recall:", recall_train)
print("Training Data F1 Score:", f1_train)

Training Data Classification Report:
              precision    recall  f1-score   support

      inlier       0.00      0.00      0.00     27600
     outlier       0.00      0.03      0.00       500

    accuracy                           0.00     28100
   macro avg       0.00      0.01      0.00     28100
weighted avg       0.00      0.00      0.00     28100

Training Data Precision: 0.0005069892083725647
Training Data Recall: 0.028
Training Data F1 Score: 0.0009959450807426906


In [15]:
# Predict anomalies using data generator
y_pred = []
for batch in data_generator(x_test):
    # Predict anomalies
    batch_pred = model.fit_predict(batch)
    y_pred.extend(batch_pred)

In [16]:
# Anomalies are predicted as -1, so we will save their indexes
anomaly_indexes = np.where(y_pred == -1)[0]

In [17]:
# Adjust the prediction labels for anomalies
y_pred_adjusted = [-1 if pred == -1 else 1 for pred in y_pred]

y_true = [1 if label == 'outlier' else -1 for label in y_test]

In [18]:
precision = precision_score(y_true, y_pred_adjusted)
recall = recall_score(y_true, y_pred_adjusted)
f1 = f1_score(y_true, y_pred_adjusted)
report = classification_report(y_true, y_pred_adjusted)

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
# Print the number of anomalies and accuracy
print("Number of anomalies:", len(anomaly_indexes))
print("Accuracy:", accuracy_score(y_test, (y_pred_adjusted == 1)))

In [None]:
# Print the indexes of anomalies
print("Indexes of Anomalies:")
print(anomaly_indexes)

In [None]:
# Create a classification report
report = classification_report(y_test, (y_pred_adjusted == -1), target_names=[str(i) for i in range(10)])
print("\nClassification Report:\n", report)

In [None]:
# Calculate advanced evaluation metrics
anomaly_scores = model.decision_function(x_test)
y_pred_binary = np.where(y_pred_adjusted == -1, 1, -1)

In [None]:
# Calculate precision, recall, and F1-score for anomaly detection
precision = precision_score(y_test, (y_pred_adjusted == -1), average='weighted')
recall = recall_score(y_test, (y_pred_adjusted == -1), average='weighted')
f1 = f1_score(y_test, (y_pred_adjusted == -1), average='weighted')

# Reshape the anomaly_scores to a 2D array
anomaly_scores = anomaly_scores.reshape(-1, 1)

# Calculate average precision (AUC-PR) for anomaly detection
average_precision = average_precision_score(y_test, anomaly_scores)

In [None]:
print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1)

print("Average Precision (AUC-PR):", average_precision)

In [None]:
# Get anomaly labels
anomaly_labels = y_test.iloc[anomaly_indexes]

# create a dict of indexes and labels
anomaly_dict = dict(zip(map(int, anomaly_indexes), anomaly_labels))

In [None]:
# write to json file
json_filename = "anomalies.json"
with open(json_filename, 'w') as json_file:
    json.dump(anomaly_dict, json_file)

print(f"Anomaly dictionary saved to {json_filename}")

In [None]:
# Reshape the test data for plotting
x_test_reshaped = x_test.values.reshape(-1, 28, 28)

In [None]:
# Plot some of the anomalies
plt.figure(figsize=(12, 8))
for i, idx in enumerate(anomaly_indexes[:10]):  # Plot the first 10 anomalies
    plt.subplot(2, 5, i + 1)
    plt.imshow(x_test_reshaped[idx], cmap='gray')
    plt.title(f'Anomaly {i+1}')
    plt.axis('off')
plt.tight_layout()
plt.show()

In [None]:
# Calculate precision-recall curve
precision, recall, _ = precision_recall_curve(y_pred_adjusted, anomaly_scores)

# Calculate area under the precision-recall curve
pr_auc = auc(recall, precision)

In [None]:
# Plot precision-recall curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='darkorange', lw=2, label='PR Curve (area = %0.2f)' % pr_auc)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower right')
plt.show()

In [None]:
# Create a heatmap of anomalies
plt.figure(figsize=(12, 8))
sns.heatmap(y_pred_adjusted.reshape(-1, 28), cmap='coolwarm', cbar=False)
plt.title('Anomaly Detection Heatmap')
plt.xlabel('Pixel Column')
plt.ylabel('Pixel Row')
plt.show()

In [None]:
# Create a box plot of anomaly scores
plt.figure(figsize=(8, 6))
plt.boxplot(anomaly_scores, vert=False)
plt.title('Box Plot of Anomaly Scores')
plt.show()