In [1]:
import numpy as np
import matplotlib.pyplot as plt

from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import BernoulliRBM
from sklearn.base import clone
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, mean_squared_error
from sklearn.preprocessing import StandardScaler
from scipy.spatial import distance


In [2]:
def norm(arr):
    arr = arr.astype(np.float)
    arr -= arr.min()
    arr /= arr.max()
    return arr

In [3]:
# load MNIST data set
mnist = load_digits()
X, Y = mnist.data, mnist.target

# normalize inputs to 0-1 range
X = norm(X)

# split into train, validation, and test data sets
X_train, X_test, Y_train, Y_test = train_test_split(X,       Y,       test_size=200, random_state=0)
labels = np.random.binomial(n=1, p=0.5, size=[1000]).reshape(1000,1)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  arr = arr.astype(np.float)


In [3]:
indices = ["0", "1", "10", "11", "12", "13"]
data_S = []
for index in indices:
    data = np.load(f"../data/data{index}.npy")
    data_S.append(data)

In [4]:
for data in data_S:
    print(data.shape)

(291, 275, 442)
(282, 274, 442)
(259, 275, 442)
(271, 278, 442)
(284, 278, 442)
(286, 278, 442)


In [None]:
target_width = 259
target_height = 259
train = []
for data in data_S:
    for i in range(442):
        data_point = data[:,:,i]
        current_height, current_width = data_point.shape

        # Calculate the starting points for cropping
        start_x = (current_width - target_width) // 2
        start_y = (current_height - target_height) // 2

        # Crop the image
        cropped_data = data_point[start_y:start_y+target_height, start_x:start_x+target_width]


        # Scale the 2D array
        #scaled_array = (cropped_data- np.min(cropped_data))/(np.max(cropped_data)-np.min(cropped_data))
        #scaled_array = norm(cropped_data)

        train.append(scaled_array)

train = np.array(train)
nsamples, nx, ny = train.shape
d2_train_dataset = train.reshape((nsamples,nx*ny))
d2_train_dataset = norm(d2_train_dataset)
train , test =  train_test_split(d2_train_dataset, test_size=200, random_state=0)

In [21]:
train.shape

(2452, 67081)

In [22]:
test.shape

(200, 67081)

In [23]:
# --------------------------------------------------------------------------------
# set hyperparameters

learning_rate = 0.001
total_units   =  750 
total_epochs  =   50 
batch_size    =  256 

# --------------------------------------------------------------------------------
# construct models

# RBM
rbm1 = BernoulliRBM(n_components=total_units, learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=2)
rbm2 = BernoulliRBM(n_components=int(total_units/2), learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=2)
rbm3 = BernoulliRBM(n_components=int(total_units/4), learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=2)
rbm4 = BernoulliRBM(n_components=int(total_units/6), learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=1)


models = []                       
model = Pipeline(steps=[('rbm1', clone(rbm1)), ('rbm2', clone(rbm2)),('rbm3', clone(rbm3)),('rbm4', clone(rbm4))])  # RBM stack / DBN
# --------------------------------------------------------------------------------
# train and evaluate models

model.fit(train)

[BernoulliRBM] Iteration 1, pseudo-likelihood = -42548.07, time = 10.74s
[BernoulliRBM] Iteration 2, pseudo-likelihood = -42626.84, time = 12.88s
[BernoulliRBM] Iteration 3, pseudo-likelihood = -36019.48, time = 12.89s
[BernoulliRBM] Iteration 4, pseudo-likelihood = -41847.18, time = 12.72s
[BernoulliRBM] Iteration 5, pseudo-likelihood = -42401.92, time = 12.80s
[BernoulliRBM] Iteration 6, pseudo-likelihood = -36838.59, time = 12.72s
[BernoulliRBM] Iteration 7, pseudo-likelihood = -36201.78, time = 12.76s
[BernoulliRBM] Iteration 8, pseudo-likelihood = -35728.20, time = 13.00s
[BernoulliRBM] Iteration 9, pseudo-likelihood = -35646.21, time = 13.03s
[BernoulliRBM] Iteration 10, pseudo-likelihood = -35397.33, time = 12.81s
[BernoulliRBM] Iteration 11, pseudo-likelihood = -33080.75, time = 12.67s
[BernoulliRBM] Iteration 12, pseudo-likelihood = -32990.95, time = 12.73s
[BernoulliRBM] Iteration 13, pseudo-likelihood = -32232.48, time = 12.91s
[BernoulliRBM] Iteration 14, pseudo-likelihood 

In [11]:
def sigmoid_activation(x):
    """
    Numerically stable sigmoid function with clipping.
    
    :param x: Input array.
    :return: Sigmoid of x.
    """
    # Clip x to avoid overflow in exp
    clipped_x = np.clip(x, -500, 500)

    return np.where(clipped_x >= 0, 
                    1 / (1 + np.exp(-clipped_x)), 
                    np.exp(clipped_x) / (1 + np.exp(clipped_x)))


def reconstruct_visible(rbm, hidden):
    """
    Reconstruct the visible units from the hidden units in an RBM.

    :param rbm: Trained instance of BernoulliRBM.
    :param hidden: Array of hidden units.
    :return: Reconstructed visible units.
    """
    # Compute the activation of the visible units
    v_activation = np.dot(hidden, rbm.components_) + rbm.intercept_visible_

    # Compute the probability of the visible units given the hidden units
    v_prob = sigmoid_activation(v_activation)

    # Sample from these probabilities to get the binary visible units
    # For BernoulliRBM, this step can be binary sampling or directly using probabilities
    v_reconstructed = np.random.binomial(1, v_prob)

    return v_reconstructed

In [26]:
hidden_features = model.transform(test)
# Initialize the reconstructed data with the hidden features
reconstructed_data = hidden_features

# Reconstruct the data from the top RBM to the bottom
for rbm in reversed(model.steps):
    reconstructed_data = reconstruct_visible(rbm[1], reconstructed_data)

In [27]:
# Flatten the original and reconstructed data if they are in 2D or 3D
original_data_flat = test.flatten()
reconstructed_data_flat = reconstructed_data.flatten()

# Compute the reconstruction error for each sample
reconstruction_errors = mean_squared_error(original_data_flat, reconstructed_data_flat)

In [28]:
reconstruction_errors

0.23991734846684787

In [31]:
transformed_features = model.transform(test)

# Calculate the centroid of the transformed features
centroid = np.mean(transformed_features, axis=0)

# Compute distances of each sample from the centroid
distances = distance.cdist(transformed_features, [centroid], 'euclidean').flatten()

# Determine a threshold for considering a point as an outlier
threshold = np.percentile(distances, 95)  # for example, top 5% as outliers

# Flag points as outliers
outliers = distances > threshold

truth = np.zeros(200)

def evaluate_outlier_predictions(predicted_outliers, true_outliers):
    TP = np.sum((predicted_outliers == 1) & (true_outliers == 1))  # True Positives
    TN = np.sum((predicted_outliers == 0) & (true_outliers == 0))  # True Negatives
    FP = np.sum((predicted_outliers == 1) & (true_outliers == 0))  # False Positives
    FN = np.sum((predicted_outliers == 0) & (true_outliers == 1))  # False Negatives

    print(f"Correct Outlier Predictions (True Positives): {TP}")
    print(f"Correct Non-Outlier Predictions (True Negatives): {TN}")
    print(f"Incorrect Outlier Predictions (False Positives): {FP}")
    print(f"Incorrect Non-Outlier Predictions (False Negatives): {FN}")

# Evaluate your model's performance
evaluate_outlier_predictions(outliers, truth)


Correct Outlier Predictions (True Positives): 0
Correct Non-Outlier Predictions (True Negatives): 200
Incorrect Outlier Predictions (False Positives): 0
Incorrect Non-Outlier Predictions (False Negatives): 0


: 

In [None]:
# --------------------------------------------------------------------------------
# set hyperparameters

total_units   =  259 
total_epochs  =   50 
batch_size    =  1 


# RBM
rbm1 = BernoulliRBM(n_components=total_units, learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=1)
rbm2 = BernoulliRBM(n_components=int(total_units/2), learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=1)
rbm3 = BernoulliRBM(n_components=int(total_units/4), learning_rate=learning_rate, batch_size=batch_size, n_iter=total_epochs, verbose=1)


models = []                       
model = Pipeline(steps=[('rbm1', clone(rbm1)), ('rbm2', clone(rbm2)),('rbm3', clone(rbm3))])  # RBM stack / DBN
# --------------------------------------------------------------------------------
# train and evaluate models

model.fit(X_train)

def transform_through_dbn(data, rbms):
    for rbm in rbms:
        data = rbm.transform(data)
    return data
    
# Extract features using DBN
dbn_features = transform_through_dbn(X_train, model)  # Replace with your DBN layers

# Initialize and train the classifier
classifier = LogisticRegression()
classifier.fit(dbn_features, Y_train)

predictions = classifier.predict(transform_through_dbn(X_train, model))


In [29]:
from joblib import dump

dump(model, 'model.joblib')

['model.joblib']