# Automating the Cybersecurity Triage Process: A Comparative Study on the Performance of Large Language Models

Install required libraries, including OpenAI and Ollama

In [None]:
!pip install openai
!pip install ollama
!pip install pandas
!pip install openpyxl
!pip install matplotlib

Import libraries

In [1]:
import os
import time
import json
import re
import pandas as pd
import matplotlib.pyplot as plt
from statistics import median
from abc import abstractmethod, ABC
from typing import Any, Callable
from openai import AzureOpenAI
from ollama import Client

plt.style.use('tableau-colorblind10')

Define abstract base classes for language models and prompt generators, plus a simple prompt container.

In [2]:
class Prompt:
    """A prompt consisting of a system message and a user message."""

    def __init__(self, system: str, user: str):
        # Both parts are stored unchanged and read back as plain attributes.
        self.user = user
        self.system = system


class LanguageModel(ABC):
    """Common interface of the language-model backends used in this notebook.

    The class derives from ``ABC`` so that ``@abstractmethod`` is enforced:
    without an ``ABCMeta`` metaclass the decorator has no effect, and a subclass
    that forgets to implement ``generate`` would silently return ``None``.
    """

    def __init__(self, model_id: str):
        # Identifier of the concrete model, kept as a plain attribute for subclasses to use.
        self.model_id = model_id

    @abstractmethod
    def generate(self, prompt: Prompt) -> dict[str, Any]:
        """Run ``prompt`` on the model and return the result as a dictionary.

        Args:
            prompt: The system/user prompt pair to execute.

        Returns:
            A dictionary describing the outcome. Values are not only strings
            (the implementations also return token counts and timings), hence
            ``dict[str, Any]``.
        """
        pass


class PromptGenerator(ABC):
    """Interface for objects that turn an input value into a :class:`Prompt`.

    Derives from ``ABC`` so that the ``@abstractmethod`` markers are enforced;
    a subclass must implement all three methods before it can be instantiated.
    """

    @abstractmethod
    def generate(self, input_value: Any) -> Prompt:
        """Build the prompt for ``input_value``.

        ``input_value`` is annotated with ``typing.Any``; the builtin function
        ``any`` is not a type and must not be used as an annotation.
        """
        pass

    @abstractmethod
    def get_id(self) -> str:
        """Return the identifier of this prompt generator."""
        pass

    @abstractmethod
    def get_field(self) -> str:
        """Return the name of the field to read from the model's JSON response."""
        pass


class JsonPromptGenerator(PromptGenerator, ABC):
    """Prompt generator backed by a configuration dictionary.

    The dictionary must provide the keys ``'id'`` and ``'field'``; ``generate``
    is left to concrete subclasses.
    """

    def __init__(self, data: dict[str, str]):
        # Keep the raw configuration; subclasses read further keys from it.
        self.data = data

    def get_id(self) -> str:
        """Return the ``'id'`` entry of the configuration."""
        identifier = self.data['id']
        return identifier

    def get_field(self) -> str:
        """Return the ``'field'`` entry of the configuration."""
        field_name = self.data['field']
        return field_name


def load_json(path: str) -> dict[str, Any]:
    """Read the JSON document stored at ``path`` and return the parsed object.

    The file is opened explicitly as UTF-8 so that non-ASCII prompt text is
    decoded identically on every platform instead of depending on the locale's
    default encoding.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

Implement OpenAI Language Model.

**Note:** This requires environment variables to be set.

In [3]:
# Azure OpenAI settings, read from environment variables.
# os.getenv returns None for a variable that is not set, hence the optional types.
OPENAI_KEY: str | None = os.getenv("OPENAI_KEY")  # API key
OPENAI_ENDPOINT: str | None = os.getenv("OPENAI_ENDPOINT")  # Host URL
OPENAI_DEPLOYMENT: str | None = os.getenv("OPENAI_DEPLOYMENT")  # Model ID


class OpenAILanguageModel(LanguageModel):
    """Language model served through an Azure OpenAI chat-completions deployment.

    ``model_id`` is sent as the ``model`` argument of every request.
    """

    # One client shared by all instances. It is created when the class body runs,
    # so the OPENAI_* settings must already be defined at that point.
    client = AzureOpenAI(azure_endpoint=OPENAI_ENDPOINT, api_key=OPENAI_KEY, api_version="2024-02-15-preview")

    def generate(self, prompt: Prompt) -> dict[str, Any]:
        """Send ``prompt`` to the deployment and return the reply with usage data.

        Args:
            prompt: System and user message to send; a JSON object is requested
                as the response format.

        Returns:
            On success a dict with ``response`` (message text of the first
            choice), ``in_tokens``, ``out_tokens`` and ``time`` (seconds elapsed
            around the request). If any exception occurs, a dict with the single
            key ``error`` holding the exception text; the exception is not
            re-raised.
        """
        try:
            # perf_counter() is monotonic, so the measured latency cannot be
            # distorted by system clock adjustments (unlike time.time()).
            started = time.perf_counter()
            response = OpenAILanguageModel.client.chat.completions.create(
                model=self.model_id,
                messages=[
                    {"role": "system", "content": prompt.system},
                    {"role": "user", "content": prompt.user},
                ],
                response_format={"type": "json_object"}
            )
            elapsed = time.perf_counter() - started

            return {'response': response.choices[0].message.content,
                    'in_tokens': response.usage.prompt_tokens,
                    'out_tokens': response.usage.completion_tokens,
                    'time': elapsed}

        except Exception as e:
            # Deliberate best effort: report the failure as data instead of raising.
            return {'error': str(e)}

Implement Ollama Language Model.

**Note:** This requires an Ollama server to be running and reachable at the address given by `OLLAMA_HOST`.
Start it by executing `ollama serve`.

In [4]:
# Address of the Ollama server that the client below connects to.
OLLAMA_HOST = 'http://localhost:11434'


class OllamaLanguageModel(LanguageModel):
    """Language model served by an Ollama server.

    ``model_id`` is sent as the ``model`` argument of every request.
    """

    # One client shared by all instances, created when the class body runs.
    client = Client(host=OLLAMA_HOST)

    def generate(self, prompt: Prompt) -> dict[str, Any]:
        """Run ``prompt`` on the Ollama model and return the server's result.

        Args:
            prompt: System and user text; JSON output is requested and
                streaming is disabled.

        Returns:
            The result returned by the client with an added ``time`` entry in
            seconds. If any exception occurs, a dict with the single key
            ``error`` holding the exception text; the exception is not
            re-raised.
        """
        try:
            # perf_counter() is monotonic, so the fallback timing below cannot be
            # distorted by system clock adjustments (unlike time.time()).
            started = time.perf_counter()
            result = OllamaLanguageModel.client \
                .generate(model=self.model_id,
                          system=prompt.system,
                          prompt=prompt.user,
                          format='json',
                          stream=False)
            # Work around a bug where the repeated-token limit is reached and the
            # output is aborted without being marked as done: in that case fall
            # back to the locally measured duration.
            if result['done']:
                # total_duration is divided by 1e9 to convert it to seconds
                # (Ollama reports durations in nanoseconds).
                result['time'] = result['total_duration'] / 1e9
            else:
                result['time'] = time.perf_counter() - started
            return result

        except Exception as e:
            # Deliberate best effort: report the failure as data instead of raising.
            return {'error': str(e)}

Create model clients.

**Note:** This script assumes that the Ollama models have already been pulled.

In [5]:
# Registry of the models under evaluation, keyed by a short alias.
# The trailing comment on each entry notes the model size (number of parameters).
models: dict[str, LanguageModel] = {
    'llama3': OllamaLanguageModel('llama3:8b'),  # 8b
    'phi3': OllamaLanguageModel('phi3:14b'),  # 14b
    'phi3-mini': OllamaLanguageModel('phi3:3.8b'),  # 3.8b
    'aya23': OllamaLanguageModel('aya:8b'),  # 8b
    'mistral': OllamaLanguageModel('mistral:7b'),  # 7b
    'codellama': OllamaLanguageModel('codellama:13b'),  # 13b (the requested tag is 13b; this comment previously said 7b)
    'gemma': OllamaLanguageModel('gemma:7b'),  # 7b
    'gemma-mini': OllamaLanguageModel('gemma:2b'),  # 2b
    'gpt4': OpenAILanguageModel(OPENAI_DEPLOYMENT),  # 1760b -- NOTE(review): size cannot be verified from this file; confirm source
}

# Display names used when relabelling result tables: the backend model id for
# every entry, except GPT-4, whose model id is the deployment name from the environment.
model_names = {k: m.model_id for k, m in models.items()}
model_names['gpt4'] = 'GPT-4'

Set up the evaluation framework.

In [17]:
def parse_model_response(response: str, field: str) -> Any | None:
    try:
        return json.loads(response)[field]
    except (TypeError, KeyError) as _:
        return None


def execute_all_on_model(model: LanguageModel, prompts: list[Prompt], delay: int = 0) -> list[dict[str, str]]:
    """Run every prompt on ``model`` in order and collect the raw outputs.

    Args:
        model: Model to execute the prompts on.
        prompts: Prompts to execute, in order.
        delay: Seconds to sleep before every prompt except the first.

    Returns:
        One output dictionary per prompt, in prompt order.
    """
    outputs = []
    for position, prompt in enumerate(prompts):
        # Pause between consecutive requests, but not before the very first one.
        if position > 0:
            time.sleep(delay)
        outputs.append(model.generate(prompt))
    return outputs


def generate_prompts(prompt_generator: PromptGenerator, input_values: list[str]) -> list[Prompt]:
    """Build one prompt per input value using ``prompt_generator``.

    The returned list has the same order as ``input_values``.
    """
    prompts = []
    for value in input_values:
        prompts.append(prompt_generator.generate(value))
    return prompts


def evaluate_model_outputs(predicted: list[bool], actual: list[bool]) -> dict[str, int]:
    """Count confusion-matrix cells for paired binary predictions and labels.

    Pairs are formed positionally with ``zip``; values are interpreted by their
    truthiness.

    Returns:
        A dict with the counts ``tp``, ``tn``, ``fp`` and ``fn``.
    """
    # Map (prediction, label) truthiness to the confusion-matrix cell it belongs to.
    cell_of = {
        (True, True): 'tp',
        (True, False): 'fp',
        (False, True): 'fn',
        (False, False): 'tn',
    }
    counts = {'tp': 0, 'tn': 0, 'fp': 0, 'fn': 0}
    for guess, truth in zip(predicted, actual):
        counts[cell_of[bool(guess), bool(truth)]] += 1
    return counts


def evaluate_model_outputs_eq(predicted: list[Any], actual: list[Any]) -> dict[str, int]:
    """Count how many predictions match their expected value.

    Pairs are formed positionally with ``zip``. Matching rules:

    * string prediction vs. a collection (``list``, ``set`` or ``tuple``) of
      accepted values: a hit if the prediction equals any member, ignoring case;
    * string prediction vs. any other value: a hit if both are equal, ignoring
      case for strings (a non-string expected value simply does not match);
    * non-string prediction: plain ``==`` comparison.

    Both sides are lower-cased, so labels containing capital letters can still
    be matched, and no per-pair debug output is printed.

    Returns:
        A dict with ``t`` (matches) and ``f`` (mismatches).
    """

    def _fold(value: Any) -> Any:
        # Lower-case strings so comparisons ignore case; leave other values untouched.
        return value.lower() if isinstance(value, str) else value

    t, f = 0, 0
    for guess, truth in zip(predicted, actual):
        if isinstance(guess, str):
            if isinstance(truth, (set, list, tuple)):
                hit = _fold(guess) in [_fold(option) for option in truth]
            else:
                hit = _fold(guess) == _fold(truth)
        else:
            hit = guess == truth
        if hit:
            t += 1
        else:
            f += 1
    return {'t': t, 'f': f}


def get_evaluation_statistics(tp: int, tn: int, fp: int, fn: int) -> dict[str, float]:
    """Derive accuracy, precision, recall and F1 from confusion-matrix counts.

    Every metric whose denominator would be zero is reported as ``0.0``;
    precision, recall and F1 are all ``0.0`` when there are no true positives.
    """
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total != 0 else 0.0

    if tp == 0:
        # Without true positives all three positive-class metrics are zero.
        precision = recall = f1 = 0.0
    else:
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1 = 2 * precision * recall / (precision + recall)

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }


def get_evaluation_statistics_eq(t: int, f: int) -> dict[str, float]:
    """Compute accuracy from the number of matches ``t`` and mismatches ``f``.

    Accuracy is ``0.0`` when there are no matches (this also covers the case of
    no samples at all).
    """
    if t == 0:
        return {'accuracy': 0.0}
    return {'accuracy': t / (t + f)}


def get_model_responses(outputs: list[dict[str, str]]) -> list[str]:
    """Collect the ``'response'`` entry of every output that has one.

    Outputs without a ``'response'`` key are skipped, so the result may be
    shorter than ``outputs``.
    """
    responses = []
    for output in outputs:
        if 'response' in output:
            responses.append(output['response'])
    return responses


def _evaluate(model: LanguageModel,
              prompts: list[Prompt],
              classifications: list[Any],
              field: str,
              evaluation_provider: Callable[[list[Any], list[Any]], dict[str, int]],
              statistics_provider: Callable[..., dict[str, float]],
              delay: int = 0) -> dict[str, float]:
    """Run ``prompts`` on ``model`` and score the parsed answers.

    Args:
        model: Model to evaluate.
        prompts: Prompts to execute; ``classifications[i]`` is the expected
            value for ``prompts[i]``.
        classifications: Expected values, aligned with ``prompts``.
        field: Key to read from each JSON response.
        evaluation_provider: Turns (predictions, expected values) into counts.
        statistics_provider: Receives those counts as keyword arguments and
            returns the derived metrics.
        delay: Seconds to wait between consecutive prompts.

    Returns:
        The metrics merged with the raw counts, plus ``time`` (median of the
        recorded response times, ``nan`` if none was recorded) and ``errors``
        (number of prompts without a usable prediction: the request failed, the
        response could not be parsed, or ``field`` was missing).
    """
    outputs: list[dict[str, str]] = execute_all_on_model(model, prompts, delay)

    # median() raises on an empty sequence, e.g. when every request failed.
    timings = [o['time'] for o in outputs if 'time' in o]
    median_time = median(timings) if timings else float('nan')

    # Keep every prediction paired with the label of the prompt that produced it.
    # Filtering unusable outputs first and zipping afterwards would shift all
    # later predictions onto the wrong labels.
    predictions: list[Any] = []
    expected: list[Any] = []
    errors = 0
    for output, label in zip(outputs, classifications):
        parsed = parse_model_response(output['response'], field) if 'response' in output else None
        if parsed is None:
            errors += 1
            continue
        predictions.append(parsed)
        expected.append(label)

    evaluation = evaluation_provider(predictions, expected)
    statistics = statistics_provider(**evaluation)
    statistics['time'] = median_time
    statistics['errors'] = errors
    return statistics | evaluation  # join dicts


def evaluate(model: LanguageModel,
             prompts: list[Prompt],
             classifications: list[bool],
             field: str,
             delay: int = 0) -> dict[str, float]:
    """Evaluate a binary-classification task.

    Delegates to ``_evaluate`` with confusion-matrix counting
    (``evaluate_model_outputs``) and the accuracy/precision/recall/F1 metrics
    (``get_evaluation_statistics``).
    """
    return _evaluate(
        model=model,
        prompts=prompts,
        classifications=classifications,
        field=field,
        evaluation_provider=evaluate_model_outputs,
        statistics_provider=get_evaluation_statistics,
        delay=delay,
    )


def evaluate_eq(model: LanguageModel,
                prompts: list[Prompt],
                classifications: list[str],
                field: str,
                delay: int = 0) -> dict[str, float]:
    """Evaluate a task scored by equality/membership of the predicted value.

    Delegates to ``_evaluate`` with match counting
    (``evaluate_model_outputs_eq``) and the accuracy metric
    (``get_evaluation_statistics_eq``).
    """
    return _evaluate(
        model=model,
        prompts=prompts,
        classifications=classifications,
        field=field,
        evaluation_provider=evaluate_model_outputs_eq,
        statistics_provider=get_evaluation_statistics_eq,
        delay=delay,
    )


def evaluate_all(language_models: dict[str, LanguageModel],
                 prompt_generators: dict[str, PromptGenerator],
                 dataset: tuple[list[str], list[Any]],
                 evaluator: Callable[[LanguageModel, list[Prompt], list[Any], str, int], dict[str, float]] = evaluate,
                 delay: int = 0) -> dict[str, dict[str, dict[str, float]]]:
    """Evaluate every prompt generator on every language model.

    Args:
        language_models: Models to evaluate, keyed by model alias.
        prompt_generators: Prompt generators to evaluate, keyed by prompt id.
        dataset: ``dataset[0]`` holds the input values, ``dataset[1]`` the
            expected values.
        evaluator: Function that scores one model on one list of prompts.
        delay: Seconds to wait between prompt runs of the same model; it is also
            forwarded to ``evaluator``.

    Returns:
        ``result[model alias][prompt id]`` is the dictionary returned by
        ``evaluator``. Progress is printed while running.
    """
    # All prompts are generated up front, once per generator, and reused for every model.
    prompts_dict: dict[str, list[Prompt]] = {}
    for key, generator in prompt_generators.items():
        prompts_dict[key] = generate_prompts(generator, dataset[0])

    nested: dict[str, dict[str, dict[str, float]]] = {}

    for model_id, model in language_models.items():
        print(model_id)
        per_prompt = {}
        nested[model_id] = per_prompt
        for run_index, (prompt_id, prompts) in enumerate(prompts_dict.items()):
            print('\t' + prompt_id)
            if run_index > 0:  # wait between prompt runs
                time.sleep(delay)
            evaluation = evaluator(model, prompts, dataset[1], prompt_generators[prompt_id].get_field(), delay)
            print('\t\t' + str(evaluation))
            per_prompt[prompt_id] = evaluation

    return nested


def transform_evaluation(nested: dict[str, dict[str, dict[str, float]]]) -> pd.DataFrame:
    """Flatten nested evaluation results into a two-level indexed DataFrame.

    Args:
        nested: ``nested[outer key][inner key]`` is a dictionary of metrics.

    Returns:
        A DataFrame indexed by (outer key, inner key) with one column per metric.
    """
    # Rows are the outer keys, columns the inner keys; each cell holds one metrics dict.
    wide = pd.DataFrame.from_dict(nested, orient='index')
    # Stacking moves the inner keys into a second index level.
    stacked = wide.stack()
    # Expand every metrics dict into its own columns, keeping the two-level index.
    return pd.DataFrame(stacked.values.tolist(), index=stacked.index)


## Detect Email Announcements

In [7]:
class DetectEmailPrompt(JsonPromptGenerator):
    """Prompt generator that appends an email to a fixed user instruction."""

    def generate(self, email: str) -> Prompt:
        """Build the prompt for ``email``.

        The system message is the configured ``'system'`` text; the user message
        is the configured ``'user'`` text followed by a newline and the email.
        """
        system_text = self.data['system']
        user_text = self.data['user'] + '\n' + email
        return Prompt(system_text, user_text)


# One prompt generator per entry of the JSON file, keyed by the entry's key.
# Each value is used as the configuration dict of a DetectEmailPrompt.
detect_announcement_prompts: dict[str, PromptGenerator] = {
    key: DetectEmailPrompt(value)
    for key, value in load_json('data/detect_announcement_prompts.json').items()
}

Get announcement email dataset.

In [None]:
# Load the announcement emails from the Excel sheet.
announcement_emails = pd.read_excel('data/announcement_emails.xlsx')
# Turn the comma-separated 'tactic' string of each row into a list of tactic names
# (the separator is a comma optionally followed by one space).
announcement_emails['tactic'] = announcement_emails['tactic'].apply(lambda t: re.split(', ?', t))
announcement_emails

Get normal emails from Enron dataset.

In [None]:
# Read the first 500 rows of the Enron dataset and keep only the message text as 'email'.
normal_emails = pd.read_csv('data/enron.csv', nrows=500) \
    .rename(columns={'Message': 'email'})[['email']]
normal_emails_sizes = normal_emails['email'].map(len)  # get email sizes
# Keep emails with more than 100 and fewer than 500 characters, drop forwarded
# messages (text starting with ten dashes), then draw 20 of the remaining emails.
# The fixed random_state makes the draw identical on every run; without it each
# execution evaluates the models on a different set of emails.
normal_emails = (normal_emails[(normal_emails_sizes > 100) & (normal_emails_sizes < 500)]
                 .where(lambda x: ~x['email'].str.startswith('-' * 10))  # filter forwards
                 .dropna()
                 .sample(20, random_state=42))  # filter by email size and select 20
normal_emails['is_announcement'] = False
normal_emails

Concatenate the announcement and non-announcement datasets.

In [None]:
# Combine both datasets and shuffle the rows. The fixed random_state keeps the
# order identical across runs, so results stay reproducible.
emails = pd.concat([announcement_emails, normal_emails]).sample(frac=1, random_state=42).reset_index(drop=True)
emails

Split the data into the model inputs (email bodies) and the expected classifications.

In [253]:
# Model inputs: the email texts, in the row order of `emails`.
email_bodies: list[str] = [*emails['email'].values]
# Expected labels, aligned with `email_bodies`.
is_actual_announcement: list[bool] = [*emails['is_announcement'].values]
# (inputs, expected values) pair in the layout expected by evaluate_all.
detect_announcement_dataset = (email_bodies, is_actual_announcement)
# Number of emails; used later as the denominator of the error rate.
detect_announcement_dataset_size = len(emails)

Run all prompts on all models.

In [134]:
# Evaluate every model except GPT-4 on all announcement-detection prompts.
# GPT-4 is evaluated separately so that a delay can be applied between its requests.
announcement_detection_evaluation_dict = evaluate_all(
    {i: models[i] for i in models if i != 'gpt4'},
    detect_announcement_prompts,
    detect_announcement_dataset
)

In [138]:
# Evaluate GPT-4 on the same prompts and dataset, throttled by `delay`.
announcement_detection_evaluation_dict_gpt4 = evaluate_all(
    {'gpt4': models['gpt4']},
    detect_announcement_prompts,
    detect_announcement_dataset,
    delay=7  # seconds delay between prompts to prevent timeout or token limit
)

Transform output into dataframe.

In [246]:
# Merge the results of both runs and flatten them into one table.
announcement_detection_evaluation = transform_evaluation(
    announcement_detection_evaluation_dict | announcement_detection_evaluation_dict_gpt4
)
# Alternative when no GPT-4 results are available:
# announcement_detection_evaluation = transform_evaluation(announcement_detection_evaluation_dict)
# Name the two index levels and replace the model aliases by their display names.
announcement_detection_evaluation.index = announcement_detection_evaluation.index.rename(['Model', 'Prompt'])
announcement_detection_evaluation = announcement_detection_evaluation.rename(index=model_names)
announcement_detection_evaluation

Export the most important metrics as a LaTeX table.

In [254]:
# Keep the F1-score and the median response time ...
df = announcement_detection_evaluation[['f1', 'time']].copy()
# ... and add the error count relative to the dataset size.
df['error_rate'] = announcement_detection_evaluation['errors'] / detect_announcement_dataset_size
df.columns.name = 'Metric'
# Move the metrics into the index so the table can be re-pivoted below.
s = df.stack()
s.name = 'Value'
# Reorder and unstack the index levels to lay out the table, then print it as LaTeX
# with three decimal places.
print(s.to_frame().reorder_levels(['Model', 'Metric', 'Prompt']).unstack(level=-1).unstack().to_latex(
    float_format='%.3f'))

In [None]:
# Horizontal bar chart of the F1-score per model, one bar per prompt.
announcement_detection_f1 = announcement_detection_evaluation['f1']
# Models are sorted by their score on the first prompt of detect_announcement_prompts.
ad_f1_axes = announcement_detection_f1.unstack() \
    .sort_values(by=next(iter(detect_announcement_prompts.keys())), ascending=False) \
    .plot(kind='barh', title='Announcement Detection Evaluation', xlabel='F1-score')
ad_f1_axes

In [165]:
# Save the chart as a PDF; bbox_inches='tight' trims surrounding whitespace.
ad_f1_axes.get_figure().savefig('data/announcement_detection_f1.pdf', format='pdf', bbox_inches='tight')

## Tactic Detection

In [9]:
# One prompt generator per entry of the JSON file, keyed by the entry's key.
# Each value is used as the configuration dict of a DetectEmailPrompt.
detect_tactic_prompts: dict[str, PromptGenerator] = {
    key: DetectEmailPrompt(value)
    for key, value in load_json('data/detect_tactic_prompts.json').items()
}

In [10]:
# Only the announcement emails carry tactic labels, so they alone form this dataset.
tactic_email_bodies: list[str] = [*announcement_emails['email'].values]
# Each entry is the list of tactic names produced by splitting the 'tactic' column.
tactics: list[list[str]] = [*announcement_emails['tactic'].values]
# (inputs, expected values) pair in the layout expected by evaluate_all.
detect_tactic_dataset = (tactic_email_bodies, tactics)
detect_tactic_dataset_size = len(announcement_emails)

In [19]:
# Evaluate every model except GPT-4 on all tactic-detection prompts.
# evaluate_eq scores a prediction by equality/membership instead of a confusion matrix.
tactic_detection_evaluation_dict = evaluate_all(
    {k: m for k, m in models.items() if k != 'gpt4'},
    detect_tactic_prompts,
    detect_tactic_dataset,
    evaluator=evaluate_eq
)

In [18]:
# Evaluate GPT-4 on the same prompts and dataset, waiting 5 seconds between requests.
tactic_detection_evaluation_dict_gpt4 = evaluate_all(
    {'gpt4': models['gpt4']},
    detect_tactic_prompts,
    detect_tactic_dataset,
    evaluator=evaluate_eq,
    delay=5
)

In [20]:
# Merge the results of both runs and flatten them into one table.
tactic_detection_evaluation = transform_evaluation(
    tactic_detection_evaluation_dict | tactic_detection_evaluation_dict_gpt4
)
# Name the two index levels and replace the model aliases by their display names.
tactic_detection_evaluation.index = tactic_detection_evaluation.index.rename(['Model', 'Prompt'])
tactic_detection_evaluation = tactic_detection_evaluation.rename(index=model_names)
tactic_detection_evaluation

In [22]:
# Keep the accuracy and the median response time for the LaTeX table.
df = tactic_detection_evaluation[['accuracy', 'time']].copy()
# The error rate is currently not exported for this task; re-enable the next line to include it.
# df['error_rate'] = tactic_detection_evaluation['errors'] / detect_tactic_dataset_size
df.columns.name = 'Metric'
# Move the metrics into the index so the table can be re-pivoted below.
s = df.stack()
s.name = 'Value'
# Reorder and unstack the index levels to lay out the table, then print it as LaTeX
# with three decimal places.
print(s.to_frame().reorder_levels(['Model', 'Metric', 'Prompt'])
      .unstack(level=-1)
      .unstack()
      .to_latex(float_format='%.3f'))