# Single-tier language model example

The following tutorial demonstrates how to utilize safekit's language modeling recurrent neural network to perform event-level anomaly detection. Unlike the aggregate autoencoder and its baselines, the language model is capable of detecting anomalous behavior at the event level. It accomplishes this by attempting to learn the syntax of log lines and the semantic relationships between individual fields in a log line. This allows the model to predict not only the likelihood of a network event, but also the likelihood of individual features appearing at given positions in the log line representation of that event.

In [1]:
import tensorflow as tf
import numpy as np
import json
import sys
import os

#from safekit.batch import OnlineBatcher
from safekit.graph_training_utils import ModelRunner, EarlyStop
#from safekit.tf_ops import lm_rnn
import safekit.tf_ops as ops
from safekit.util import get_mask, Parser

tf.set_random_seed(408)
np.random.seed(408)

In [2]:
def lm_rnn(x, t, token_embed, layers, seq_len=None, context_vector=None, cell=tf.nn.rnn_cell.BasicLSTMCell):
    """
    Token level LSTM language model that uses a sentence level context vector.

    :param x: (tensor) Input to rnn
    :param t: (tensor) Targets for language model predictions (typically next token in sequence)
    :param token_embed: (tensor) MB X ALPHABET_SIZE.
    :param layers: A list of hidden layer sizes for stacked lstm
    :param seq_len: A 1D tensor of mini-batch size for variable length sequences
    :param context_vector: (tensor) MB X 2*CONTEXT_LSTM_OUTPUT_DIM. Optional context to append to each token embedding
    :param cell: (class) A tensorflow RNNCell sub-class
    :return: (tuple) token_losses (tensor), hidden_states (list of tensors), final_hidden (tensor)
    """
    token_set_size = token_embed.get_shape().as_list()[0]
    cells = [cell(num_units) for num_units in layers]
    cell = tf.nn.rnn_cell.MultiRNNCell(cells, state_is_tuple=True)
    # mb X sentence_length X embedding_size
    #x_lookup = tf.nn.embedding_lookup(token_embed, x)
    x_lookup = tf.expand_dims(x, axis=2)
    # List of mb X embedding_size tensors
    input_features = tf.unstack(x_lookup, axis=1)

    # input_features: list max_length of sentence long tensors (mb X embedding_size+context_size)
    if context_vector is not None:
        input_features = [tf.concat([embedding, context_vector], 1) for embedding in input_features]

    # hidden_states: sentence length long list of tensors (mb X final_layer_size)
    # cell_state: data structure that contains the cell state for each hidden layer for a mini-batch (complicated)
    hidden_states, cell_state = tf.nn.static_rnn(cell, input_features,
                                          initial_state=None,
                                          dtype=tf.float32,
                                          sequence_length=seq_len,
                                          scope='language_model')
    # batch_size X sequence_length (see tf_ops for def)

    #token_losses = ops.batch_softmax_dist_loss(t, hidden_states, token_set_size)
    token_losses = eyed_mvn_loss(t, hidden_states)
    final_hidden = cell_state[-1].h
    return token_losses, hidden_states, final_hidden


def eyed_mvn_loss(truth, h, scale_range=1.0):
    """
    This function takes the output of a neural network after it's last activation, performs an affine transform,
    and returns the squared error of this result and the target.

    :param truth: A tensor of target vectors.
    :param h: The output of a neural network post activation.
    :param scale_range: For scaling the weight matrices (by default weights are initialized two 1/sqrt(fan_in)) for
    tanh activation and sqrt(2/fan_in) for relu activation.
    :return: (tf.Tensor[MB X D], None) squared_error, None
    """
    fan_in = h[0].get_shape().as_list()[1]
    dim = truth.shape.as_list()[1]
    U = tf.Variable(ops.fan_scale(scale_range, tf.tanh, h[0]) * tf.truncated_normal([fan_in, dim],
                                                                             dtype=tf.float32, name='U'))
    hidden_tensor = tf.stack(h)
    b = tf.Variable(tf.zeros([dim]))
    ustack = tf.stack([U]*len(h))
    y = tf.matmul(hidden_tensor, ustack) + b
    loss_columns = tf.square(y-truth)
    return loss_columns
class OnlineBatcher:
    """
    Gives batches from a csv file.
    For batching data too large to fit into memory. Written for one pass on data!!!
    """

    def __init__(self, datafile, batch_size,
                 skipheader=False, delimiter=',',
                 alpha=0.5, size_check=None,
                 datastart_index=3, norm=False):
        """

        :param datafile: (str) File to read lines from.
        :param batch_size: (int) Mini-batch size.
        :param skipheader: (bool) Whether or not to skip first line of file.
        :param delimiter: (str) Delimiter of csv file.
        :param alpha: (float)  For exponential running mean and variance.
                      Lower alpha discounts older observations faster.
                      The higher the alpha, the further you take into consideration the past.
        :param size_check: (int) Expected number of fields from csv file. Used to check for data corruption.
        :param datastart_index: (int) The csv field where real valued features to be normalized begins.
                                Assumed that all features beginnning at datastart_index till end of line
                                are real valued.
        :param norm: (bool) Whether or not to normalize the real valued data features.
        """

        self.alpha = alpha
        self.f = open(datafile, 'r')
        self.batch_size = batch_size
        self.index = 0
        self.delimiter = delimiter
        self.size_check = size_check
        if skipheader:
            self.header = self.f.readline()
        self.datastart_index = datastart_index
        self.norm = norm
        self.replay = False

    def next_batch(self):
        """
        :return: (np.array) until end of datafile, each time called,
                 returns mini-batch number of lines from csv file
                 as a numpy array. Returns shorter than mini-batch
                 end of contents as a smaller than batch size array.
                 Returns None when no more data is available(one pass batcher!!).
        """
        matlist = []
        l = self.f.readline()
        lsplit = l.strip().split(self.delimiter)
        if l == '':
            return None
        l = lsplit[0].replace(':','') + ',' + ','.join(lsplit[1:])
        rowtext = np.array([float(k) for k in l.strip().split(self.delimiter)])
        if self.size_check is not None:
            while len(rowtext) != self.size_check:
                l = self.f.readline()
                lsplit = l.strip().split(self.delimiter)
                if l == '':
                    return None
                l = lsplit[0].replace(':','') + ',' + ','.join(lsplit[1:])
                rowtext = np.array([float(k) for k in l.strip().split(self.delimiter)])
        matlist.append(rowtext)
        for i in range(self.batch_size - 1):
            l = self.f.readline()
            lsplit = l.strip().split(self.delimiter)
            if l == '':
                break
            l = lsplit[0].replace(':','') + ',' + ','.join(lsplit[1:])
            rowtext = np.array([float(k) for k in l.strip().split(self.delimiter)])
            if self.size_check is not None:
                while len(rowtext) != self.size_check:
                    l = self.f.readline()
                    lsplit = l.strip().split(self.delimiter)
                    if l == '':
                        return None
                    l = lsplit[0].replace(':','') + ',' + ','.join(lsplit[1:])
                    rowtext = np.array([float(k) for k in l.strip().split(self.delimiter)])
            matlist.append(rowtext)
        data = np.array(matlist)
        if self.norm:
            batchmean, batchvariance = data[:,self.datastart_index:].mean(axis=0), data[:, self.datastart_index:].var(axis=0)
            if self.index == 0:
                self.mean, self.variance = batchmean, batchvariance
            else:
                self.mean = self.alpha * self.mean + (1 - self.alpha) * batchmean
                self.variance = self.alpha * self.variance + (1 - self.alpha) * batchvariance
                data[:, self.datastart_index:] = (data[:, self.datastart_index:] - self.mean)/(self.variance + 1e-10)
        self.index += self.batch_size
        return data

First, we'll define some hyperparameters for our model—these will be explained in greater detail as we go.

In [3]:
layer_list = [10]
#lr = 1e-3
lr = 5e-3
#embed_size = 20
embed_size = 1
mb_size = 64

maxbadcount = 10

Next, we load the JSON file describing the specifications for the data.

This JSON file describes a dictionary specifying the number of features in the input data; the categories corresponding to the features; whether the corresponding category is metadata, input, or output; and the indices which map these categories to specific features. This dictionary can later be used to ease interaction with the data when providing it as input to Tensorflow.

`sentence_length` specifies a fixed sequence length over which our model will perform backpropagation through time, and `token_set_size` specifies the size of the vocabulary comprising all of the sequences—the former will be used to define the shape of the placeholders used for the features and targets, while the latter is used to define the shape of the embedding matrix used to map our categorical features to embedded representations.

In [4]:
dataspecs = json.load(open('../safekit/features/specs/lm/load_level_config.json', 'r'))
#dataspecs = json.load(open('../safekit/features/specs/lm/test_config.json', 'r'))
sentence_length = dataspecs['sentence_length'] - 1
token_set_size = dataspecs['token_set_size']
#x = tf.placeholder(tf.int32, [None, sentence_length])
#t = tf.placeholder(tf.int32, [None, sentence_length])
x = tf.placeholder(tf.float32, [None, sentence_length])
t = tf.placeholder(tf.float32, [None, sentence_length])
ph_dict = {'x': x, 't': t}

token_embed = tf.Variable(tf.truncated_normal([token_set_size, embed_size]))

Now we define the recurrent neural network proper. A call to `lm_rnn` will instantiate all of the graph operations comprising our RNN and return a tuple of tensors: `token_losses`, which represents the token-wise losses over each input sequence; `h_states`, a sentence-length tensor comprised of the hidden states at each time step; and `final_h`, simply the hidden state at the last time step. For this call, we pass our input and output placeholders as well as our embedding matrix. We also provide a list of hidden layer sizes which determines the dimensionality of the hidden states at each time step—specifying more than one layer size will yield a stacked RNN architecture. The resulting model is a single-tiered RNN using Long Short Term Memory cells with a hidden dimensionality of 10.

Finally, we define our losses over individual lines and over all lines by first averaging the feature-wise losses, then averaging these losses over an entire batch.

In [5]:
token_losses, h_states, final_h = lm_rnn(x, t, token_embed, layer_list)

line_losses = tf.reduce_mean(token_losses, axis=1)
avg_loss = tf.reduce_mean(line_losses)

Instructions for updating:
This class is deprecated, please use tf.nn.rnn_cell.LSTMCell, which supports all the feature this cell currently has. Please replace the existing code with tf.nn.rnn_cell.LSTMCell(name='basic_lstm_cell').


To map losses back to our input features easily, we'll next define a function that we can call during the training loop that will write metadata and losses for each data point in the current minibatch.

In [6]:
outfile = open('/home/wxh/AnomalyDetectionModels/safekit-master/examples/results', 'w')
outfile.write("batch cpu io_stps io_wtps io_bread io_bwrtn mem_used mem_kbbuffers mem_kbcache loss\n")

def write_results(data_dict, loss, outfile, batch):
    for n, s, d, u, r, l, i, j, k, m in zip(data_dict['time'].flatten().tolist(),
                                    data_dict['cpu'].flatten().tolist(),
                                    data_dict['io_stps'].flatten().tolist(),
                                    data_dict['io_wtps'].flatten().tolist(),
                                    data_dict['io_bread'].flatten().tolist(),
                                    data_dict['io_bwrtn'].flatten().tolist(),
                                    data_dict['mem_used'].flatten().tolist(),
                                    data_dict['mem_kbbuffers'].flatten().tolist(),
                                    data_dict['mem_kbcache'].flatten().tolist(),
                                    loss.flatten().tolist()):
        outfile.write('%s %s %s %s %s %s %s %s %s %s %r\n' % (batch, n, int(s), int(d), int(u), int(r), int(l), int(i),
                                                            int(j), int(m), k))

Now we instantiate a `ModelRunner` object, which provides a simple interface for interacting with the Tensorflow session. Instantiating this object will define the optimizer Tensorflow will use for gradient descent and initialize all of the variables in the Tensorflow graph. We can then use the `train_step` method on this object to perform an optimization step or the `eval` method to retrieve the values of arbitrary tensors in the graph.

In order to record the losses for all of the features, we define a list `eval_tensors` that contains tensors whose values we want to retrieve during training. We'll provide this list to the `ModelRunner`'s `eval` method during the training loop to compute these tensors, then record their values with the `write_results` function defined previously.

In [7]:
model = ModelRunner(avg_loss, ph_dict, learnrate=lr)

eval_tensors = [avg_loss, line_losses]

For our experiments, we want to first train our model on a single day of user activity, evaluate the model's performance on the next day, then repeat this process for each day in the data. To ease this process, we'll define a function that will either train or evaluate our model over a single day of events.

We first instantiate a batcher to divide the data into smaller portions. Since each day may contain a large number of events, we want to provide it to the model in small batches to avoid filling memory. Adjusting the minibatch size may also improve the model's performance. Here, we'll use a batch size of 64 data points, defined above as `mb_size`.

We then define a stopping criteria for training using the `EarlyStop` object; if our model's performance doesn't improve after 10 training steps—defined above as `maxbadcount`—the `check_error` function we instantiate will return `False`, and training will be discontinued.

In order to prepare data for training or evaluation, we manipulate raw batches from our batcher to construct a dictionary for Tensorflow that maps features to the placeholders used to feed data into the computational graph during training. We map the metadata features to their respective dictionary fields, define the upper range of our inputs and outputs with the `endx` and `endt` variables, then use these to select the appropriate features in the raw batch to determine our input and output.

During training, we retrieve the losses for the current batch, then perform a training step to perform gradient descent over a single batch of inputs. This process repeats until either the batcher has reached the end of the input file, the stopping criteria has been met, or the model's error has diverged to infinity. During evaluation, we only retrieve the losses, then write these to our results file using `write_results`.

In [8]:
def trainday(is_training, f):
    batch_num = 0
    #data = OnlineBatcher('/home/hutch_research/data/lanl/char_feats/word_day_split/' + f, mb_size, delimiter=' ')
    #data = OnlineBatcher('/home/wxh/AnomalyDetectionModels/safekit-master/data_examples/lanl/lm_feats/word_day_split/' + f, mb_size, delimiter=' ')
    data = OnlineBatcher('/home/wxh/lstmDnn/data/proceed/' + f, mb_size, delimiter=',', skipheader=True)
    #data = OnlineBatcher(f, mb_size, delimiter=',', skipheader=True)
    raw_batch = data.next_batch()
    cur_loss = sys.float_info.max
    check_error = EarlyStop(maxbadcount)
    #endx = raw_batch.shape[1] - 1
    endx = raw_batch.shape[1] - 2
    #endt = raw_batch.shape[1]
    endt = raw_batch.shape[1] - 1
    training = check_error(raw_batch, cur_loss)
    while training:
    #while 1:
        '''data_dict = {'line': raw_batch[:, 0], 'second': raw_batch[:, 1], 
                     'day': raw_batch[:, 2], 'user': raw_batch[:, 3], 
                     'red': raw_batch[:, 4], 'x': raw_batch[:, 5:endx],
                     't': raw_batch[:, 6:endt]}'''
        try:
            data_dict = {
                'time':raw_batch[:, 0],
                'cpu':raw_batch[:, 1],
                'io_stps':raw_batch[:, 2],
                'io_wtps':raw_batch[:, 3],
                'io_bread':raw_batch[:, 4],
                'io_bwrtn':raw_batch[:, 5],
                'mem_used':raw_batch[:, 6],
                'mem_kbbuffers':raw_batch[:, 7],
                'mem_kbcache':raw_batch[:, 8],
                'x': raw_batch[:, 1:endx],
                't': raw_batch[:, 2:endt]
            }
        except Exception as e:
            print(raw_batch)

        _, cur_loss, pointloss = model.train_step(data_dict, eval_tensors, update=is_training)
        
        if not is_training:
            write_results(data_dict, pointloss, outfile, batch_num)
        batch_num += 1
        #if data_dict['time'][0] % 10 == 0:
        #if batch_num % 5 == 0:
        if 0:
            print('%s %s %s %s %s %r' % (raw_batch.shape[0], data_dict['time'][0],
                                            ('fixed', 'update')[is_training],
                                            f, data.index, cur_loss))
        #print('loss: ', cur_loss)
        raw_batch = data.next_batch()
        #if cur_loss < 6e-5:
        #    break
        training = check_error(raw_batch, cur_loss)
        if training < 0:
            exit(0)
        return cur_loss

For concision, we will train and evaluate our model on a small subset of our data. To train and evaluate over the entire data set, uncomment the lines following the current definition of `files`.

Notice that if we use the entire data set, we reference a field in our data specifications called `weekend_days`. In our configuration files, we have specified a list of days in our data set which correspond to weekends. We want to exclude these days from training simply because they represent different patterns of user activity that may not match the distribution of user activities found during weekdays. To include these events in our analyses without affecting accuracy, another model can be trained on these events.|

In [9]:
files = dataspecs['test_files']
#weekend_days = dataspecs['weekend_days']
#files = [str(i) + '.txt' for i in range(dataspecs["num_days"]) if i not in weekend_days]

Finally, we enter the training loop, which simply consists of two successive calls to `trainday`. The first call trains the model on the current day, and the second call evaluates the model on the following day.

In [10]:
'''for idx, f in enumerate(files[:-1]):
    trainday(True, f)
    trainday(False, files[idx + 1])'''
idx = 0
epoch = 10000
f = open(r'/home/wxh/AnomalyDetectionModels/safekit-master/examples/084123-151540.log', 'a')
#for idx, f in enumerate(files[:-1]):
while idx < epoch:
#while 1:
    loss = trainday(True, files[0])
    test_loss = trainday(False, files[2])
    f.write("epoch {} loss {} tst_loss {} + '\n'".format(idx, loss, test_loss))
    print('epoch {} loss {} tst_loss {}'.format(idx, loss, test_loss))
    idx += 1
outfile.close()

epoch 0 loss 0.267102718353 tst_loss 0.0641174837947
epoch 1 loss 0.256767392159 tst_loss 0.0607639625669
epoch 2 loss 0.247098594904 tst_loss 0.0576633438468
epoch 3 loss 0.2379193753 tst_loss 0.0547402203083
epoch 4 loss 0.229071617126 tst_loss 0.0519647859037
epoch 5 loss 0.220443621278 tst_loss 0.049329970032
epoch 6 loss 0.211964219809 tst_loss 0.0468354225159
epoch 7 loss 0.203580036759 tst_loss 0.0444859080017
epoch 8 loss 0.195248425007 tst_loss 0.0422924980521
epoch 9 loss 0.186937645078 tst_loss 0.0402736924589
epoch 10 loss 0.178629353642 tst_loss 0.038456261158
epoch 11 loss 0.170320123434 tst_loss 0.0368760377169
epoch 12 loss 0.162022292614 tst_loss 0.0355791375041
epoch 13 loss 0.153765782714 tst_loss 0.0346232317388
epoch 14 loss 0.145599812269 tst_loss 0.0340782441199
epoch 15 loss 0.137594237924 tst_loss 0.03402531147
epoch 16 loss 0.129838705063 tst_loss 0.0345525071025
epoch 17 loss 0.122437894344 tst_loss 0.0357448607683
epoch 18 loss 0.11550052464 tst_loss 0.03766