Inspiration for the on-the-fly Keras data generator used below: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

In [1]:
import os

import numpy as np
import psycopg2
from psycopg2 import pool
import tensorflow as tf
from matplotlib import pyplot as plt

from keras.src.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.src.layers import SimpleRNN, Dense, Dropout
from keras import Sequential, Input

2024-06-12 09:36:56.457320: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Connection settings for the CADETS E3 PostgreSQL database.
# Every value can be overridden through the standard libpq environment
# variables, so credentials never have to be written into the notebook;
# the defaults are the values that used to be hard-coded here.
db_config = {
    'dbname': os.environ.get('PGDATABASE', 'cadets_e3'),
    'user': os.environ.get('PGUSER', 'rosendahl'),
    'password': os.environ.get('PGPASSWORD', ''),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432'),
}

# Shared connection/cursor for the metadata queries; opened by
# setup_connection() and released by close_connection().
connection = None
cursor = None

def setup_connection():
    """Open the global metadata connection and cursor using ``db_config``.

    The new handles are stored in the module-level ``connection`` and
    ``cursor`` globals. If a previous connection is still open (e.g. the
    setup cell is executed a second time) it is closed first instead of
    being orphaned.
    """
    global connection, cursor
    # psycopg2 exposes `closed` as 0 while the connection is open.
    if connection is not None and not connection.closed:
        connection.close()
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()

def close_connection():
    """Close the global cursor and connection opened by ``setup_connection()``.

    Safe to call before ``setup_connection()`` or more than once: handles that
    are ``None`` are skipped. The connection is closed even if closing the
    cursor raises, and both globals are reset to ``None`` afterwards.
    """
    global connection, cursor
    try:
        if cursor is not None:
            cursor.close()
    finally:
        if connection is not None:
            connection.close()
        connection = None
        cursor = None


# Module-level state populated by the loader functions below.

# (subject_uuid, exec) ids: set by get_sequence_ids() and split_sequence_ids()
sequence_ids = sequence_ids_training = sequence_ids_validation = None

# label vocabulary (executable names) and its size: set by get_labels()
labels = no_labels = None

# feature vocabulary (event types plus the 'NONE' padding token) and its size: set by get_features()
features = no_features = None

def get_sequence_ids(max_length=1000):
    """Load the ``(subject_uuid, exec)`` ids of sequences with ``length <= max_length``.

    The rows returned by the cursor are stored in the global ``sequence_ids``.

    max_length: upper bound on the ``sequence.length`` column; defaults to the
        previously hard-coded 1000. Passed as a query parameter, not formatted
        into the SQL text.
    """
    global sequence_ids
    query = '''
    select subject_uuid, exec
    from sequence
    where length <= %s;
    '''
    cursor.execute(query, (max_length,))
    sequence_ids = cursor.fetchall()
    print(f'loaded {len(sequence_ids)} sequence ids')

def split_sequence_ids(validation_split=0.2, seed=None):
    """Shuffle the global ``sequence_ids`` in place and split them into train/validation.

    The first ``int(len(sequence_ids) * (1 - validation_split))`` shuffled ids
    become ``sequence_ids_training``; the remainder become
    ``sequence_ids_validation``.

    validation_split: fraction of ids held out for validation, in [0, 1).
    seed: optional integer seed for a reproducible split. ``None`` keeps the
        previous behaviour of using NumPy's global random state.

    Raises RuntimeError if ``get_sequence_ids()`` has not been run, and
    ValueError if ``validation_split`` is outside [0, 1).
    """
    global sequence_ids, sequence_ids_training, sequence_ids_validation
    if sequence_ids is None:
        raise RuntimeError('sequence_ids not loaded; call get_sequence_ids() first')
    if not 0 <= validation_split < 1:
        raise ValueError(f'validation_split must be in [0, 1), got {validation_split}')
    rng = np.random if seed is None else np.random.RandomState(seed)
    rng.shuffle(sequence_ids)
    split_index = int(len(sequence_ids) * (1 - validation_split))
    sequence_ids_training = sequence_ids[:split_index]
    sequence_ids_validation = sequence_ids[split_index:]
    print(f'training set: {len(sequence_ids_training)}')
    print(f'validation set: {len(sequence_ids_validation)}')

def get_labels():
    """Load the label vocabulary (distinct executable names) into ``labels``/``no_labels``.

    The list is sorted so that ``labels[i]`` is the class at one-hot position
    ``i``. DataGenerator.encode_label indexes labels by their position in
    ``np.unique(labels)`` (sorted order). ``select distinct`` returns rows in
    no guaranteed order, so the unsorted list could disagree with the encoding.
    """
    global labels, no_labels
    query = '''
    select distinct exec
    from sequence;
    '''
    cursor.execute(query)
    labels = sorted(row[0] for row in cursor.fetchall())
    no_labels = len(labels)
    print(f'found {no_labels} labels')
    print(f'labels: {labels}')

def get_features():
    """Load the feature vocabulary (distinct event types) into ``features``/``no_features``.

    ``features[j]`` is one-hot column ``j`` of the model input. Event types are
    sorted so this column order is the same on every run; ``select distinct``
    has no guaranteed order, and a changed order would silently scramble
    inputs for a previously saved checkpoint. The padding token ``'NONE'`` is
    appended as the last column unless it already occurs as an event type.
    """
    global features, no_features
    query = '''
    select distinct e.type
    from event e;
    '''
    cursor.execute(query)
    # key=str keeps the sort total even if the column contains a NULL (None) type
    features = sorted((row[0] for row in cursor.fetchall()), key=str)
    # 'NONE' is the token shorter sequences are padded with in the generator
    if 'NONE' not in features:
        features.append('NONE')
    no_features = len(features)
    print(f'found {no_features} features')
    print(f'features: {features}')

def print_class_distribution():
    """Print the number of sequences with length <= 1000 per executable, most frequent first."""
    cursor.execute('''
    select exec, count(*) as count
    from sequence
    where length <= 1000
    group by exec
    order by count desc;
    ''')
    # each row is an (exec, count) pair, so dict() maps executable -> count in row order
    counts_by_exec = dict(cursor.fetchall())
    print(f'class distribution: {counts_by_exec}')

class DataGenerator(tf.keras.utils.Sequence):
    """Keras data source that streams one-hot encoded event sequences from PostgreSQL.

    Batch ``index`` is ``(X, y)``:
    ``X`` has shape ``(batch_size, longest_sequence_in_batch, no_features)``,
    holding one-hot event types right-padded with the ``'NONE'`` feature.
    ``y`` has shape ``(batch_size, no_labels)``, the one-hot executable name.

    Encoding reads the module-level ``labels``/``no_labels`` and
    ``features``/``no_features`` globals, so ``get_labels()`` and
    ``get_features()`` must have run before the first batch is requested.
    """

    # Event types of one (subject, executable) sequence in event order, capped
    # at the given number of rows (LIMIT NULL means no cap in PostgreSQL).
    _EVENT_TYPES_QUERY = '''
    select e.type
    from event e
    where e.subject_uuid = %s
    and e.properties_map_exec = %s
    order by e.sequence_long
    limit %s;
    '''

    def __init__(self, sequence_ids, db_config, batch_size, shuffle=True, use_multiprocessing=True, workers=4,
                 max_length=1000):
        """
        sequence_ids: list of tuples (subject_uuid, exec)
        db_config: keyword arguments for ``psycopg2.connect``; every batch opens
            (and closes) its own connection with these settings
        batch_size: sequences per batch; a trailing partial batch is dropped
        shuffle: reshuffle the sample order at the end of every epoch
        use_multiprocessing, workers: forwarded to the Keras base class
        max_length: keep at most this many (earliest) events per sequence;
            ``None`` disables the cap
        """
        super().__init__(
            use_multiprocessing=use_multiprocessing,
            workers=workers
        )
        self.db_config = db_config
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.max_length = max_length
        self.sequence_ids = sequence_ids
        self.indexes = np.arange(len(self.sequence_ids))
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches per epoch (a trailing partial batch is dropped)."""
        return len(self.sequence_ids) // self.batch_size

    def __getitem__(self, index):
        """Fetch the event sequences of batch ``index`` and return the encoded ``(X, y)``."""
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_data = []

        # TODO: use a connection pool (psycopg2.pool) to reuse connections across batches.
        # psycopg2's `with connection:` only commits/rolls back the transaction;
        # it does NOT close the connection, so close it explicitly to avoid
        # leaking one connection per batch.
        local_connection = psycopg2.connect(**self.db_config)
        try:
            with local_connection, local_connection.cursor() as local_cursor:
                for k in batch_indexes:
                    subject_uuid, executable = self.sequence_ids[k]
                    local_cursor.execute(self._EVENT_TYPES_QUERY, (subject_uuid, executable, self.max_length))
                    # each result row is a 1-tuple (type,)
                    event_types = [row[0] for row in local_cursor.fetchall()]
                    batch_data.append((executable, event_types))
        finally:
            local_connection.close()

        return self.__data_generation(batch_data)

    def on_epoch_end(self):
        """Reshuffle the sample order after each epoch when ``shuffle`` is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, batch_data):
        """Pad every sequence to the batch's longest one and one-hot encode inputs and targets.

        batch_data: list of ``(executable, event_types)`` pairs.
        """
        max_len = max((len(event_types) for _, event_types in batch_data), default=0)
        X = []
        y = []
        for executable, event_types in batch_data:
            # build a padded copy instead of appending to the fetched list in place
            padded = event_types + ['NONE'] * (max_len - len(event_types))
            X.append(self.encode_data(padded))
            y.append(self.encode_label(executable))
        return np.array(X), np.array(y)

    def encode_label(self, label):
        """One-hot encode ``label`` by its position among the sorted unique ``labels``.

        Raises ValueError if ``label`` is not one of ``labels``.
        """
        label_index = np.unique(np.array(labels)).tolist().index(label)
        one_hot = np.zeros(no_labels)
        one_hot[label_index] = 1
        return one_hot

    def encode_data(self, data):
        """One-hot encode event types into a ``(len(data), no_features)`` array.

        Column ``j`` corresponds to ``features[j]``. Raises ValueError for an
        event type that is not in ``features``.
        """
        feature_index = {}
        for j, feature in enumerate(features):
            feature_index.setdefault(feature, j)  # first occurrence wins, like list.index
        try:
            columns = [feature_index[feature] for feature in data]
        except KeyError as err:
            raise ValueError(f'unknown feature {err.args[0]!r}') from None
        new_data = np.zeros((len(data), no_features))
        # explicit integer dtype: an empty Python list would otherwise become a float index array
        new_data[np.arange(len(data)), np.array(columns, dtype=np.intp)] = 1
        return new_data

In [3]:
# Load metadata over the shared connection, then build one generator per split.
# The generators query the database themselves, batch by batch.
setup_connection()
get_sequence_ids()
split_sequence_ids()
get_labels()
get_features()

print_class_distribution()

training_generator = DataGenerator(
    sequence_ids=sequence_ids_training,
    db_config=db_config,
    batch_size=32,
    shuffle=True,
    use_multiprocessing=True,
    workers=16,
)
validation_generator = DataGenerator(
    sequence_ids=sequence_ids_validation,
    db_config=db_config,
    batch_size=32,
    shuffle=True,
    use_multiprocessing=True,
    workers=8,
)

loaded 428875 sequence ids
training set: 343100
validation set: 85775
found 135 labels
labels: ['adjkerntz', 'alpine', 'anvil', 'atrun', 'awk', 'basename', 'bash', 'bounce', 'bzcat', 'bzip2', 'cat', 'chkgrp', 'chmod', 'chown', 'cleanup', 'cmp', 'cp', 'cron', 'csh', 'cut', 'date', 'dd', 'devd', 'df', 'dhclient', 'diff', 'dmesg', 'egrep', 'env', 'expr', 'find', 'fortune', 'getty', 'grep', 'head', 'hostname', 'id', 'ifconfig', 'imapd', 'inetd', 'init', 'ipfstat', 'ipfw', 'ipop3d', 'jot', 'kenv', 'kill', 'kldstat', 'less', 'limits', 'links', 'local', 'locale', 'locate.code', 'lockf', 'login', 'ls', 'lsof', 'lsvfs', 'mail', 'mail.local', 'mailwrapper', 'main', 'makewhatis', 'master', 'minions', 'mkdir', 'mktemp', 'mlock', 'mount', 'msgs', 'mv', 'nawk', 'netstat', 'newsyslog', 'nginx', 'nice', 'nohup', 'ntpd', 'pEja72mA', 'pfctl', 'php-fpm', 'pickup', 'ping', 'pkg', 'postmap', 'procstat', 'proxymap', 'ps', 'pw', 'pwait', 'pwd_mkdb', 'python2.7', 'qmgr', 'resizewin', 'rm', 'route', 'screen', 

In [4]:
# Three stacked 64-unit SimpleRNN layers (dropout 0.2 after the first two)
# over variable-length one-hot event sequences, followed by a softmax over
# the executable labels.
model = Sequential()
model.add(Input(shape=(None, no_features)))
model.add(SimpleRNN(64, return_sequences=True))
model.add(Dropout(0.2))
model.add(SimpleRNN(64, return_sequences=True))
model.add(Dropout(0.2))
model.add(SimpleRNN(64, return_sequences=False))
model.add(Dense(no_labels, activation='softmax'))

model.summary()

2024-06-12 09:37:01.371903: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1928] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 9798 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:17:00.0, compute capability: 7.5
2024-06-12 09:37:01.372649: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1928] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 9658 MB memory:  -> device: 1, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:65:00.0, compute capability: 7.5


In [5]:
# NOTE(review): absolute, machine-specific path -- consider making it configurable.
checkpoint_path = "/home/jrosendahl/sync/models/checkpoints"

# Stop after 5 epochs without val_loss improvement.
early_stop = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='auto')

# Keep only the best model (lowest val_loss) on disk.
model_checkpoint = ModelCheckpoint(
    filepath=checkpoint_path + '/rnn_generator.keras',
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)

# Divide the learning rate by 10 after 3 epochs without val_loss improvement.
lr_schedule = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, min_lr=1e-6)

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [6]:
# Train the model
history = model.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=25,
    callbacks=[early_stop, model_checkpoint],
)

# Don't forget to close the connection after training
close_connection()

Epoch 1/25


I0000 00:00:1718177825.450108  346636 service.cc:145] XLA service 0x9d87100 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1718177825.450180  346636 service.cc:153]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
I0000 00:00:1718177825.450187  346636 service.cc:153]   StreamExecutor device (1): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
2024-06-12 09:37:05.537432: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-06-12 09:37:05.868661: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:465] Loaded cuDNN version 8902


[1m    1/10721[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m20:59:33[0m 7s/step - accuracy: 0.0000e+00 - loss: 4.9555

I0000 00:00:1718177829.103967  346636 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m 4383/10721[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m [1m49:58[0m 473ms/step - accuracy: 0.3866 - loss: 2.4071

In [None]:
def plot_result(item, training_history=None):
    """Plot the training curve of metric ``item`` and, if recorded, its validation curve.

    item: metric key in ``History.history`` (e.g. ``"loss"``, ``"accuracy"``).
    training_history: the ``History`` object returned by ``model.fit``;
        defaults to the notebook-level ``history`` from the training cell.
    """
    if training_history is None:
        training_history = history
    curves = training_history.history
    _, ax = plt.subplots()
    ax.plot(curves[item], label=item)
    val_item = "val_" + item
    # validation metrics exist only when fit() was given validation data
    if val_item in curves:
        ax.plot(curves[val_item], label=val_item)
    ax.set_xlabel("Epochs")
    ax.set_ylabel(item)
    ax.set_title("Train and Validation {} Over Epochs".format(item), fontsize=14)
    ax.legend()
    ax.grid()
    plt.show()


plot_result("loss")
plot_result("accuracy")
