In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from tensorflow import keras
from keras.models import Model, load_model, Sequential # for assembling a Neural Network model
from keras.layers import ZeroPadding1D, Input, Dense, Embedding, Reshape, Concatenate, Flatten, Dropout, Conv1DTranspose # for adding layers
from keras.layers import Conv2D, Conv2DTranspose, MaxPool2D, ReLU, LeakyReLU, Conv1D, Flatten, MaxPooling1D, BatchNormalization # for adding layers
from tensorflow.keras.utils import plot_model # for plotting model diagram
from tensorflow.keras.optimizers import Adam # for model optimization 
import numpy as np # for data manipulation

from sklearn import preprocessing
from keras.optimizers import RMSprop
from keras.initializers import RandomNormal
from keras.constraints import Constraint
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from keras.models import load_model
import pydot
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn import metrics
from keras import backend as K
# Data manipulation
import pandas as  pd
import numpy.matlib
from numpy.random import randn
from numpy.random import randint
from numpy import zeros
# Visualization
import matplotlib 
import matplotlib.pyplot as plt # for data visualization
import os
from collections import defaultdict
from typing import Dict, List, Tuple
from keras import initializers
np.random.seed(1600)
tf.random.set_seed(1600)


In [None]:
cpu_devices = tf.config.experimental.list_physical_devices('GPU')
for device in cpu_devices:
    tf.config.experimental.set_memory_growth(device, True)

# Load the Spectroscopy DataSet

In [None]:
# Read in all data
read_files = os.listdir('data/container_content')
readings = {z.split('.')[0]:pd.read_csv(os.path.join('./data/container_content',z)) for z in read_files}
# Remove extra data fields from the readings
for key in readings.keys():
    try:
        readings[key] = readings[key].loc[:,list(readings[key].columns)[:-4]].to_numpy().astype(np.int32)
    except Exception as e:
        print(str(e))

### Perform reflectance calibraiton across all data samples

In [None]:
# Read in calibration data
hamamatsu_dark = np.median(pd.read_csv('./calibration/hamamatsu_black_ref.csv').to_numpy().astype(np.int32), axis=0)
hamamatsu_white = np.median(pd.read_csv('./calibration/hamamatsu_white_ref.csv').to_numpy().astype(np.int32), axis=0)
mantispectra_dark = np.median(pd.read_csv('./calibration/mantispectra_black_ref.csv').to_numpy()[:,:-5].astype(np.int32), axis=0)
mantispectra_white = np.median(pd.read_csv('./calibration/mantispectra_white_ref.csv').to_numpy()[:,:-5].astype(np.int32), axis=0)

# Create composite calibration file
white_ref = np.concatenate((hamamatsu_white, mantispectra_white))[1:]
dark_ref = np.concatenate((hamamatsu_dark, mantispectra_dark))[1:]

# Create calibration function
def spectral_calibration(reading):
    t = np.divide((reading-dark_ref), (white_ref-dark_ref), where=(white_ref-dark_ref)!=0)
    # Handle cases where there is null division, which casts values as "None"
    if np.sum(t==None) > 0:
        print('Null readings!')
    t[t== None] = 0
    # Handle edge cases with large spikes in data, clip to be within a factor of the white reference to avoid skewing the model
    t = np.clip(t,-2.5,2.5)
    return t

### Reflectance normalize spectral readings

In [None]:
# Calibrate all the data
readings_cal = {}
for key in readings.keys():
    readings_cal[key] = np.apply_along_axis(spectral_calibration,1,readings[key])

readings_cal

In [None]:
# Read in the container-substrate pairings
pairings = pd.read_csv('./data/container_substrate.csv',header=1, keep_default_na=False)
# Remove blank data rows
pairings = pairings.loc[:18,(pairings.columns)[:20]]
# Unique substances
contents = list(pairings.columns[1:])

In [None]:
pairings

In [None]:
contents

In [None]:
# Containers to exclude - wood, stainless steel, aluminum
exclude_containers = ['O','P','Q','I','K','M']
exclude_contents = [15,2,0,7,10]
# Generalized function to group data by the contents type

def random_scale(reading: np.array) -> np.array:
    return reading * np.random.default_rng().normal(1.0,0.05,1)[0]

def generate_data_labels(readings: Dict) -> defaultdict:
    data_by_contents = np.array([])
    labels_by_contents = np.array([])

    # Iterate over all data_frames types
    for key in readings.keys():
        # Iterate over all containers, but skip Aluminum (P), Stainless Steel (Q), and Wood (R)
        if key[0] in exclude_containers or (len(key) > 1 and int(key[1:]) in exclude_contents): #:or int(key[1:]) in exclude_contents:
            continue
        for index, val in enumerate(contents):
            if key not in list(pairings[val]):
                continue
            # Otherwise the data is useful to use, let's proceed with the data wrangling
            useData = readings[key]
            # ADD SCALING NOISE TO THE DATA HERE
            useData = np.matlib.repmat(useData,3,1)
            useData = np.apply_along_axis(random_scale,1,useData)
            # Get the plain name of the container
            useContainer = pairings[np.equal.outer(pairings.to_numpy(copy=True),  [key]).any(axis=1).all(axis=1)]['container / substrate'].iloc[0]
            # Add the index as the key value
            data_by_contents = np.vstack((data_by_contents, useData)) if data_by_contents.size else useData
            labels_by_contents = np.vstack((labels_by_contents, np.matlib.repmat([val,useContainer],useData.shape[0],1))) if labels_by_contents.size else np.matlib.repmat([val,useContainer],useData.shape[0],1)
    return data_by_contents, labels_by_contents


In [None]:
all_data, labels = generate_data_labels(readings_cal)
all_data = np.hstack((all_data,np.gradient(all_data,axis=1)))

In [None]:
all_data, labels

In [None]:
print(np.unique(labels[:,0]))

In [None]:
# Fit labels to model
le_contents = preprocessing.LabelEncoder()
le_containers = preprocessing.LabelEncoder()
labels_contents = le_contents.fit_transform(labels[:,0])
labels_containers = le_containers.fit_transform(labels[:,1])
encoded_labels = np.vstack((labels_containers,labels_contents)).T

In [None]:
encoded_labels, labels_contents, labels_containers

In [None]:
data = pd.DataFrame(all_data)
data["labels"] = labels_contents
classes = len(np.unique(labels_contents))
classesx = (np.unique(labels_contents))
data

In [None]:
X = data.drop(["labels"], axis=1)
y = data["labels"]

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.10, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.3, stratify=y_train)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

## Put scaled data in separate dataframe for future visualization

In [None]:
newdata_x = X_train.copy()
newdata_y = y_train.copy()
newdata = pd.DataFrame(newdata_x)
newdata["labels"] = (newdata_y)
newdata

In [None]:
X_train.shape,y_train.shape

### MLP

In [None]:
size_ = data.shape[1] 

In [None]:
def MLP_():
    model = Sequential()
    model.add(Dense(200, activation="relu", input_shape=(X_train.shape[1],)))
    model.add(Dense(100, activation="relu"))
    model.add(Dense(25, activation="relu"))
    
    model.add(Dense(classes, activation = 'softmax'))
    model.compile(loss = 'sparse_categorical_crossentropy', optimizer="SGD",metrics = ['accuracy'])
    model.summary()
    return model

mlp_model = MLP_()
mlp_model.summary()
plot_model(mlp_model,show_shapes=True)

In [None]:
mlp_model.fit(X_train, y_train, batch_size=64,epochs=500, verbose=1, validation_data=(X_val, y_val))

In [None]:
acc = mlp_model.evaluate(X_test, y_test)
print("Loss:", acc[0], " Accuracy:", acc[1])
pred = mlp_model.predict(X_test)
pred_y = pred.argmax(axis=-1)

In [None]:
confusion_matrix = metrics.confusion_matrix(y_test, pred_y)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels =classesx)

cm_display.plot()
plt.show() 

In [None]:
print(f"Actual: {y_test}")
print(f"Predicted: {pred_y}")

### Begin WCGAN


In [None]:
def generate_latent_points(latent_dim, n_samples, n_classes=classes):
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    z_input = x_input.reshape(n_samples, latent_dim)
    # generate labels
    labels = randint(0, n_classes, n_samples)
    return [z_input, labels]

In [None]:
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n_samples):
    # generate points in latent space
    z_input, labels_input = generate_latent_points(latent_dim, n_samples)
    # predict outputs
    images = generator.predict([z_input, labels_input])
    # create class labels
    y = -np.ones((n_samples, 1))
    return [images, labels_input], y

In [None]:
# generate n real samples with class labels; We randomly select n samples from the real data
def generate_real_samples(n):
    X = data.sample(n)
    x = X.iloc[:,0:606]
    labels = X.iloc[:,-1]
    y = np.ones((n, 1))    
    return [x, labels], y
generate_real_samples(30)

In [None]:
def define_generator(latent_dim, c=classes):
    # label input
    in_label = Input(shape=(1,))
    n_nodes = 1 * X_train.shape[1]

    # image generator input
    n_nodes = 1 * X_train.shape[1]
    # Noise input
    in_lat = Input(shape=(latent_dim,))
    gen = Dense(n_nodes)(in_lat)
    gen = LeakyReLU(alpha=0.2)(gen)
    # merge image gen and label input
    merge = Concatenate()([gen, in_label])
    
    gen = Dense(128)(merge)
    gen = LeakyReLU(alpha=0.2)(gen)
    
    gen = Dense(256)(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
    
    gen = Dense(512)(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
    
    # output
    out_layer = Dense(X_train.shape[1], activation="tanh")(gen)
    model = Model([in_lat, in_label], out_layer)
    return model

In [None]:
generator1 = define_generator(100)
generator1.summary()
plot_model(generator1,show_shapes=True)

In [None]:
# implementation of wasserstein loss
def wasserstein_loss(y_true, y_pred):
     return K.mean((y_true+1e-5) * (y_pred+1e-5))

In [None]:
def gradient_penalty_loss(y_true, y_pred, averaged_samples):
        # Computes gradient penalty based on prediction and weighted real / fake samples
        gradients = K.gradients(y_pred, averaged_samples)[0]
        # compute the euclidean norm by squaring ...
        gradients_sqr = K.square(gradients)
        #   ... summing over the rows ...
        gradients_sqr_sum = K.sum(gradients_sqr, axis=np.arange(1, len(gradients_sqr.shape)))
        #   ... and sqrt
        gradient_l2_norm = K.sqrt(gradients_sqr_sum)
        # compute lambda * (1 - ||grad||)^2 still for each single sample
        gradient_penalty = K.square(1 - gradient_l2_norm)
        # return the mean as loss over all the batch samples
        return K.mean(gradient_penalty)

In [None]:
def generator_loss(fake):
        return -tf.reduce_mean(fake)
    
def discriminator_loss(real, fake):
        real_loss = tf.reduce_mean(real)
        fake_loss = tf.reduce_mean(fake)
        return fake_loss - real_loss


In [None]:
# clip model weights to a given hypercube
class ClipConstraint(Constraint):
    # set clip value when initialized
    def __init__(self, clip_value):
        self.clip_value = clip_value

    # clip model weights to hypercube
    def __call__(self, weights):
        return K.clip(weights, -self.clip_value, self.clip_value)

    # get the config
    def get_config(self):
        return {'clip_value': self.clip_value}

In [None]:
X_train.shape[1]

In [None]:
def define_discriminator(n_inputs=X_train.shape[1], n_c = classes):
    # weight initialization
    init = RandomNormal(stddev=0.02)
     # weight constraint
    const = ClipConstraint(0.01)
    in_shape = (X_train.shape[1],)
    # label input
    in_label = Input((1,))
    n_nodes = X_train.shape[1] * 1
    # image input
    in_image = Input(shape=in_shape)
    # concat label as a channel
    merge = Concatenate()([in_image, in_label])
    
    fe = Dense(32)(merge)
    fe = LeakyReLU(alpha=0.2)(fe)
    
    fe = Dense(64)(fe) 
    fe = LeakyReLU(alpha=0.2)(fe)

    out_layer = Dense(1, activation='sigmoid')(fe)
    # define model
    model = Model([in_image, in_label], out_layer)
    opt = RMSprop(learning_rate=0.00005)
    model.compile(loss=wasserstein_loss, optimizer=opt, metrics=['accuracy'])
    return model
discriminator1 = define_discriminator()
discriminator1.summary()
plot_model(discriminator1,show_shapes=True)

In [None]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(generator, discriminator):
    # make weights in the discriminator not trainable
    discriminator.trainable = False    
    # get noise and label inputs from generator model
    gen_noise, gen_label = generator.input
    # get image output from the generator model
    gen_output = generator.output
    # connect image output and label input from generator as inputs to discriminator
    gan_output = discriminator([gen_output, gen_label])
    # define gan model as taking noise and label and outputting a classification
    model = Model([gen_noise, gen_label], gan_output)
    # compile model
    opt = RMSprop(learning_rate=0.00005)
    model.compile(loss=wasserstein_loss, optimizer=opt) 
    return model
gg = define_gan(generator1,discriminator1)
plot_model(gg,show_shapes=True)

In [None]:
# create a line plot of loss for the gan and save to file
def plot_history(d_hist, d2_hist):
    # plot loss
    plt.plot(d_hist, label='crit_real')
    plt.plot(d2_hist, label='crit_fake')
    plt.legend()
    plt.show()
    plt.close()
    
d_optimizer = RMSprop(learning_rate=0.00005)
g_optimizer = RMSprop(learning_rate=0.00005)

In [None]:
# train the generator and discriminator
def train(g_model, d_model, gan_model, latent_dim, n_epochs=30, n_batch=32, dataset=data):    # determine half the size of one batch, for updating the  discriminator
    half_batch = int(n_batch / 2)
    bat_per_epo = int(dataset[0].shape[0] / n_batch)
    d_history = []
    g_history = []    # manually enumerate epochs
    for epoch in range(n_epochs):
        # enumerate batches over the training set
        with tf.GradientTape() as tape:
            # prepare fake examples
            [X_real, labels_real], y_real = generate_real_samples(half_batch)   
            # generate 'fake' examples
            [X_fake, labels], y_fake = generate_fake_samples(g_model, latent_dim, half_batch)    
            # update discriminator
            d_loss_real, d_real_acc = d_model.train_on_batch([X_real, labels_real], y_real)
            d_loss_fake, d_fake_acc = d_model.train_on_batch([X_fake, labels], y_fake)
            # using the fake and realloss get discriminator loss
            d_cost = discriminator_loss(d_loss_real, d_loss_fake)
            
            # gradient penalty
            gp = gradient_penalty(half_batch, X_real, X_fake, labels)
            # add gradient penalty to original loss
            d_loss = d_cost + gp * 10 
        d_gradient = tape.gradient(d_loss, d_model.trainable_variables)
        d_optimizer.apply_gradients(zip(d_gradient, d_model.trainable_variables))
        # prepare points in latent space as input for the generator
        [z_input, labels_input] = generate_latent_points(latent_dim, n_batch) 
        # create inverted labels for the fake samples
        y_gan = np.ones((n_batch, 1))    
        # update the generator via the discriminator's error
        g_loss_fake = gan_model.train_on_batch([z_input, labels_input], y_gan)   
        print('>%d, d1=%.3f, d2=%.3f d=%.3f g=%.3f' % (epoch+1, d_loss_real, d_loss_fake, d_loss,  g_loss_fake))    
        d_history.append(d_loss)
        g_history.append(g_loss_fake)    
        plot_history(d_history, g_history)    
        g_model.save('trained_generated_model.h5')

In [None]:
# size of the latent space
latent_dim = 100 # create the discriminator
discriminator = define_discriminator()# create the generator
generator = define_generator(latent_dim)# create the gan
gan_model = define_gan(generator, discriminator)# train model
train(generator, discriminator, gan_model, latent_dim)

### Orange is the loss of the Generator and Blue is the loss of the Discriminator

# Evaluation

### Generate fake data

In [None]:
model =load_model('trained_generated_model.h5')

In [None]:
latent_points, labels = generate_latent_points(100, 16900)

In [None]:
labels = np.asarray([x for _ in range(classes*100) for x in range(classes)])
len(labels)

In [None]:
# generate data and labels
Q  = model.predict([latent_points, labels])
print(np.shape(Q))
Q = (Q + 1) / 2.0
Q = np.squeeze(Q, axis=2)

In [None]:
fdata = pd.DataFrame(Q)
fdata["labels"] = labels
fdata = fdata.sample(frac = 1)
fdata

### Train test split for the original dataset

In [None]:
xX_train, xX_test, xy_train, xy_test = train_test_split(data.iloc[:,0:606], data.iloc[:,-1], test_size=0.10, stratify=data.iloc[:,-1])
xX_train

### Append the fake data to X_train and y_train

In [None]:
xframes = [xX_train, fdata.iloc[:,0:606]]
yframes = [xy_train, fdata.iloc[:,-1]]
xresult = pd.concat(xframes)
yresult = pd.concat(yframes)
xresult

In [None]:
yresult

In [None]:
def MLP_():
    model = Sequential()
    model.add(Dense(200, activation="relu", input_shape=(X_train.shape[1],)))
    model.add(Dense(100, activation="relu"))
    model.add(Dense(25, activation="relu"))
    
    model.add(Dense(classes, activation = 'softmax'))
    model.compile(loss = 'sparse_categorical_crossentropy', optimizer = "adam",metrics = ['accuracy'])
    model.summary()
    return model

mlp_model = MLP_()
mlp_model.summary()
plot_model(mlp_model,show_shapes=True)

### Train the model using the fake and real data

In [None]:
mlp_model.fit(xresult, yresult, batch_size=64,epochs=100, verbose=1, validation_split=0.10)

### Evaluate the model using the X_test and Y_test data

In [None]:
acc = mlp_model.evaluate(xX_test, xy_test)
print("Loss:", acc[0], " Accuracy:", acc[1])
pred = mlp_model.predict(xX_test)
pred_y = pred.argmax(axis=-1)

# Visualization

In [None]:
confusion_matrix = metrics.confusion_matrix(xy_test, pred_y)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels =[0,1,2,3,4,5,6,7,8,9,10,11,12])

cm_display.plot()
plt.show() 

In [None]:
falseplot0 = fdata[fdata['labels']==0].head(50)
falseplot10 = fdata[fdata['labels']==10].head(50)

falseplot0

In [None]:
trueplot0 = newdata[newdata['labels']==0].head(50)
trueplot10 = newdata[newdata['labels']==10].head(50)

trueplot0

In [None]:
def plot_fake_():
    temp = 0
    for i in range(1,50):
        bob  = falseplot10.iloc[temp:i,0:606].values.tolist()
        temp += 1
        ypoints = bob[0]
        plt.plot(ypoints, color="b")
    plt.figure(figsize=(20,12))

    plt.show()
    
def plot_both_():
    temp = 0
    for i in range(1,50):
        bob = trueplot10.iloc[temp:i,0:606].values.tolist()
        tom = falseplot10.iloc[temp:i,0:606].values.tolist()
        temp += 1
        ypoints_real = bob[0]
        ypoints_fake = tom[0]
        plt.plot(ypoints_real, color="r")
        plt.plot(ypoints_fake, color="b")
    plt.figure(figsize=(40,24))
    plt.show()
    
def plot_real_():
    temp = 0
    for i in range(1,50):
        bob  = trueplot10.iloc[temp:i,0:606].values.tolist()
        temp += 1
        ypoints = bob[0]
        plt.plot(ypoints, color="r")
    plt.figure(figsize=(20,12))

    plt.show() 
plot_real_()

In [None]:
plot_fake_()

In [None]:
plot_both_()