In [1]:
import os
import keras

import numpy as np
import pandas as pd

from toolkit.utils import get_file_types, get_2nd_block_bytes, convert_cat2num, byte_frequency_histogram

from keras.models import Sequential, Model # type: ignore
from keras.layers import Conv1D, MaxPooling1D, Dense, Flatten, Input, Dropout # type: ignore
from keras_tuner.tuners import RandomSearch
from keras_tuner import HyperParameters

from sklearn.model_selection import train_test_split

## 1. Data Preparation

In [3]:
CLASS_NAMES = [
    "doc",
    "pdf",
    "ps",
    "xls",
    "ppt",
    "swf",
    "gif",
    "jpg",
    "png",
    "html",
    "txt",
    "xml"
]

# we follow the same steps taken in the data visulisation notebook
# to be concise, our focus is on body blocks
dir_path = 'govdocs1/'
files_data = get_file_types(dir_path)
df = pd.DataFrame(files_data)

df['size KB'] = df['file'].apply(
    lambda x: float(os.path.getsize(
        os.path.join(os.getcwd(), f"govdocs1/{x}")
    ) / 1024) if os.path.isfile(
        os.path.join(os.getcwd(), f"govdocs1/{x}")
    ) else None
)

df = df[df['type'].isin(CLASS_NAMES)]
df = df[df['size KB'] > 16]

# sample our 12 targeted file types
df = df.groupby('type').apply(
    lambda x: x if x.name == "png" else x.sample(n=min(len(x), 2000), random_state=42)
).reset_index(drop=True)

# get file body block bytes and generate frequency histograms
base_dir = os.path.join(os.getcwd(), "govdocs1")
df["body_block_bytes"] = df["file"].apply(lambda file: get_2nd_block_bytes(os.path.join(base_dir, file)))
df["byte_integers"] = df["body_block_bytes"].apply(lambda byte_sequences: np.array([byte for byte in byte_sequences]))
df["byte_bfh"] = df["byte_integers"].apply(lambda byte_integers: byte_frequency_histogram(byte_integers))
df["class"] = df["type"].apply(lambda file_type: convert_cat2num(file_type))

# create feature and class arrays
X = np.array([x for x in df["byte_bfh"]])
y = np.array([y for y in df["class"]])

X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

## 2. Hyper Parameters Search

In [26]:
def build_ffnn(hp: HyperParameters) -> Model:
    """ 
    Builds and compiles a FFNN model using Keras Tuner to optimise hyperparameters.

    Args:
        hp (HyperParameters): The hyperparameters to tune.
    Returns:
        Model: The FFNN model.
    """
    model = Sequential()
    model.add(Input(shape=(256, 1)))
    
    hp_units1 = hp.Int('units1', min_value=32, max_value=512, step=32)
    model.add(Dense(units=hp_units1, activation='relu'))
    
    hp_units2 = hp.Int('units2', min_value=32, max_value=512, step=32)
    model.add(Dense(units=hp_units2, activation='relu'))

    if hp.Boolean('add_layer'):
        hp_units3 = hp.Int('units3', min_value=32, max_value=512, step=32)
        model.add(Dense(units=hp_units3, activation='relu'))

    model.add(Flatten())
    
    model.add(Dense(13, activation='softmax'))
    hp_learning_rate = hp.Choice('learning_rate', values=[0.01, 0.001, 0.0001])
    
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model


def build_cnn(hp: HyperParameters) -> Model:
    """
    Builds and compiles a CNN model using Keras Tuner to optimise hyperparameters.

    Args:
        hp (HyperParameters): The hyperparameters to tune.
    Returns:
        Model: The CNN model.
    """
    model = Sequential()
    model.add(Input(shape=(256, 1)))
    
    hp_filters1 = hp.Int('filters1', min_value=32, max_value=128, step=32)
    model.add(Conv1D(filters=hp_filters1, kernel_size=3, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=2))
    
    hp_filters2 = hp.Int('filters2', min_value=64, max_value=256, step=64)
    model.add(Conv1D(filters=hp_filters2, kernel_size=3, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=2))
    
    model.add(Flatten())
        
    hp_dropout = hp.Float('dropout', min_value=0.2, max_value=0.5, step=0.1)
    model.add(Dropout(rate=hp_dropout))
    
    hp_units = hp.Int('units', min_value=64, max_value=256, step=64)
    model.add(Dense(units=hp_units, activation='relu'))
    
    model.add(Dense(13, activation='softmax'))
    hp_learning_rate = hp.Choice('learning_rate', values=[0.01, 0.001, 0.0001])
    
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

### Feed Forward Neural Network

In [14]:
ffnn_tuner = RandomSearch(
    build_ffnn,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=2,
    directory='hps_results',
    project_name='ffnn'
)

In [15]:
ffnn_tuner.search(X_train, y_train, epochs=5, validation_data=(X_val, y_val))

Trial 10 Complete [00h 02m 41s]
val_accuracy: 0.5780664086341858

Best val_accuracy So Far: 0.8202999532222748
Total elapsed time: 00h 39m 26s


In [16]:
ffnn_best_models = ffnn_tuner.get_best_models(num_models=1)

  saveable.load_own_variables(weights_store.get(inner_path))


In [17]:
test_loss, test_acc = ffnn_best_models[0].evaluate(X_test, y_test)
print('Test accuracy:', test_acc)

[1m146/146[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - accuracy: 0.8081 - loss: 0.6711
Test accuracy: 0.8093401789665222


In [18]:
ffnn_best_models[0].summary()

In [19]:
ffnn_best_hp = ffnn_tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"Best Hyperparameters: {ffnn_best_hp.values}")

Best Hyperparameters: {'units1': 352, 'units2': 32, 'add_layer': False, 'learning_rate': 0.001, 'units3': 192}


### Convolutional Neural Network

In [20]:
cnn_tuner = RandomSearch(
    build_cnn,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=2,
    directory='hps_results',
    project_name='cnn'
)

In [21]:
cnn_tuner.search(X_train, y_train, epochs=5, validation_data=(X_val, y_val))

Trial 10 Complete [00h 02m 25s]
val_accuracy: 0.7659346461296082

Best val_accuracy So Far: 0.8448044955730438
Total elapsed time: 00h 33m 15s


In [22]:
cnn_best_models = cnn_tuner.get_best_models(num_models=1)

  saveable.load_own_variables(weights_store.get(inner_path))


In [23]:
test_loss, test_acc = cnn_best_models[0].evaluate(X_test, y_test)
print('Test accuracy:', test_acc)

[1m146/146[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.8382 - loss: 0.5028
Test accuracy: 0.8363324999809265


In [24]:
cnn_best_models[0].summary()

In [25]:
cnn_best_hp = cnn_tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"Best Hyperparameters: {cnn_best_hp.values}")

Best Hyperparameters: {'filters1': 64, 'filters2': 192, 'dropout': 0.4, 'units': 256, 'learning_rate': 0.001}
