In [None]:
%matplotlib inline

In [None]:
from collections.abc import Callable, Iterable
import io
import itertools
import os
# Must be set before TensorFlow is imported: makes `tensorflow.keras` use Keras 2 (`tf_keras`).
os.environ["TF_USE_LEGACY_KERAS"] = "1"
from pathlib import Path
import pickle
import random
import re
from shutil import rmtree
from tempfile import mkdtemp
from textwrap import TextWrapper
from typing import Any
from zipfile import ZipFile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.preprocessing import Normalizer
import tensorflow as tf
from tensorflow import constant, Tensor
from tensorflow.data import AUTOTUNE, Dataset
from tensorflow.keras.backend import clear_session
from tensorflow.keras.callbacks import Callback, EarlyStopping
from tensorflow.keras.layers import (
  Dense, Embedding, GRU, Layer, TextVectorization
)
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.utils import (
  deserialize_keras_object, register_keras_serializable,
  serialize_keras_object, set_random_seed,
)
from typing import override

# Demonstration of a generic hands-on deep learning method for anomaly detection in log data
## Introduction
### Motivation

This notebook demonstrates how unexpected subsequences (anomalies) can be found in sequential text data. The example implementation uses [software logs](https://en.m.wikipedia.org/wiki/Logging_(computing)).

Logs can be very helpful for understanding the behavior of a new system, application or environment that we have only recently started to work with. Getting to know such a system is an incremental learning process, from a human and possibly also from a machine standpoint. That is, machine learning (ML) can be a powerful tool for log data analysis.

Log messages are, in general, very specific to the activities being logged and can contain numeric data, so anomaly detection methods based on [natural language processing (NLP)](https://en.wikipedia.org/wiki/Natural_language_processing), including the [bag-of-words (BoW)](https://en.wikipedia.org/wiki/Bag-of-words_model) model, can be applied only to a limited extent. For the sake of genericity, the most important *component* of a log analyzed by this notebook's method is the *sequence* of log messages.

Focusing on the log message *sequences*, there are two main tasks involved in preparing log data for ML-aided analysis:
1. cleaning, filtering and sorting, to obtain only the log messages of interest;
2. classification or clusterization of the selected log messages into message types to be processed as a sequence.

Both task 1 and task 2, but especially task 2, can be accomplished via ML.

Task 1 is important for keeping only the relevant log information, especially when the log is huge (e.g., gigabytes). Regular expressions or even ML can be used to solve this task.

Task 2 builds on task 1 and can be solved with an ML text classification or clustering algorithm, as an alternative to a pure regular-expression approach.

After task 1 and task 2 are completed, a log is converted to a sequence of *message types* rather than messages.

|  Log             |
|------------------|
|  Message type 1  |
|  ...             |
|  Message type n  |


The term *message type* is used here only to clarify how the method works; in the rest of this notebook, a log entry is simply called a *log message*.
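For contrast with the ML-based approach, the conversion from messages to message types can be sketched the non-ML way, with hand-written regular expressions. The patterns, the `MESSAGE_TYPES` table and the `to_message_types` helper below are hypothetical examples, not part of this notebook's method, which derives its patterns from k-means clusters instead.

```python
import re

# Hypothetical hand-written patterns; the notebook derives its patterns from clusters.
MESSAGE_TYPES = {
  1: re.compile(r"^user \S+ logged in$"),
  2: re.compile(r"^disk \S+ full$"),
}


def to_message_types(log_lines: list[str]) -> list[int]:
  """Maps each log message to the first matching type id (0 = unknown)."""
  return [
    next(
      (type_id for type_id, pattern in MESSAGE_TYPES.items()
       if pattern.match(line)),
      0,
    )
    for line in log_lines
  ]


print(to_message_types([
  "user alice logged in", "disk sda full", "user bob logged in", "kernel panic",
]))  # [1, 2, 1, 0]
```

Every new kind of message needs a new pattern here, which is the expert effort the clustering step is meant to avoid.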

### Method

As per [A survey on the application of deep learning for anomaly detection in logs](#a-survey-on-the-application-of-deep-learning-for-anomaly-detection-in-logs), there exist various approaches for anomaly detection in sequential text data. **In particular, this notebook demonstrates a hands-on method, generic in its simplicity, for finding anomalies in potentially large amounts of logs by utilizing [k-means clustering](#k-means-clustering), regular expressions and a [recurrent neural network (`RNN`)](#recurrent-neural-network).**

The implementation includes three main tasks:
1. application of an [unsupervised](https://en.wikipedia.org/wiki/Unsupervised_learning) clusterization method via [k-means clustering](#k-means-clustering), based on [singular value decomposition (`SVD`)](#singular-value-decomposition) on potentially high-dimensional sparse data;
1. transformation of the clustered log message data into unique per-cluster regular expression patterns that represent each log message as input to the `RNN` model;
1. application of a [semi-supervised](https://en.wikipedia.org/wiki/Weak_supervision) anomaly detection method via `RNN`, based on the following step-by-step algorithm:
    1. training process to learn possible log message sequences in already available normal log data (the supervised part);
    1. next log message predictions on new log data;
    1. [optional] if in step 2 a next log message cannot be predicted based on what was learned in step 1, human intervention is needed to determine whether this is an anomaly or just a new log message to be learned (the unsupervised part);
    1. [optional] if new normal log messages are found in step 3, go to step 1; otherwise, go to step 2.

The implementation of the `BoW` log clusterization model is based on [Scikit-learn](#scikit-learn).
The implementation of the `RNN` anomaly detection model is based on [TensorFlow](#tensorflow).

In this notebook no hyperparameter tuning is performed because the main purpose of the notebook is to demonstrate a working example.

### Limitations

This notebook's generic method does not perform any timestamp-related log data preprocessing, because timestamps vary from message to message and would only add noise to the data, potentially leading to incorrect results. This is why the `COLUMN_INDICES_TO_DROP` setting specifies which columns to remove from each log line, by default the initial one, where a timestamp is typically located.
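For logs that are plain text rather than CSV, the same idea could be applied per line by dropping a fixed number of leading whitespace-separated tokens. The `drop_leading_tokens` helper below is hypothetical and assumes that the timestamp occupies exactly the first two tokens (date and time).

```python
def drop_leading_tokens(line: str, count: int = 2) -> str:
  """Drops the first `count` whitespace-separated tokens (e.g. date and time)."""
  parts = line.split(maxsplit=count)
  # A line with `count` or fewer tokens has nothing left after the timestamp.
  return parts[count] if len(parts) > count else ""


print(drop_leading_tokens("2024-01-05 12:00:01 sshd accepted password for alice"))
# sshd accepted password for alice
```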

### Other log message grouping methods

The demonstrated clusterization method is a coarse, generic and basically *unsupervised* one. Apart from tuning the number of clusters found by the `k-means` method, additional configuration makes finer-grained grouping of log messages possible, for example by labeling at least part of the log messages and then applying *label spreading*. A non-ML approach based on regular expressions is also possible. However, such alternative methods tend to require expert-level assistance and thus do not represent a generic, autonomous solution.
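Partial labeling combined with label spreading can be sketched with Scikit-learn's `sklearn.semi_supervised.LabelSpreading`. The six toy messages, the two hand-assigned labels and `n_neighbors=3` are illustrative assumptions; this notebook itself does not use label spreading.

```python
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.semi_supervised import LabelSpreading

messages = [
  "user alice logged in",
  "user bob logged in",
  "user carol logged in",
  "disk sda full",
  "disk sdb full",
  "disk sdc full",
]
# -1 marks unlabeled messages; only one message per group is labeled by hand.
labels = np.array([0, -1, -1, 1, -1, -1])

# Dense token-count vectors; messages of the same kind are nearest neighbors.
X = CountVectorizer().fit_transform(messages).toarray()
spreader = LabelSpreading(kernel="knn", n_neighbors=3)
spreader.fit(X, labels)
print(spreader.transduction_)  # [0 0 0 1 1 1]
```

The labels spread along the k-nearest-neighbor graph, so each unlabeled message inherits the label of the hand-labeled message in its group.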

## System setup

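[Scikit-learn](#scikit-learn), [TensorFlow](#tensorflow) and [Keras 2](#keras) are used. Keras 2 is selected by setting the `TF_USE_LEGACY_KERAS` environment variable to `"1"` before TensorFlow is imported, which requires the `tf_keras` package to be installed.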

## Configuration

In [None]:
# Common
RANDOM_SEED = 42

# Data
CACHE_DIR = mkdtemp()
COLUMN_INDICES_TO_DROP = [0]  # E.g., date/time.
DATA_DIR = "data"
DATASET_FILE = "10492770.zip"
DATASET_FILES = [
  "Authentication_Logs_logs.csv",
  "Cloud-Based_Services_logs.csv",
  "Operating_System_Logs_logs.csv",
  "Server_Logs_logs.csv",
  "Syslog_Data_logs.csv",
]
DATASET_URL = (
  "https://zenodo.org/api/records/"
  f"{os.path.splitext(DATASET_FILE)[0]}/files-archive"
)
SKIPPED_LOG_SEQUENCE = (2, 0)  # Indices from DATASET_FILES.
LOG_SIZE = 100_000
SUBSEQUENCE_LENGTH = 100
VALIDATION_SPLIT = 10_000

# Modeling
MODELS_DIR = "models"

BOW_MODEL_NAME = "log_data_clusterizer.pkl"
BOW_MODEL_PATH = (
  Path() / MODELS_DIR / BOW_MODEL_NAME
)
KMEANS_RETRIES = 10
MIN_EXPLAINED_VARIANCE_RATIO = 0.95
MIN_CLUSTERS = 2
CLUSTERS = 5
MAX_CLUSTERS = 8

RNN_MODEL_NAME = "text_sequence_analyzer.keras"
RNN_MODEL_PATH = (
  Path() / MODELS_DIR / RNN_MODEL_NAME
)
EMBEDDING_DIM = 256
RNN_UNITS = 1024

# Output formatting
SEPARATOR = " " * 3

# Training
BATCH_SIZE = 128
BUFFER_SIZE = 10_000
EARLY_STOP_PATIENCE = 3
EPOCHS = 100

# Testing
NEXT_TOKEN_PROB_THRESHOLD = 0.02

## Log generator

The `retrieve_dataset` function downloads the [Comprehensive Network Logs Dataset for Multi-Device Analysis](#comprehensive-network-logs-dataset-for-multi-device-analysis) log dataset from `DATASET_URL` and extracts it into `DATA_DIR`.

The generated log is a random mix of messages from `DATASET_FILES`, which are part of the log dataset.

Intentional dataset-to-dataset sequence gaps are left as per the dataset indices from `SKIPPED_LOG_SEQUENCE`, so that anomalies based on learned incomplete sequences can be reproduced. Note that `SKIPPED_LOG_SEQUENCE` can be applied reliably with more than 2 `DATASET_FILES`.

As this notebook's method does not perform timestamp-related processing, the initial column, which typically contains the logged date/time, is removed by default. Rows with *NaN* values, if any, are also removed.

The `generate_log` function generates a log of the specified size. As in a typical log file, the generated log is represented as a single text in which each line corresponds to a log message; the text is returned wrapped in a one-element list.
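The sequence-gap mechanism of `LogGenerator.__next__` can be isolated in a small sketch: a source index is redrawn whenever it would form the skipped (previous, next) pair. The `mixed_sequence` function is hypothetical; it uses a local seeded `random.Random` instead of the global seed, and the integers stand for indices into `DATASET_FILES`.

```python
import random
from collections.abc import Iterator


def mixed_sequence(
  n_sources: int,
  length: int,
  skipped: tuple[int, int] = (2, 0),
  seed: int = 42,
) -> Iterator[int]:
  """Yields random source indices, never emitting `skipped` as a consecutive pair."""
  rng = random.Random(seed)
  previous = -1
  for _ in range(length):
    current = rng.randrange(n_sources)
    # Redraw until (previous, current) is not the skipped transition.
    while (previous, current) == skipped:
      current = rng.randrange(n_sources)
    previous = current
    yield current


sources = list(mixed_sequence(n_sources=5, length=10_000))
transitions = set(zip(sources, sources[1:]))
print((2, 0) in transitions, len(transitions))  # False 24
```

All 25 ordered pairs of 5 sources except the skipped one occur, which is exactly the "learned incomplete sequence" the anomaly detection later relies on.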

In [None]:
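def retrieve_dataset() -> None:
  """Downloads and extracts the log dataset unless it is already present."""
  ds_path = Path() / DATA_DIR / DATASET_FILE
  if not ds_path.exists():
    # Create DATA_DIR on first run; open() below does not create directories.
    ds_path.parent.mkdir(parents=True, exist_ok=True)
    response = requests.get(DATASET_URL, timeout=300)
    response.raise_for_status()  # Fail early instead of saving an error page.
    zip_fileobj = io.BytesIO(response.content)
    with open(ds_path, "wb") as file:
      file.write(zip_fileobj.getbuffer())
    with ZipFile(zip_fileobj) as zfile:
      zfile.extractall(ds_path.parent)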


class LogGenerator:
  """Log generator implemented as a callable.

  The generated log is a random mix of messages from `DATASET_FILES`.

  Intentional dataset-to-dataset sequence gaps are left as per the dataset
  indices from `SKIPPED_LOG_SEQUENCE`, so that anomalies based on learned
  incomplete sequences can be reproduced. Note that `SKIPPED_LOG_SEQUENCE`
  works reliably with more than 2 `DATASET_FILES`.

  For each log dataset, the initial column, which typically contains the logged
  date/time, is removed by default. Rows with *NaN* values, if any, are also
  removed.
  """

  def __init__(self):
    self._datasets: list[list[str]] = []
    for filename in DATASET_FILES:
      dataset = pd.read_csv(Path() / DATA_DIR / filename)
      dataset = dataset.drop(
        columns=dataset.columns[COLUMN_INDICES_TO_DROP]
      )
      dataset = dataset.dropna()
      dataset_as_list = dataset.apply(
        lambda row: " ".join([str(row[col]).strip() for col in dataset]),
        axis=1,
      ).to_list()
      self._datasets.append(dataset_as_list)
    self._ds_idx = -1
    self._ds_pos = [-1] * len(self._datasets)

  def __iter__(self) -> "LogGenerator":
    return self

  def __next__(self) -> str:
    ds_idx = random.randint(0, len(self._datasets)-1)
    while (self._ds_idx, ds_idx) == SKIPPED_LOG_SEQUENCE:
      ds_idx = random.randint(0, len(self._datasets)-1)
    self._ds_idx = ds_idx
    self._ds_pos[ds_idx] += 1
    if self._ds_pos[ds_idx] == len(self._datasets[ds_idx]):
      self._ds_pos[ds_idx] = 0
    return self._datasets[ds_idx][self._ds_pos[ds_idx]]


def generate_log(
  log_gen: LogGenerator,
  log_size: int = LOG_SIZE,
) -> list[str]:
  return ["\n".join([next(log_gen) for _ in range(log_size)])]


set_random_seed(RANDOM_SEED)
retrieve_dataset()
log_gen = LogGenerator()
log_text = generate_log(log_gen)
log_lines = log_text[0].splitlines()

## Log message types clustering

Log message type clustering is performed by a `BoW` model, which is a [Scikit-learn](#scikit-learn) pipeline consisting of:
- an `sklearn.feature_extraction.text.CountVectorizer` instance for per-message feature extraction;
- an `sklearn.decomposition.TruncatedSVD` instance for dimensionality reduction via `SVD`, important for a stable clustering process;
- an `sklearn.preprocessing.Normalizer` instance for normalization of the `SVD` data, important for a stable clustering process;
- an `sklearn.cluster.KMeans` instance to perform the clustering process.

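The number of `SVD` components is determined by the `get_svd_components_count` function as the smallest number of components whose cumulative explained variance ratio reaches `MIN_EXPLAINED_VARIANCE_RATIO`.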
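The selection rule can be illustrated on a hand-written array of explained variance ratios, standing in for `TruncatedSVD.explained_variance_ratio_`. The `components_for_variance` helper is hypothetical; falling back to all components when the threshold is never reached is a design choice of this sketch, since `np.argmax` returns 0 for an all-`False` array.

```python
import numpy as np


def components_for_variance(
  explained_variance_ratio: list[float], min_ratio: float = 0.95
) -> int:
  """Smallest number of components whose cumulative ratio reaches `min_ratio`."""
  ratios = np.sort(np.asarray(explained_variance_ratio))[::-1]  # Largest first.
  reached = np.cumsum(ratios) >= min_ratio
  # np.argmax returns 0 when nothing is True, so fall back to all components.
  return int(np.argmax(reached)) + 1 if reached.any() else len(ratios)


print(components_for_variance([0.07, 0.6, 0.03, 0.3]))  # 3
print(components_for_variance([0.5, 0.3]))  # 2
```

With the ratios sorted to 0.6, 0.3, 0.07, 0.03, the cumulative sums are 0.6, 0.9, 0.97, 1.0, so three components are the first to reach 0.95.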

In [None]:
if not Path.exists(BOW_MODEL_PATH):
  def get_svd_components_count(
    vectorizer: CountVectorizer,
    X: Iterable[str],
  ) -> int:
    """Returns an optimal number of SVD components for specified data.

    Args:
      vectorizer: Transforms `X` to a matrix of frequency counts per feature.
      X: Iterable for log messages.

    Returns:
      A number of SVD components corresponding to
      `MIN_EXPLAINED_VARIANCE_RATIO`.
    """
    X_vect = vectorizer.fit_transform(X)
    svd = TruncatedSVD(
      n_components=len(vectorizer.vocabulary_),
      random_state=RANDOM_SEED,
    )
    svd.fit(X_vect)
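    # Sort the ratios in descending order before accumulating them.
    explained_variance_ratio = np.sort(svd.explained_variance_ratio_)[::-1]
    reached = (
      np.cumsum(explained_variance_ratio) >= MIN_EXPLAINED_VARIANCE_RATIO
    )
    # np.argmax returns 0 when no element is True; fall back to all components.
    if not reached.any():
      return len(explained_variance_ratio)
    return int(np.argmax(reached)) + 1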

  vectorizer = CountVectorizer(
    lowercase=False, tokenizer=str.split, token_pattern=None
  )
  log_data_clusterizer = make_pipeline(
    vectorizer,
    TruncatedSVD(
      n_components=get_svd_components_count(vectorizer, log_lines),
      random_state=RANDOM_SEED,
    ),
    Normalizer(copy=False),
    KMeans(n_init=KMEANS_RETRIES, random_state=RANDOM_SEED),
    memory=CACHE_DIR,
  )

### Number of clusters

`CLUSTERS` is selected by applying the *elbow* method. Even though it is preset in the [configuration section](#configuration), `CLUSTERS` should be chosen based on the *elbow*-like curve of k-means inertia versus K plotted by `train_clusterizer`, where the selected K is highlighted in red.
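Reading the elbow off the plot is a manual step. If it had to be automated, one common heuristic, swapped in here and not used by this notebook, picks the K whose point lies farthest below the straight line (chord) joining the first and last points of the normalized curve. The `elbow_k` helper and the inertia values are hypothetical, and the sketch assumes K values in ascending order with decreasing inertia.

```python
import numpy as np


def elbow_k(ks: list[int], inertias: list[float]) -> int:
  """Picks the K farthest below the chord joining the first and last points."""
  ks_arr = np.asarray(ks, dtype=float)
  inertia_arr = np.asarray(inertias, dtype=float)
  # Normalize both axes to [0, 1] so that neither dominates the distance.
  x = (ks_arr - ks_arr[0]) / (ks_arr[-1] - ks_arr[0])
  y = (inertia_arr - inertia_arr[-1]) / (inertia_arr[0] - inertia_arr[-1])
  # The chord runs from (0, 1) to (1, 0); 1 - x - y is proportional to the
  # perpendicular distance of each point below it.
  return int(ks_arr[np.argmax(1.0 - x - y)])


print(elbow_k([2, 3, 4, 5, 6, 7], [100, 40, 20, 15, 12, 10]))  # 4
```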

In [None]:
if not Path.exists(BOW_MODEL_PATH):
  def train_clusterizer(model: Pipeline) -> None:
    k_inertias: list[float] = []
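    # np.r_ slices exclude their stop value: K runs from MIN_CLUSTERS up to
    # MAX_CLUSTERS - 1, and CLUSTERS itself is fitted separately below.
    k_range = np.r_[MIN_CLUSTERS:CLUSTERS, CLUSTERS+1:MAX_CLUSTERS]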
    for k in k_range:
      model.set_params(kmeans__n_clusters=k)
      model.fit(log_lines)
      k_inertias.append(model.named_steps["kmeans"].inertia_)
    plt.scatter(k_range, k_inertias, c="blue")

    model.set_params(kmeans__n_clusters=CLUSTERS)
    model.fit(log_lines)
    plt.scatter(
      [CLUSTERS], model.named_steps["kmeans"].inertia_, c="red",
      label=f"Selected K={CLUSTERS}",
    )

    plt.xlabel("K")
    plt.xticks(np.concatenate((k_range, [CLUSTERS])))
    plt.ylabel("Inertia")
    plt.legend()
    plt.show()

  train_clusterizer(log_data_clusterizer)
  rmtree(CACHE_DIR)

### Text sequence generation

This section prepares `text_sequence`, which is the input to the [text sequence analyzer](#text-sequence-analyzer) model.

This section also prepares `anomalous_text_sequence`, a log message sequence meant to contain all ordered pairs (2-permutations) of message types, including anomalous pairs corresponding to `SKIPPED_LOG_SEQUENCE`. `anomalous_text_sequence` is used to reproduce an [anomaly](#anomaly-detection) and then to show that there are no anomalies after the [corrective action](#corrective-action).
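How the pairs are produced can be seen with a few hypothetical cluster ids in place of `np.unique(log_message_types)`. Note that `itertools.permutations` yields ordered pairs of distinct elements only, so repeated pairs such as `(0, 0)` are not included, and joining consecutive pairs adds further transitions at the pair boundaries.

```python
import itertools

# Hypothetical cluster ids standing in for np.unique(log_message_types).
message_types = [0, 1, 2]
pairs = list(itertools.permutations(message_types, r=2))
print(pairs)  # [(0, 1), (0, 2), (1, 0), (1, 2), (2, 0), (2, 1)]

# Flattened, every ordered pair becomes two consecutive lines of the sequence.
sequence = [message_type for pair in pairs for message_type in pair]
print(sequence)  # [0, 1, 0, 2, 1, 0, 1, 2, 2, 0, 2, 1]
```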

Based on the output from the `get_log_message_type_summary_map` and `get_top_features` functions, `text_sequence` and `anomalous_text_sequence` are generated as texts of message summaries in *regex* format, one summary per line. The message summaries must be unique just as the message types are.
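The idea behind `create_regex` can be shown on a single message: tokens that belong to the cluster's top features are kept literally, and every run of other tokens collapses into a `.+` wildcard. The `summarize` function below is a hypothetical, simplified restatement; unlike the original it escapes each kept token with `re.escape`, on the assumption that raw log tokens may contain regex metacharacters.

```python
import re


def summarize(tokens: list[str], top_features: set[str]) -> str:
  """Builds an anchored regex: top features kept, other token runs become `.+`."""
  parts: list[str] = []
  run: list[str] = []
  for token in tokens:
    if token in top_features:
      run.append(re.escape(token))
    else:
      if run:
        parts.append(r"\s".join(run))
        run = []
      if not parts or parts[-1] != ".+":
        parts.append(".+")
  if run:
    parts.append(r"\s".join(run))
  return "(^" + r"\s+".join(parts) + "$)"


summary = summarize("user alice logged in".split(), {"user", "logged", "in"})
print(summary)  # (^user\s+.+\s+logged\sin$)
print(bool(re.match(summary, "user bob logged in")))  # True
```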

In [None]:
def get_log_message_type_summary_map(
  model: Pipeline,
  log_lines: list[str],
  log_message_types: np.ndarray | None = None,
) -> dict[int, str]:
  """Maps a cluster to regex representations of its messages.

  A regex format is useful, because:
    - it defines a unique summary for each learned log message from each
      cluster, thus providing for a sequence of unique inputs to the
      `text_sequence_analyzer` model;
    - it provides a means to determine whether a new log message is
      actually a known one that is already learned by `text_sequence_analyzer`.

  Args:
    model: A `Scikit-learn` pipeline for log message clusterization.
    log_lines: Log messages, one per line.
    log_message_types: Optional, log message types predicted by `model`.
      Can be set internally based on `model` and `log_lines`.

  Returns:
    A type-summary mapping for log messages, where the summary is a regex.
  """
  np_log_lines = np.array(log_lines)
  if log_message_types is None:
    log_message_types = model.predict(np_log_lines)
  message_type_summary_map: dict[int, str] = {}
  for cluster, top_features in get_top_features(model).items():
    messages = np_log_lines[np.where(log_message_types == cluster)]
    fine_tokens = [msg.split() for msg in messages]
    cluster_regexes: dict[int, list[str]] = {}
    for msg_tokens in fine_tokens:
      regex = create_regex(msg_tokens, top_features)
      if cluster in cluster_regexes:
        if regex not in cluster_regexes[cluster]:
          cluster_regexes[cluster] += [regex]
      else:
        cluster_regexes[cluster] = [regex]
    for msg in messages:
      assert any(re.match(regex, msg) is not None
                 for regex in cluster_regexes[cluster])
    message_type_summary_map[cluster] = "|".join(
      sorted(cluster_regexes[cluster])
    )
  return message_type_summary_map


def create_regex(
  msg_tokens: list[str],
  top_features: set[str],
) -> str:
  sequence: list[str] = []
  feature_sequence: list[str] = []
  for token in msg_tokens:
    if token in top_features:
      # Escape regex metacharacters (e.g. "(", "[", "+") found in raw tokens.
      feature_sequence += [re.escape(token)]
    else:
      if feature_sequence:
        feature_sequence_str = r'\s'.join(feature_sequence)
        sequence += [feature_sequence_str, ".+"]
        feature_sequence = []
      else:
        if not sequence or sequence[-1] != ".+":
          sequence += [".+"]
  if feature_sequence:
    feature_sequence_str = r'\s'.join(feature_sequence)
    sequence += [feature_sequence_str]
  sequence_str = r'\s+'.join(sequence)
  return f"(^{sequence_str}$)"


def get_top_features(model: Pipeline) -> dict[int, set[str]]:
  vectorizer = model.named_steps["countvectorizer"]
  all_features = vectorizer.get_feature_names_out()
  top_feature_indices_per_cluster = np.argsort(
    model.named_steps["truncatedsvd"].inverse_transform(
      model.named_steps["kmeans"].cluster_centers_
    )
  )[:, ::-1]
  top_features_per_cluster: dict[int, set[str]] = {}
  for cluster, feature_indices in enumerate(top_feature_indices_per_cluster):
    top_features: set[str] = set()
    for feature in all_features[feature_indices]:
      if model.score([feature]) <= -1:
        break
      top_features.add(feature)
    if not top_features:
      top_features = set(all_features[feature_indices][:3])
    top_features_per_cluster[cluster] = top_features
  return top_features_per_cluster


BOW_MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
if BOW_MODEL_PATH.exists():
  with open(BOW_MODEL_PATH, "rb") as file:
    log_data_clusterizer = pickle.load(file)
else:
  with open(BOW_MODEL_PATH, "wb") as file:
    pickle.dump(log_data_clusterizer, file)

print(log_data_clusterizer)

log_message_types = log_data_clusterizer.predict(log_lines)
log_message_type_summary_map = get_log_message_type_summary_map(
  log_data_clusterizer, log_lines, log_message_types
)
text_sequence = [
  "\n".join([log_message_type_summary_map[t]
             for t in log_message_types])
]
anomalous_text_sequence = [
  "\n".join([
    f"{log_message_type_summary_map[p1]}\n"
    f"{log_message_type_summary_map[p2]}"
    for p1, p2 in itertools.permutations(np.unique(log_message_types), r=2)]
  )
]
print("\n\nLog message type-summary mapping")
print("-" * 80)
log_message_type_summary_map

## Text sequence analyzer

### Definitions
The anomaly detection model is defined here. This model is named `text_sequence_analyzer` and is of type `TextSequenceAnalyzer`. To keep both the preprocessing and the training logic together in the subsequently saved model, a `Preprocessor` class is nested inside the model class.
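In training mode, `Preprocessor._to_tf_dataset` cuts the token-id sequence into windows of `SUBSEQUENCE_LENGTH + 1` ids (dropping the remainder) and maps each window to an (input, target) pair shifted by one. A NumPy stand-in makes the shapes visible; the `shift_by_one_windows` function is hypothetical and leaves out the shuffling, batching and prefetching that `tf.data` adds.

```python
import numpy as np


def shift_by_one_windows(
  token_ids: list[int], subsequence_length: int
) -> tuple[np.ndarray, np.ndarray]:
  """Splits ids into (input, target) pairs where target is input shifted by one."""
  window = subsequence_length + 1
  usable = len(token_ids) // window * window  # Like drop_remainder=True.
  windows = np.asarray(token_ids[:usable]).reshape(-1, window)
  return windows[:, :-1], windows[:, 1:]


inputs, targets = shift_by_one_windows(list(range(10)), subsequence_length=3)
print(inputs.tolist())   # [[0, 1, 2], [4, 5, 6]]
print(targets.tolist())  # [[1, 2, 3], [5, 6, 7]]
```

Each target position holds the id that follows the corresponding input position, which is what the softmax output is trained to predict.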

`text_sequence_analyzer` is an end-to-end sequence analyzer that whitespace-tokenizes raw text inputs into sequences of token ids and predicts the next token id. The `detect_anomalies` method contains the anomaly detection logic: if a token id in the input sequence is not among the ids that the preceding step predicted with a probability of at least `NEXT_TOKEN_PROB_THRESHOLD`, `anomaly_detected_cb` is called with all data related to the anomaly.
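The thresholding rule of `detect_anomalies` can be sketched without TensorFlow. Here a toy bigram probability table, a hypothetical stand-in for the `RNN` outputs, gives the next-token distribution after each token; `find_anomalies` is likewise hypothetical and only mirrors the comparison against `NEXT_TOKEN_PROB_THRESHOLD`.

```python
import numpy as np

NEXT_TOKEN_PROB_THRESHOLD = 0.02  # Same value as in the configuration.


def find_anomalies(step_probs: np.ndarray, token_ids: list[int]) -> list[int]:
  """Returns the positions of tokens the preceding step did not predict.

  step_probs[i] is the predicted distribution of the token after token_ids[i].
  """
  anomalies = []
  for i in range(1, len(token_ids)):
    predicted = np.flatnonzero(
      step_probs[i - 1] >= NEXT_TOKEN_PROB_THRESHOLD
    ).tolist()
    if token_ids[i] not in predicted:
      anomalies.append(i)
  return anomalies


# Toy bigram table standing in for the RNN: row = previous token id.
table = np.array([
  [0.01, 0.90, 0.09],
  [0.50, 0.01, 0.49],
  [0.98, 0.01, 0.01],
])
token_ids = [0, 1, 1, 0, 0]
print(find_anomalies(table[token_ids], token_ids))  # [2, 4]
```

The transitions 1 → 1 and 0 → 0 have a probability of 0.01, below the threshold, so positions 2 and 4 are reported.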

If the `text_sequence_analyzer.keras` model file (`RNN_MODEL_PATH`) exists, the model is loaded from it instead of being built and compiled anew.

In [None]:
@register_keras_serializable(package="text_sequence_analysis")
class TextSequenceAnalyzer(Model):
  """RNN-based model that learns sequential data and detects anomalies.

  This is an end-to-end model capable of tokenizing raw textual inputs
  into sequences of token ids to predict the next token id.

  To keep both preprocessing and training logic together in the subsequently
  saved model, a `Preprocessor` class nested inside the model class is defined
  to wrap a `tensorflow.keras.layers.TextVectorization` instance.

  An input text sequence is learned by a combination of layers:
  - a `Preprocessor` layer to build a *vocabulary* from the model's inputs;
  - a `tensorflow.keras.layers.Embedding` layer with `EMBEDDING_DIM`
    output vector size;
  - a `tensorflow.keras.layers.GRU` sequence-to-sequence layer with `RNN_UNITS`
    number of units;
  - a `tensorflow.keras.layers.Dense` multinomial classification layer with
    number of units matching the vocabulary size.

  The `detect_anomalies` method contains the anomaly detection logic -
  if a token id part of the input sequence is not predicted based on
  `NEXT_TOKEN_PROB_THRESHOLD`, `anomaly_detected_cb` is called with all
  data related to the anomaly.

  This model is trained for multi-class next-token prediction via `softmax`
  activation, with `tensorflow.keras.losses.SparseCategoricalCrossentropy` as
  the loss and `tensorflow.keras.optimizers.Adam` as the optimizer.
  """

  @register_keras_serializable(package="text_sequence_analysis")
  class Preprocessor(Layer):
    """A preprocessing layer for `TextSequenceAnalyzer`.

    This layer has two processing modes, training and non-training, selected
    via the `for_training` parameter of the `preprocess` method, which
    implements both modes.

    In the for-training mode, the input text sequence is vectorized and split
    into a shuffled and batched training set and a batched validation set
    as per the `self._to_tf_dataset` method.

    In the alternative mode, the input text sequence is only vectorized.
    """

    def __init__(self, vocabulary: list[str]):
      super().__init__()
      self._text_vectorizer = TextVectorization(
        standardize=None, split=tf.strings.split,
        vocabulary=vocabulary,
      )

    @register_keras_serializable(package="text_sequence_analysis")
    def preprocess(
      self,
      text_sequence: list[str],
      for_training: bool = False,
      validation_split: int | float = VALIDATION_SPLIT,
    ) -> tuple[Dataset, Dataset] | Tensor:
      sequence: Tensor = self._text_vectorizer(text_sequence)[0]
      if for_training:
        if isinstance(validation_split, float):
          validation_split = int(validation_split * len(sequence))
        train_seq = sequence[:-validation_split]
        train_ds = self._to_tf_dataset(train_seq, shuffle=True)
        valid_seq = sequence[-validation_split:]
        valid_ds = self._to_tf_dataset(valid_seq, shuffle=False)
        return train_ds, valid_ds
      else:
        return sequence

    @register_keras_serializable(package="text_sequence_analysis")
    def get_vocabulary(self) -> list[str]:
      return self._text_vectorizer.get_vocabulary()  # type: ignore

    @register_keras_serializable(package="text_sequence_analysis")
    @classmethod
    def _to_tf_dataset(
      cls,
      sequence: Tensor,
      shuffle: bool = False,
      random_seed: int | None = None,
    ) -> Dataset:
      """Converts integers to a dataset of shift-by-one series in tuples.

      Args:
        sequence: A tensor of integers.
        shuffle: Whether to shuffle the (input, target) windows.
        random_seed: The random seed if `shuffle=True`.

      Returns:
        A dataset that is batched, can be shuffled, and is optimized
        for access.
      """
      dataset = Dataset.from_tensor_slices(sequence)
      dataset = dataset.batch(SUBSEQUENCE_LENGTH+1, drop_remainder=True)
      dataset = dataset.map(lambda seq: (seq[:-1], seq[1:]))
      if shuffle:
        dataset = dataset.shuffle(BUFFER_SIZE, seed=random_seed)
      dataset = dataset.batch(BATCH_SIZE, drop_remainder=False)
      dataset = dataset.prefetch(AUTOTUNE)
      return dataset


  @register_keras_serializable(package="text_sequence_analysis")
  @classmethod
  def create_preprocessor(
    cls,
    text_sequence: list[str],
  ) -> "TextSequenceAnalyzer.Preprocessor":
    text_vectorizer = TextVectorization(
      standardize=None, split=tf.strings.split
    )
    text_vectorizer.adapt(text_sequence)
    return TextSequenceAnalyzer.Preprocessor(
      text_vectorizer.get_vocabulary()
    )

  def __init__(
    self,
    preprocessor: "TextSequenceAnalyzer.Preprocessor",
    embedding_dim: int,
    rnn_units: int,
  ):
    super().__init__()
    vocab_size = len(preprocessor.get_vocabulary())
    self._preprocessor = preprocessor
    self._embedding_dim = embedding_dim
    self._embedding = Embedding(
      vocab_size, embedding_dim
    )
    self._rnn_units = rnn_units
    self._gru = GRU(
      rnn_units, return_sequences=True, return_state=True
    )
    self._dense = Dense(
      vocab_size, activation="softmax"
    )

  @override
  def call(  # type: ignore
    self,
    inputs: Tensor,
    training: bool = False,
    mask: Tensor | None = None,
    states: list[Any] | None = None,
    return_state: bool = False,
  ) -> tuple[Tensor, list[Any] | None] | Tensor:
    x = inputs
    x = self._embedding(x, training=training)
    if states is None:
      states = self._gru.get_initial_state(x)
    x, states = self._gru(x, initial_state=states, training=training)
    x = self._dense(x, training=training)
    if return_state:
      return x, states
    return x

  @register_keras_serializable(package="text_sequence_analysis")
  def detect_anomalies(
    self,
    text: list[str],
    anomaly_detected_cb: Callable[[list[str], list[int], int, list[int]],
                                  None],
  ) -> None:
    """Detects anomalies in input text and notifies about them via callback.

    The input text is tokenized and the tokens are passed one-by-one
    to the model to repeatedly obtain the next token prediction. In every
    iteration the last token must be a valid next token as per the model,
    i.e., with a probability of at least `NEXT_TOKEN_PROB_THRESHOLD`,
    otherwise an anomaly is detected and notified via `anomaly_detected_cb`.

    Args:
      text: Input text that is to be tokenized by `self.preprocessor`.
      anomaly_detected_cb: A callback to be called when anomaly is detected.
    """
    sequence: list[int] = []
    predictions: Tensor | None = None
    states: list[Any] | None = None
    predicted_token_ids: list[int] = []
    token_ids = (self._preprocessor.preprocess(text)
                 .numpy().ravel().tolist())
    for token_id in token_ids:
      sequence += [token_id]
      if predictions is not None and token_id not in predicted_token_ids:
        anomaly_detected_cb(self._preprocessor.get_vocabulary(), sequence,
                            token_id, predicted_token_ids)
      predictions, states = self.call(  # type: ignore
        inputs=constant([[token_id]]), states=states, return_state=True
      )
      predictions = predictions[:, -1][0]
      predicted_token_ids = tf.where(
        predictions >= NEXT_TOKEN_PROB_THRESHOLD
      ).numpy().ravel().tolist()

  @property
  def preprocessor(self) -> "TextSequenceAnalyzer.Preprocessor":
    return self._preprocessor

  @override
  def get_config(self) -> dict[str, Any]:
    config = super().get_config()
    config.update({
      "preprocessor": serialize_keras_object(
        self._preprocessor
      ),
      "embedding_dim": self._embedding_dim,
      "rnn_units": self._rnn_units,
    })
    return config

  @classmethod
  def from_config(
    cls,
    config: dict[str, Any],
    custom_objects=None,
  ) -> "TextSequenceAnalyzer":
    preprocessor = deserialize_keras_object(
      config["preprocessor"]
    )
    # Plain integers are stored as-is in the config and need no deserialization.
    embedding_dim = config["embedding_dim"]
    rnn_units = config["rnn_units"]
    return cls(preprocessor, embedding_dim, rnn_units)


if Path.exists(RNN_MODEL_PATH):
  text_sequence_analyzer = load_model(RNN_MODEL_PATH)
  assert text_sequence_analyzer is not None
  preprocessor = text_sequence_analyzer.preprocessor
else:
  preprocessor = TextSequenceAnalyzer.create_preprocessor(text_sequence)
  train_ds, valid_ds = preprocessor.preprocess(
    text_sequence, for_training=True
  )
  text_sequence_analyzer = TextSequenceAnalyzer(
    preprocessor=preprocessor,
    embedding_dim=EMBEDDING_DIM,
    rnn_units=RNN_UNITS,
  )
  text_sequence_analyzer.build(input_shape=(None, 1))
  text_sequence_analyzer.compile(loss="sparse_categorical_crossentropy",
                                 optimizer="adam")
  text_sequence_analyzer.summary()

### Training

The training process for `text_sequence_analyzer` is defined in the `train_sequence_analyzer` function. Training is skipped if a model has already been saved, i.e., if the `text_sequence_analyzer.keras` file exists. Otherwise, overfitting is mitigated by a `tensorflow.keras.callbacks.EarlyStopping` callback that monitors `val_loss` with a patience of `EARLY_STOP_PATIENCE` epochs and restores the best weights. The maximum number of training epochs is configured in `EPOCHS`.
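The patience rule can be illustrated with a simplified re-implementation, assuming `min_delta=0` (an epoch counts as an improvement only if its loss is strictly lower than the best so far). The `early_stopping` function and the loss values are hypothetical; this is not the Keras source. When training stops early, `restore_best_weights=True` makes the callback restore the weights of the best epoch.

```python
def early_stopping(val_losses: list[float], patience: int = 3) -> tuple[int, int]:
  """Returns (stop_epoch, best_epoch), both 0-based, for a patience-based rule."""
  best, best_epoch, wait = float("inf"), 0, 0
  for epoch, loss in enumerate(val_losses):
    if loss < best:
      best, best_epoch, wait = loss, epoch, 0
    else:
      wait += 1
      if wait >= patience:
        return epoch, best_epoch
  # No early stop: training ran for all epochs.
  return len(val_losses) - 1, best_epoch


print(early_stopping([1.0, 0.8, 0.7, 0.72, 0.71, 0.75, 0.6]))  # (5, 2)
```

Training stops after epoch 5, three epochs without improvement on 0.7, so the later 0.6 is never reached; a larger `EARLY_STOP_PATIENCE` trades training time for such late improvements.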

In [None]:
def train_sequence_analyzer(
  text_sequence_analyzer: TextSequenceAnalyzer,
  train_ds: Dataset,
  valid_ds: Dataset,
  callbacks: list[Callback],
) -> None:
  history = text_sequence_analyzer.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=EPOCHS,
    callbacks=callbacks,
  )
  plt.plot(history.history["val_loss"], "r--",
           label="val_loss")
  plt.xlabel("Epoch")
  plt.xlim([0, EPOCHS])
  plt.legend()
  plt.show()


early_stop_cb = EarlyStopping(
  monitor="val_loss",
  patience=EARLY_STOP_PATIENCE,
  restore_best_weights=True,
)

if not Path.exists(RNN_MODEL_PATH):
  train_sequence_analyzer(
    text_sequence_analyzer, train_ds, valid_ds,
    callbacks=[early_stop_cb],
  )
  RNN_MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
  text_sequence_analyzer.save(RNN_MODEL_PATH)

### Anomaly detection

In this section the `test_for_anomalies` function demonstrates anomaly detection.

It must be noted that *false positives* can occur in text generated by `generate_log` when `text_sequence_analyzer` is under-trained on otherwise valid sequences. Such *false positives* can also be provoked by setting a "high" `NEXT_TOKEN_PROB_THRESHOLD` in combination with "low" `EPOCHS`, again causing `text_sequence_analyzer` to have a high bias on `train_ds` and to mispredict the next token. Setting too small values for `LOG_SIZE`, `SUBSEQUENCE_LENGTH` or `EARLY_STOP_PATIENCE` can also lead to an under-trained model and *false positives*. On the other hand, an overfitting model can also produce *false positives*, because by focusing too much on certain log messages it assigns them high probabilities and leaves other valid next messages below the threshold.

Interestingly, in the above-mentioned cases, anomaly detection can serve as an indicator of an under-trained model.

Also important is the regular case of false positives: *normal* data that is new to the model and simply has to be learned. This case is demonstrated when the `detect_anomalies` method is called with `anomalous_text_sequence`, which is meant to contain `SKIPPED_LOG_SEQUENCE`-based subsequences that have not been learned by `text_sequence_analyzer`.

In [None]:
def on_anomaly_detected(
  vocabulary: list[str],
  sequence: list[int],
  token_id: int,
  predicted_token_ids: list[int],
) -> None:
  sequence_str = SEPARATOR.join([
    f"{i}.{vocabulary[n_token_id]}"
    for i, n_token_id in enumerate(sequence, start=1)
  ])
  token_id_str = vocabulary[token_id]
  predicted_token_ids_str = SEPARATOR.join([
    vocabulary[p_token_id] for p_token_id in predicted_token_ids
  ])
  print("ANOMALY DETECTED")
  print("-" * 80)
  print("SEQUENCE:")
  text_wrapper = TextWrapper()
  sequence_lines = text_wrapper.wrap(sequence_str)
  for line in sequence_lines:
    print(line)
  print("-" * 80)
  print(f"{token_id_str}\nNOT IN")
  predicted_token_ids_lines = text_wrapper.wrap(
    predicted_token_ids_str
  )
  for line in predicted_token_ids_lines:
    print(line)
  print("\n\n")


def test_for_anomalies(
  text_sequence_analyzer: TextSequenceAnalyzer,
  log_data_clusterizer: Pipeline,
  type_summary_map: dict[int, str],
  anomaly_detected_cb: Callable[[list[str], list[int], int, list[int]],
                                None],
) -> None:
  log_text = generate_log(log_gen, SUBSEQUENCE_LENGTH)
  log_lines = log_text[0].splitlines()
  log_message_types = log_data_clusterizer.predict(log_lines)
  text_sequence = [
    "\n".join([type_summary_map[t]
               for t in log_message_types])
  ]
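  # Text produced by generate_log should contain no anomalies
  # unless the model is under-trained.
  text_sequence_analyzer.detect_anomalies(
    text_sequence,
    anomaly_detected_cb,
  )
  # anomalous_text_sequence contains SKIPPED_LOG_SEQUENCE-based
  # subsequences, so anomalies are expected until they are learned.
  text_sequence_analyzer.detect_anomalies(
    anomalous_text_sequence,
    anomaly_detected_cb,
  )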


test_for_anomalies(
  text_sequence_analyzer, log_data_clusterizer,
  log_message_type_summary_map, on_anomaly_detected,
)

### Corrective action

`SKIPPED_LOG_SEQUENCE` is invalidated by setting it to *(-1, -1)*, so that newly generated logs include the previously skipped sequence. `text_sequence_analyzer` is then trained further on this new log, so that it can learn that such a sequence is not anomalous.
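The effect of invalidating `SKIPPED_LOG_SEQUENCE` can be sketched with a toy generator. `generate_message_types` is a hypothetical stand-in for `generate_log`, and it assumes that the skipped sequence is an inclusive *(start, end)* range of template indices; the notebook's actual generator may interpret the pair differently. The sketch lists the message-type transitions that only become available for training once skipping is disabled.

```python
def generate_message_types(
  templates: list[str],
  n_cycles: int,
  skipped: tuple[int, int] = (-1, -1),
) -> list[str]:
  """Hypothetical stand-in for generate_log.

  ASSUMPTION: `skipped` is an inclusive (start, end) range of template
  indices left out of every cycle. Template indices start at 0, so
  (-1, -1) matches nothing and therefore disables skipping.
  """
  start, end = skipped
  return [
    template
    for _ in range(n_cycles)
    for idx, template in enumerate(templates)
    if not start <= idx <= end
  ]


def transitions(sequence: list[str]) -> set[tuple[str, str]]:
  # Adjacent (previous, next) message-type pairs present in a log.
  return set(zip(sequence, sequence[1:]))


templates = ["connect", "auth", "retry", "send", "disconnect"]
old_log = generate_message_types(templates, n_cycles=3, skipped=(2, 2))
new_log = generate_message_types(templates, n_cycles=3)  # skipping disabled

# Transitions that only the new training data can teach the model.
print(sorted(transitions(new_log) - transitions(old_log)))
# [('auth', 'retry'), ('retry', 'send')]
```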

In [None]:
SKIPPED_LOG_SEQUENCE = (-1, -1)
new_log_text = generate_log(log_gen)
new_log_lines = new_log_text[0].splitlines()
new_log_message_types = log_data_clusterizer.predict(
  new_log_lines
)
new_text_sequence = [
  "\n".join([log_message_type_summary_map[t]
             for t in new_log_message_types])
]
new_train_ds, new_valid_ds = preprocessor.preprocess(
  new_text_sequence, for_training=True
)
train_sequence_analyzer(
  text_sequence_analyzer, new_train_ds, new_valid_ds,
  callbacks=[early_stop_cb],
)
test_for_anomalies(
  text_sequence_analyzer, log_data_clusterizer,
  log_message_type_summary_map, on_anomaly_detected,
)

## New log data

This section demonstrates how new log data can be interpreted using `log_message_type_summary_map`. Data that does not fit any known cluster can either be included when re-training the `log_data_clusterizer` and `text_sequence_analyzer` models or simply be filtered out.
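Applied to a whole batch of new lines, the same check splits the data into lines that belong to a known cluster and lines that are candidates for re-training or filtering. This is a minimal sketch: `partition_log_lines`, the summary patterns and the log lines are hypothetical, and, like the notebook cell that follows, it assumes each summary is a regular expression applied with `re.match`, which anchors only at the start of the line.

```python
import re


def partition_log_lines(
  lines: list[str], summary_map: dict[int, str]
) -> tuple[list[tuple[int, str]], list[str]]:
  """Split new log lines into (cluster, line) pairs and unmatched lines.

  ASSUMPTION: each value of `summary_map` is a regular expression; the
  first cluster whose pattern matches the start of a line wins.
  """
  compiled = {c: re.compile(p) for c, p in summary_map.items()}
  known: list[tuple[int, str]] = []
  unknown: list[str] = []
  for line in lines:
    for cluster, pattern in compiled.items():
      if pattern.match(line):
        known.append((cluster, line))
        break
    else:  # No cluster pattern matched this line.
      unknown.append(line)
  return known, unknown


# Hypothetical summaries and log lines, for illustration only.
summary_map = {0: r"User \w+ logged in", 1: r"Connection from [\d.]+ closed"}
lines = [
  "User alice logged in",
  "Connection from 10.0.0.7 closed",
  "Disk quota exceeded on /dev/sda1",
]
known, unknown = partition_log_lines(lines, summary_map)
print(known)
print(unknown)  # candidates for re-training, or simply filtered out
```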

In [None]:
test_log_message = new_log_lines[0]
for cluster, pattern in log_message_type_summary_map.items():
  if re.match(pattern, test_log_message) is not None:
    print(f"'{test_log_message}' is a known log message from "
          f"cluster {cluster}.")
    break
else:  # No cluster pattern matched the message.
  print(f"'{test_log_message}' is not a known log message.")
log_message_type_summary_map

## Reset global state

In [None]:
clear_session()

## References

<br><br>

### APA style for references
American Psychological Association. (2022). Creating an APA Style reference list guide. https://apastyle.apa.org/instructional-aids/creating-reference-list.pdf

American Psychological Association. (2024). APA Style common reference examples guide. https://apastyle.apa.org/instructional-aids/reference-examples.pdf

<br><br>

### A survey on the application of deep learning for anomaly detection in logs
Himler, P., Landauer, M., Skopik, F., & Wurzenberger, M. (2024). Anomaly detection in log-event sequences: A federated deep learning approach and open challenges. *Machine Learning with Applications*, *16*, 100554. https://doi.org/10.1016/j.mlwa.2024.100554

<br><br>

### DeepLog: Anomaly detection and diagnosis from system logs through deep learning
Du, M., Li, F., Zheng, G., & Srikumar, V. (2017, October). DeepLog: Anomaly detection and diagnosis from system logs through deep learning. In *Proceedings of the 2017 ACM SIGSAC Conference on Computer and Communications Security* (pp. 1285–1298). https://doi.org/10.1145/3133956.3134015

<br><br>

### Datasets
##### Comprehensive Network Logs Dataset for Multi-Device Analysis
Salman, M., & Hasan, R. (2024). Comprehensive Network Logs Dataset for Multi-Device Analysis (Version v1) [Data set]. Zenodo. https://doi.org/10.5281/zenodo.10492770

<br><br>

### Machine learning methods
##### Singular value decomposition
- [Singular value decomposition - Wikipedia](https://en.wikipedia.org/wiki/Singular_value_decomposition)
##### K-means clustering
- [k-means clustering - Wikipedia](https://en.wikipedia.org/wiki/K-means_clustering)
##### Recurrent neural network
- [Recurrent neural network - Wikipedia](https://en.wikipedia.org/wiki/Recurrent_neural_network)

<br><br>

### Machine learning models
##### Bag-of-words
- [Bag-of-words model - Wikipedia](https://en.wikipedia.org/wiki/Bag-of-words_model)

<br><br>

### GitHub repos
- [GitHub - ageron/handson-ml3: A series of Jupyter notebooks that walk you through the fundamentals of Machine Learning and Deep Learning in Python using Scikit-Learn, Keras and TensorFlow 2.](https://github.com/ageron/handson-ml3)
  - [handson-ml3/16_nlp_with_rnns_and_attention.ipynb at main · ageron/handson-ml3 · GitHub](https://github.com/ageron/handson-ml3/blob/main/16_nlp_with_rnns_and_attention.ipynb)

<br><br>

### Guides and tutorials
- [Clustering text documents using k-means — scikit-learn documentation](https://scikit-learn.org/stable/auto_examples/text/plot_document_clustering.html#k-means-clustering-on-text-features)
- [Save, serialize, and export models  |  TensorFlow Core](https://www.tensorflow.org/guide/keras/serialization_and_saving#custom_objects)
- [Text generation with an RNN  |  TensorFlow](https://www.tensorflow.org/text/tutorials/text_generation)
- [Working with preprocessing layers  |  TensorFlow Core](https://www.tensorflow.org/guide/keras/preprocessing_layers)

<br><br>

### Libraries
##### Keras
Chollet, F., & others. (2015). Keras. https://keras.io
- [Getting started with Keras](https://keras.io/getting_started/#tensorflow--keras-2-backwards-compatibility)
##### Matplotlib
Hunter, J. D. (2007). Matplotlib: A 2D graphics environment. *Computing in Science & Engineering*, *9*(3), 90–95. https://doi.org/10.1109/MCSE.2007.55
- [Quick start guide](https://matplotlib.org/stable/users/explain/quick_start.html)
##### NumPy
Harris, C. R., Millman, K. J., van der Walt, S. J., Gommers, R., Virtanen, P., Cournapeau, D., Wieser, E., Taylor, J., Berg, S., Smith, N. J., Kern, R., Picus, M., Hoyer, S., van Kerkwijk, M. H., Brett, M., Haldane, A., del Río, J. F., Wiebe, M., Peterson, P., ... Oliphant, T. E. (2020). Array programming with NumPy. *Nature*, *585*, 357–362. https://doi.org/10.1038/s41586-020-2649-2
- [What is NumPy?](https://numpy.org/doc/2.2/user/whatisnumpy.html)
##### Pandas
The pandas development team. (n.d.). pandas-dev/pandas: Pandas [Computer software]. Zenodo. https://doi.org/10.5281/zenodo.3509134
- [Getting started — pandas](https://pandas.pydata.org/docs/getting_started/index.html#intro-to-pandas)
##### Scikit-learn
Pedregosa, F., Varoquaux, G., Gramfort, A., Michel, V., Thirion, B., Grisel, O., Blondel, M., Prettenhofer, P., Weiss, R., Dubourg, V., Vanderplas, J., Passos, A., Cournapeau, D., Brucher, M., Perrot, M., & Duchesnay, É. (2011). Scikit-learn: Machine Learning in Python. *Journal of Machine Learning Research*, *12*, 2825–2830. https://jmlr.csail.mit.edu/papers/v12/pedregosa11a.html
- [Getting Started — scikit-learn](https://scikit-learn.org/stable/getting_started.html)
##### TensorFlow
Abadi, M., Agarwal, A., Barham, P., Brevdo, E., Chen, Z., Citro, C., Corrado, G. S., Davis, A., Dean, J., Devin, M., Ghemawat, S., Goodfellow, I., Harp, A., Irving, G., Isard, M., Jozefowicz, R., Jia, Y., Kaiser, L., Kudlur, M., ... Zheng, X. (2015). TensorFlow, Large-scale machine learning on heterogeneous systems [Computer software]. https://doi.org/10.5281/zenodo.4724125
- [Introduction to TensorFlow](https://www.tensorflow.org/learn)