### Binary classification

To demonstrate how to implement neural network models for binary classification, we will use the IMDb (Internet Movie Database) movie review dataset, which is also available at the following links: https://www.imdb.com/interfaces/ or https://www.kaggle.com/lakshmi25npathi/datasets. The dataset contains $50000$ movie reviews, each labeled as either a positive or a negative review. It is divided into a training set ($25000$ reviews) and a test set ($25000$ reviews). In each subset exactly half of the reviews are positive and half are negative.

#### Prepare input data

The IMDb collection is included with the Keras package and has been prepared for direct use, i.e. the textual reviews have been converted into vectors of integers, where each number is the index of a word in the dictionary. Additionally, each input vector is assigned a label of $0$ or $1$, which indicates a negative or positive review, respectively.

In [None]:
import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['TF_XLA_FLAGS'] = '--tf_xla_enable_xla_devices=false --tf_xla_auto_jit=0'
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

from tensorflow.keras.datasets import imdb
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np

# the number of keywords
L=15
# download training and testing datasets limited to L keywords
(train_data,train_labels),(test_data,test_labels)=imdb.load_data(num_words=L+4)
N=len(train_data)
print('The number of training samples:',N)

The imdb.load_data function accepts the argument num_words, which limits the vocabulary to the most frequently occurring words. The value $L+4$ is passed because the first four indices are reserved: $0$ is used for padding the vector, $1$ marks the start of the review text, $2$ denotes a word that does not appear in the dictionary, and $3$ is not used. The $L$ most frequent words therefore receive the indices $4,\dots,L+3$. The $N$ variable (**N=len(train_data)**) specifies the number of data vectors in the training set. Below is an example input vector describing a sample review limited to $15$ keywords.
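The index arithmetic can be checked without downloading the dataset. The following is only a minimal sketch: the helper name `keyword_slot` and the constant `RESERVED` are hypothetical, and the sketch assumes the default Keras settings, under which the most frequent word receives index $4$.

```python
RESERVED = 4  # indices 0 (padding), 1 (start), 2 (unknown word), 3 (unused)


def keyword_slot(index, num_keywords):
    """Map a Keras IMDb word index to a position 0..num_keywords-1.

    Returns None for the reserved indices 0..3. Assumes the data were
    loaded with num_words = num_keywords + RESERVED.
    """
    if index < RESERVED:
        return None
    slot = index - RESERVED
    if slot >= num_keywords:
        raise ValueError("index outside the selected vocabulary")
    return slot


L = 15
print([keyword_slot(i, L) for i in (1, 2, 4, 18)])  # [None, None, 0, 14]
```

The largest index that can occur with $L=15$ is $18$, which lands in the last position ($14$) of the vector.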

In [None]:
print('Exemplary review:')
print(train_data[0])

This way of representing data is not directly suitable for training a neural network model. First of all, each input vector describing a single review may have a different length, so the input data must be converted to a fixed size before training. Since the number of keywords is strictly defined, we have at least two possibilities here: (I) for each review we create a vector of $L$ elements and mark the occurrence of specific keywords with the value $1$, where each keyword is represented by the index of an element in the vector; (II) as in (I), but this time we count the occurrences of specific words.

In [None]:
def prepare_data(name,data,labels):
    # the number of samples is taken from the data itself; L is the number of keywords
    n=len(data)

    x=np.zeros((n,L),float)
    y=np.zeros((n,1),float)
    for i in range(n):
        for j in data[i]:
            # indices 0-3 are reserved (padding, start, unknown word, unused);
            # without this check j-4 would be negative and wrap around to the end of the vector
            if j<4:
                continue
            # here we can decide whether we want to count or indicate the occurrences of words
            # =1 indicates the occurrence of a specific word
            # +=1 counts the number of occurrences
            x[i][j-4]=1
        # we use normalization in case of counting words
        #x[i]/=np.linalg.norm(x[i])
        y[i]=labels[i]
    np.save(name+'_data.npy',x)
    np.save(name+'_labels.npy',y)

# preparation of input data
prepare_data('train',train_data,train_labels)
prepare_data('test',test_data,test_labels)
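To see the difference between the two representations, (I) and (II), it may help to encode a single toy review by hand. This is a sketch under stated assumptions: the function `encode` and the review `toy_review` are hypothetical, and the reserved indices $0$ to $3$ are skipped rather than mapped to a keyword position.

```python
import numpy as np


def encode(review, num_keywords, count=False):
    """Encode one review (a list of Keras IMDb indices) as a fixed-length vector."""
    x = np.zeros(num_keywords)
    for j in review:
        if j < 4:  # skip the padding / start / unknown-word markers
            continue
        if count:
            x[j - 4] += 1  # representation (II): count occurrences
        else:
            x[j - 4] = 1   # representation (I): mark occurrence
    if count and x.any():
        x /= np.linalg.norm(x)  # scale counts to unit length
    return x


# hypothetical review: start marker, word 4 three times, one unknown word, word 7 once
toy_review = [1, 4, 4, 2, 7, 4]
print(encode(toy_review, 5))              # [1. 0. 0. 1. 0.]
print(encode(toy_review, 5, count=True))  # counts [3, 0, 0, 1, 0] scaled to unit length
```

In the counting variant the first element is larger than the fourth one, so the network also sees how often a keyword was used, not only whether it appeared.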

Of course, it is possible to recreate individual reviews through a reverse mapping, i.e. from a keyword index back to the keyword.

In [None]:
def decode_data(data):
    
    dictionary=imdb.get_word_index()
    my_dictionary=dict([(k,v) for (v,k) in dictionary.items()])
    s=' '.join([my_dictionary.get(d-3,'?') for d in data])
    return s

print(decode_data(train_data[0]))
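The offset of $3$ used in the reverse mapping can be illustrated on a miniature dictionary. The sketch below is self-contained and does not call Keras; `toy_word_index` is a hypothetical stand-in with the same layout as the dictionary returned by imdb.get_word_index() (word mapped to its frequency rank).

```python
# hypothetical miniature word index: word -> frequency rank (1 = most frequent)
toy_word_index = {"the": 1, "and": 2, "a": 3}

# invert it: rank -> word
rank_to_word = {rank: word for word, rank in toy_word_index.items()}


def decode(review):
    # encoded indices are shifted by 3, so reserved indices have no word and become '?'
    return " ".join(rank_to_word.get(i - 3, "?") for i in review)


print(decode([1, 4, 6, 2, 5]))  # ? the a ? and
```

The start marker ($1$) and the unknown-word marker ($2$) have no dictionary entry, which is why they are printed as question marks.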

The next step is to divide the test set into two parts, i.e. one used for testing and the other serving as a validation set. In our example, we divide it in half.

In [None]:
train_x=np.load('train_data.npy')
train_y=np.load('train_labels.npy')
test_x=np.load('test_data.npy')
test_y=np.load('test_labels.npy')
N=len(test_x)
N2=N//2
(test_x,validate_x)=(test_x[0:N2],test_x[N2:N])
(test_y,validate_y)=(test_y[0:N2],test_y[N2:N])
print('Exemplary vector describing one review:')
print(np.round(test_x[0],3))
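The split above simply cuts the array in the middle. If the order of the samples were not random, one possible alternative would be to shuffle the indices first. The following sketch shows that variant on toy data; the function `split_in_half` and its `seed` parameter are hypothetical and are not part of the original notebook.

```python
import numpy as np


def split_in_half(x, y, seed=0):
    """Shuffle the sample indices and split the data and labels into two halves."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(x))
    half = len(x) // 2
    first, second = order[:half], order[half:]
    return (x[first], y[first]), (x[second], y[second])


# toy data: 10 samples with one feature each, label = parity of the feature
x = np.arange(10).reshape(10, 1)
y = np.arange(10) % 2
(test_part, test_lab), (val_part, val_lab) = split_in_half(x, y)
print(len(test_part), len(val_part))  # 5 5
```

The same permutation is applied to the data and to the labels, so every vector keeps its own label after the split.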

#### Construction of a neural network model

In the binary classification task under consideration, the input of the neural network is a vector of $L$ elements ($L$ is the number of the most important keywords), while the output is a single value from the range $[0,1]$, which determines the affiliation to one of the two classes. A value of $1$ indicates a positive review, while a value of $0$ indicates a negative review.

In our example study, we will use a network with a single layer consisting of $1$ neuron.

In [None]:
# neural network - building the model
model=tf.keras.models.Sequential()
model.add(tf.keras.layers.Dense(1,activation='sigmoid'))
# the batch dimension is left unspecified (None); only the number of features L is fixed
model.build(input_shape=(None,L))
model.compile(optimizer='Adam',loss='binary_crossentropy',metrics=['accuracy'])
model.summary()
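A network of this kind computes $\sigma(x \cdot w + b)$, so it has $L+1$ trainable parameters ($L$ weights and one bias). The forward pass can be sketched in plain NumPy; the names `sigmoid` and `single_neuron` are hypothetical, and the zero weights are chosen only to make the result easy to verify.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def single_neuron(x, w, b):
    """Output of one sigmoid neuron for a batch x of shape (n, L)."""
    return sigmoid(x @ w + b)


L = 15
x = np.zeros((2, L))
x[0, :3] = 1          # the first toy review contains three keywords
w = np.zeros(L)       # untrained neuron: all weights equal to zero
b = 0.0
print(single_neuron(x, w, b))  # [0.5 0.5]
print(w.size + 1)              # 16
```

With zero weights every review receives the output $0.5$, i.e. the neuron is undecided; training moves the weights so that keywords typical for positive reviews push the output towards $1$.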

We carry out the training process taking into account the validation set.

In [None]:
# neural network - training the model
history=model.fit(train_x,train_y,epochs=20,batch_size=100,validation_data=(validate_x,validate_y))
tf.keras.models.save_model(model,'model_1.h5')

After training, we save our model (weights and network structure) to a file with the ".h5" extension using the save_model() function. Such a model can then be repeatedly loaded into memory and used in a classification task without the need for retraining.

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_loss'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training loss','training accuracy','validation loss','validation accuracy'])
plt.show()

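It should be noted here that the fit() function returns a History object. Its history attribute is a dictionary that stores, for each epoch of the training process, the values of the loss function and of the metrics (here under the keys loss, accuracy, val_loss and val_accuracy).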
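Because the per-epoch values are stored as plain Python lists, they can be inspected directly, for example to find the epoch with the best validation accuracy. The sketch below uses a hand-written dictionary `toy_history` with made-up numbers in the same layout as history.history; it does not come from an actual training run.

```python
# made-up values in the same layout as history.history (one entry per epoch)
toy_history = {
    "loss": [0.69, 0.66, 0.65],
    "accuracy": [0.55, 0.60, 0.61],
    "val_loss": [0.68, 0.66, 0.67],
    "val_accuracy": [0.56, 0.61, 0.60],
}

val_acc = toy_history["val_accuracy"]
best_epoch = max(range(len(val_acc)), key=lambda e: val_acc[e])
print(best_epoch + 1, val_acc[best_epoch])  # 2 0.61
```

Epochs are counted from $1$ in the Keras log, hence the `+ 1` when printing.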

We can use the trained model in practice in the following way:

In [None]:
# we use the trained model for classification of one vector (review)
model=tf.keras.models.load_model('model_1.h5')
print(model.predict(test_x[0:1])[0][0])

We can evaluate the classification for the entire test set using the evaluate() function:

In [None]:
# we test our model for the whole test set
model=tf.keras.models.load_model('model_1.h5')
val=model.evaluate(test_x,test_y)
print('Accuracy:',np.round(100*val[1],2),'%')
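The accuracy reported for a sigmoid output is obtained by thresholding the predicted values at $0.5$ and comparing them with the labels. The following sketch repeats this calculation by hand on made-up numbers; `probs` and `labels` are hypothetical and stand in for the output of predict() and for the test labels.

```python
import numpy as np

probs = np.array([[0.91], [0.35], [0.50], [0.08]])  # made-up model outputs
labels = np.array([[1.0], [0.0], [0.0], [1.0]])     # made-up true labels

pred = (probs > 0.5).astype(float)  # values above 0.5 are treated as positive
acc = np.mean(pred == labels)
print(pred.ravel(), acc)  # [1. 0. 0. 0.] 0.75
```

The last review is positive but received a low score, so three out of four predictions are correct.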

**Task.**

Further experiments on this problem may concern:
* increasing the number of keywords;
* changing the way of representation of text data;
* increasing the number of hidden layers of the network and checking how this will affect the classification results obtained for all three sets (training, validation, testing);
* modifying (increasing, decreasing) the number of neurons in hidden layers and checking the impact of changes on the classification results;
* replacing the binary_crossentropy loss function with the mse function;
* replacing the ReLU activation function with the sigmoid function and checking the impact of such an operation on the learning process itself, as well as on the final results obtained.
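Before swapping the loss function, it may be useful to compare how the two losses react to the same errors. The sketch below is a plain NumPy illustration with hypothetical helper names (`bce`, `mse`) and made-up predictions; it is not the Keras implementation, although the clipping constant mimics the usual safeguard against $\log 0$.

```python
import numpy as np


def bce(y, p, eps=1e-7):
    """Mean binary cross-entropy; predictions are clipped to avoid log(0)."""
    p = np.clip(p, eps, 1 - eps)
    return float(np.mean(-(y * np.log(p) + (1 - y) * np.log(1 - p))))


def mse(y, p):
    """Mean squared error."""
    return float(np.mean((y - p) ** 2))


y = np.array([1.0, 1.0])                  # both reviews are positive
mild_wrong = np.array([0.4, 0.4])         # slightly wrong predictions
confident_wrong = np.array([0.01, 0.01])  # confidently wrong predictions

print(bce(y, mild_wrong), mse(y, mild_wrong))
print(bce(y, confident_wrong), mse(y, confident_wrong))
```

For outputs in $[0,1]$ the squared error of a single sample never exceeds $1$, whereas the cross-entropy keeps growing as a wrong prediction becomes more confident, which is one reason the two losses can lead to different training behavior.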

---

### <center>Experiments</center>

- keywords: 50, 100, 200
- text representations: Count, One hot
- number of hidden layers: 1, 2, 3
- number of neurons: 128, 256, 512
- loss: Binary Cross Entropy, MSE
- hidden activation functions: ReLU, sigmoid
- output activation functions: linear (MSE), sigmoid (BinaryCrossEntropy)

#### Save data

Get One hot and Count representations

In [None]:
from tensorflow.keras.losses import BinaryCrossentropy, MeanSquaredError
from tensorflow.keras.metrics import BinaryAccuracy, AUC, MeanAbsoluteError, RootMeanSquaredError
from tensorflow.keras import models, layers
import optuna
import pandas as pd


def prepare_data(name, data, labels, keyword, seq_length, count: bool = False):
    # seq_length is the number of samples (rows), keyword is the number of keywords (columns)
    x = np.zeros((seq_length, keyword), float)
    y = np.zeros((seq_length, 1), float)
    for i in range(seq_length):
        for j in data[i]:
            # indices 0-3 are reserved; skipping them prevents negative (wrap-around) positions
            if j < 4:
                continue
            if count:
                x[i][j - 4] += 1
            else:
                x[i][j - 4] = 1
        if count:
            # normalize to unit length, leaving all-zero vectors untouched
            norm = np.linalg.norm(x[i])
            if norm > 0:
                x[i] /= norm
        y[i] = labels[i]

    os.makedirs("data", exist_ok=True)

    file_name_data = f"{name}_{keyword}_count_data.npy" if count else f"{name}_{keyword}_one_hot_data.npy"
    file_name_labels = f"{name}_{keyword}_count_labels.npy" if count else f"{name}_{keyword}_one_hot_labels.npy"

    np.save(os.path.join("data", file_name_data), x)
    np.save(os.path.join("data", file_name_labels), y)

def load_data(name: str, keyword: int, count: bool = False) -> tuple:
    # the file names differ only in the representation part
    representation = "count" if count else "one_hot"
    x = np.load(os.path.join("data", f"{name}_{keyword}_{representation}_data.npy"))
    y = np.load(os.path.join("data", f"{name}_{keyword}_{representation}_labels.npy"))
    return x, y


search_space = {
    "keywords": [50, 100, 200],
    "number_of_layers": [1, 2, 3],
    "number_of_neurons": [128, 256, 512],
    "activation": ["relu", "sigmoid"],
    "count": [True, False],
    "is_increasing": [True, False]
}

loss_functions = ["binary_crossentropy", "mse"]

if "data" not in os.listdir(os.getcwd()):
    for keywords in search_space["keywords"]:
        (train_data, train_labels), (test_data, test_labels) = imdb.load_data(num_words=keywords + 4)
        # number of reviews (the training and test sets have the same size)
        seq_len = train_data.shape[-1]
        prepare_data("train", train_data, train_labels, keywords, seq_len, False)
        prepare_data("test", test_data, test_labels, keywords, seq_len, False)
        prepare_data("train", train_data, train_labels, keywords, seq_len, True)
        prepare_data("test", test_data, test_labels, keywords, seq_len, True)
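The size of the grid defined by the search space can be counted before starting the experiments, which gives an idea of how many models will be trained per loss function. This sketch copies the values of search_space into a separate dictionary, here called `grid` (a hypothetical name), and enumerates all combinations with itertools.

```python
from itertools import product

# a copy of the search space values used in the experiments
grid = {
    "keywords": [50, 100, 200],
    "number_of_layers": [1, 2, 3],
    "number_of_neurons": [128, 256, 512],
    "activation": ["relu", "sigmoid"],
    "count": [True, False],
    "is_increasing": [True, False],
}

combinations = list(product(*grid.values()))
print(len(combinations))                      # 216
print(dict(zip(grid.keys(), combinations[0])))  # the first combination as a dictionary
```

With $3 \cdot 3 \cdot 3 \cdot 2 \cdot 2 \cdot 2 = 216$ combinations and $20$ epochs each, an exhaustive grid search is noticeably more expensive than training the single-neuron model above.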

#### Defining model and experiments

In [None]:
def define_nn(params: dict, input_size: int, loss_function: str):
    output_activation = {
        "mse": "linear",
        "binary_crossentropy": "sigmoid"
    }[loss_function]
    number_of_layers, number_of_neurons, activation = params["number_of_layers"], params["number_of_neurons"], params["activation"]

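    # layer widths: number_of_neurons divided by 8, 16, 32, ...
    # (the ** operator is used because np.pow is available only in NumPy 2.0 and newer)
    neurons_in_layers = np.array([number_of_neurons // 2 ** k for k in range(3, 3 + number_of_layers)])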

    sorted_neurons_in_layers = {
        False: np.sort(neurons_in_layers * (-1)) * (-1),
        True: np.sort(neurons_in_layers)
    }
    model_layers = [
        layers.Dense(num_in_layer, activation=activation)
        for num_in_layer in sorted_neurons_in_layers[params["is_increasing"]]
    ]
    input_layer = layers.Input(shape=(input_size, ))
    output_layer = layers.Dense(1, activation=output_activation)
    model_layers.insert(0, input_layer)
    model_layers.append(output_layer)
    return models.Sequential(model_layers)


def objective_accuracy(trial: optuna.Trial):
    # Optimized parameters
    params = {k: trial.suggest_categorical(k, v) for k, v in search_space.items()}

    # Data preparation
    x_train, y_train = load_data(name="train", keyword=params["keywords"], count=params["count"])
    x_test, y_test = load_data(name="test", keyword=params["keywords"], count=params["count"])

    # Model definition
    my_model = define_nn(params, x_train.shape[-1], "binary_crossentropy")
    my_model.compile(
        optimizer="adam",
        loss=BinaryCrossentropy(),
        metrics=[BinaryAccuracy(name="accuracy"), AUC(name="auc")]
    )

    # Training and validation process
    trial_history = my_model.fit(
        x_train, y_train,
        epochs=20,
        batch_size=256,
        validation_split=0.2,
        verbose=0
    )
    test_loss, test_accuracy, test_auc = my_model.evaluate(x_test, y_test, verbose=0)
    test_results = dict(zip(["test_loss", "test_accuracy", "test_auc"], [test_loss, test_accuracy, test_auc]))
    history_of_trials_accuracy.append({**test_results, **params})
    return np.max(trial_history.history["val_accuracy"])


def objective_mse(trial: optuna.Trial):
    # Optimized parameters
    params = {k: trial.suggest_categorical(k, v) for k, v in search_space.items()}

    # Data preparation
    x_train, y_train = load_data(name="train", keyword=params["keywords"], count=params["count"])
    x_test, y_test = load_data(name="test", keyword=params["keywords"], count=params["count"])

    # Model definition
    my_model = define_nn(params, x_train.shape[-1], "mse")
    my_model.compile(
        optimizer="adam",
        loss=MeanSquaredError(),
        metrics=[RootMeanSquaredError(name="rmse"), MeanAbsoluteError(name="mae")]
    )

    # Training and validation process
    trial_history = my_model.fit(
        x_train, y_train,
        epochs=20,
        batch_size=256,
        validation_split=0.2,
        verbose=0
    )
    test_loss, test_rmse, test_mae = my_model.evaluate(x_test, y_test, verbose=0)
    test_results = dict(zip(["test_loss", "test_rmse", "test_mae"], [test_loss, test_rmse, test_mae]))
    history_of_trials_mse.append({**test_results, **params})
    return np.min(trial_history.history["val_rmse"])
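The way define_nn derives the widths of the hidden layers is easy to miss: the value of number_of_neurons is not used directly but divided by $8, 16, 32, \dots$, and the result is ordered according to is_increasing. The sketch below reproduces only this calculation in plain Python; the helper name `layer_widths` is hypothetical.

```python
def layer_widths(number_of_neurons, number_of_layers, is_increasing):
    """Hidden layer widths as computed in define_nn, returned as a plain list."""
    widths = [number_of_neurons // 2 ** k for k in range(3, 3 + number_of_layers)]
    # ascending order for is_increasing=True, descending otherwise
    return sorted(widths, reverse=not is_increasing)


print(layer_widths(128, 3, False))  # [16, 8, 4]
print(layer_widths(512, 2, True))   # [32, 64]
```

For example, the setting number_of_neurons=128 with three layers yields hidden layers of only $16$, $8$ and $4$ neurons, which should be kept in mind when interpreting the results.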

#### Accuracy as primary metric

In [None]:
history_of_trials_accuracy = []
study_accuracy = optuna.create_study(
    study_name="Zadanie 3 Accuracy",
    direction="maximize",
    sampler=optuna.samplers.GridSampler(search_space=search_space, seed=42)
)

optuna.logging.set_verbosity(optuna.logging.INFO)
study_accuracy.optimize(objective_accuracy)

In [None]:
if "results" not in os.listdir(os.getcwd()):
    os.mkdir("results")

history_of_trials_accuracy = pd.DataFrame(history_of_trials_accuracy)
history_of_trials_accuracy.to_csv(os.path.join("results", "history_of_trials_accuracy.csv"))

#### MSE as primary metric

In [None]:
history_of_trials_mse = []
study_mse = optuna.create_study(
    study_name="Zadanie 3 MSE",
    direction="minimize",
    sampler=optuna.samplers.GridSampler(search_space=search_space, seed=42)
)

optuna.logging.set_verbosity(optuna.logging.INFO)
study_mse.optimize(objective_mse)

In [None]:
if "results" not in os.listdir(os.getcwd()):
    os.mkdir("results")

history_of_trials_mse = pd.DataFrame(history_of_trials_mse)
history_of_trials_mse.to_csv(os.path.join("results", "history_of_trials_mse.csv"))

#### Results

In [None]:
history_of_trials_accuracy.sort_values(by=["test_accuracy", "test_auc"], ascending=False).head()

In [None]:
# lower error is better, so the best trials are listed first when sorting in ascending order
history_of_trials_mse.sort_values(by=["test_rmse", "test_mae"], ascending=True).head()

In [None]:
history_of_trials_accuracy = pd.get_dummies(history_of_trials_accuracy, columns=["activation"])
history_of_trials_mse = pd.get_dummies(history_of_trials_mse, columns=["activation"])

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt


fig, ax = plt.subplots(ncols=2, figsize=(20, 6))
sns.heatmap(
    data=history_of_trials_accuracy.corr(),
    cmap="viridis",
    annot=True,
    fmt=".2f",
    ax=ax[0]
)
sns.heatmap(
    data=history_of_trials_mse.corr(),
    cmap="viridis",
    annot=True,
    fmt=".2f",
    ax=ax[1]
)
plt.show()