# Dataset Import Experiment

Author: Luca Cotti (<luca.cotti@unibs.it>)

This notebook contains an experiment for importing the "russellmitchell" dataset from the AIT-LDS repository.


## Setup

In [None]:
import json
import logging
import os
import re
import shutil
from pathlib import Path

import pandas as pd

# --- Input -----------------------------------------------------------------
# Root directory of the dataset to import
DATASET_PATH = "../ait_datasets/russellmitchell"

# Directories whose name matches one of these patterns are not imported:
# they only hold the logs of the user/attacker behavior generation
LOG_DIRS_TO_IGNORE = [
    "attacker",
    "ext_user",
    "internal_employee",
    "remote_employee",
]

# Upper bound on the number of lines read from each log file
TRUNC_LINES = 10_000

# --- Output ----------------------------------------------------------------
# Directory that receives the processed csv files
OUT_DIR = "./out"

# When True, an already existing output directory is removed and re-created
DELETE_OUT_IF_EXISTS = True


# --- Logging ---------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# `namedtuple` is provided by `collections`; `typing` only exports the
# class-based `NamedTuple`, so importing `namedtuple` from it fails.
from collections import namedtuple

# A tactic name paired with the list of technique names attached to it
ttp = namedtuple("TTP", ["tactic", "techniques"])

# Map the log files labels to TTPs, using the description provided in the paper.
# Keys are the `str()` of the label list exactly as it appears in the labels
# files, so the same set of labels in a different order needs its own entry.
LABELS_TO_TTPS: dict[str, ttp] = {
    "['attacker_vpn', 'foothold']": ttp("Initial Access", ["Valid Accounts"]),
    "['service_scan', 'foothold']": ttp(
        "Reconnaissance",
        ["Active Scanning", "Gather Victim Network Information"],
    ),
    "['attacker_http', 'foothold', 'service_scan']": ttp(
        "Reconnaissance",
        ["Active Scanning", "Gather Victim Network Information"],
    ),
    "['traceroute', 'foothold']": ttp(
        "Reconnaissance",
        ["Active Scanning", "Gather Victim Network Information"],
    ),
    "['attacker_http', 'foothold', 'dirb']": ttp(
        "Reconnaissance",
        ["Active Scanning", "Gather Victim Host Information"],
    ),
    "['attacker_http', 'foothold', 'wpscan']": ttp(
        "Reconnaissance",
        ["Active Scanning", "Gather Victim Host Information"],
    ),
    # NOTE(review): the two technique names below look like several names run
    # together ("... Execution" + "Persistence", "... Component" + "Discovery")
    # - confirm against the paper before relying on them.
    "['attacker_http', 'foothold', 'webshell_upload']": ttp(
        "Execution",
        [
            "Exploitation for Client Execution Persistence",
            "Server Software Component Discovery",
        ],
    ),
    "['attacker_http', 'foothold', 'webshell_cmd']": ttp(
        "Persistence",
        ["Server Software Component Discovery"],
    ),
    "['webshell_cmd', 'escalate']": ttp("Credential Access", ["OS Credential Dumping"]),
    "['escalate', 'crack_passwords']": ttp(
        "Credential Access",
        ["Brute Force: Password Cracking"],
    ),
    "['attacker_change_user', 'escalate']": ttp("Privilege Escalation", ["Valid Accounts"]),
    "['attacker_change_user', 'escalate', 'escalated_command', 'escalated_sudo_command']": ttp(
        "Privilege Escalation",
        ["Valid Accounts"],
    ),
    "['escalated_command', 'escalated_sudo_command', 'escalate']": ttp(
        "Execution",
        ["Command and Scripting Interpreter"],
    ),
    "['escalated_command', 'escalated_sudo_command', 'escalate', 'escalated_sudo_session']": ttp(
        "Execution",
        ["Command and Scripting Interpreter"],
    ),
    "['escalated_command', 'escalated_sudo_command', 'escalated_sudo_session', 'escalate']": ttp(
        "Execution",
        ["Command and Scripting Interpreter"],
    ),
    "['dnsteal', 'exfiltration-service', 'attacker']": ttp(
        "Exfiltration",
        ["Exfiltration Over Alternative Protocol"],
    ),
    "['dnsteal', 'attacker', 'dnsteal-received']": ttp(
        "Exfiltration",
        ["Exfiltration Over Alternative Protocol"],
    ),
    "['dnsteal', 'attacker', 'dnsteal-dropped']": ttp(
        "Exfiltration",
        ["Exfiltration Over Alternative Protocol"],
    ),
}

## Load dataset

In [None]:
# Raw logs are read from <dataset>/gather, their labels from <dataset>/labels
logs_subdir_path = Path(DATASET_PATH) / "gather"
labels_subdir_path = Path(DATASET_PATH) / "labels"

# Names of all the directories found directly inside the logs subdir
logs_dirs = [
    name
    for name in os.listdir(logs_subdir_path)
    if logs_subdir_path.joinpath(name).is_dir()
]

# Keep only the directories whose name matches none of the LOG_DIRS_TO_IGNORE patterns
devices = [
    name
    for name in logs_dirs
    if all(re.search(pattern, name) is None for pattern in LOG_DIRS_TO_IGNORE)
]


def read_log_and_labels(device: str, app: str, file_name: str) -> pd.DataFrame:
    """Read one log file together with its labels, if a labels file exists.

    The log is read from ``logs_subdir_path / device / "logs" / app / file_name``
    and the labels from the same relative path under ``labels_subdir_path``.
    At most ``TRUNC_LINES`` log lines are read; each line is stripped of
    leading and trailing whitespace.

    Args:
        device: Name of the device directory.
        app: Path of the application directory relative to the device's
            ``logs`` directory; empty for logs stored directly in it.
        file_name: Name of the log file.

    Returns:
        A dataframe with one row per log line read and the columns
        ``line_number`` (0-based), ``text``, ``tactic`` and ``techniques``.
        The last two are empty strings for unlabeled lines. All four columns
        are present even when the log file is empty.

    Raises:
        UnicodeDecodeError: If the file cannot be decoded as text.
        KeyError: If a label combination is missing from ``LABELS_TO_TTPS``.
        json.JSONDecodeError: If a non-blank labels line is not valid JSON.
    """
    file_path = logs_subdir_path / device / "logs" / app / file_name

    texts = []
    with file_path.open() as file:
        for i, line in enumerate(file):
            if i >= TRUNC_LINES:
                break

            texts.append(line.strip())

    n_lines = len(texts)

    # Labels are collected in plain lists and turned into columns once at the
    # end, instead of scanning the whole dataframe for every single label.
    tactics = [""] * n_lines
    techniques_column = [""] * n_lines

    labels_path = labels_subdir_path / device / "logs" / app / file_name

    if labels_path.exists():
        with labels_path.open() as file:
            for raw_label in file:
                raw_label = raw_label.strip()

                # Tolerate blank lines (e.g. a trailing newline) in the labels file
                if not raw_label:
                    continue

                label_data = json.loads(raw_label)
                labeled_line = label_data["line"]

                # If the line number is greater than the number of lines in the log file,
                # stop reading the labels file. This may happen if the log file was truncated.
                # Assumes the labels file is sorted by ascending line number - TODO confirm.
                if labeled_line >= n_lines:
                    break

                tactic, techniques = LABELS_TO_TTPS[str(label_data["labels"])]

                # NOTE(review): `line_number` is 0-based here; confirm that the
                # "line" field of the labels files is 0-based as well, otherwise
                # every label lands one row too low.
                # A negative value matches no row (it would otherwise wrap
                # around when used as a list index).
                if labeled_line >= 0:
                    tactics[labeled_line] = tactic
                    techniques_column[labeled_line] = "[" + ", ".join(techniques) + "]"

    return pd.DataFrame(
        {
            "line_number": list(range(n_lines)),
            "text": texts,
            "tactic": tactics,
            "techniques": techniques_column,
        },
    )


def import_data() -> None:
    """Convert the log files of every device into labeled csv files.

    For each entry of ``devices`` the ``logs`` directory is walked
    recursively. Every file whose name contains ``"log"`` is read with
    ``read_log_and_labels`` and written to
    ``OUT_DIR / device / app / "<file_name>.csv"``, where ``app`` is the
    directory of the file relative to the device's ``logs`` directory.
    Files that cannot be decoded as text are skipped.
    """
    logger.info("\nProcessing logs...")

    for device in devices:
        logger.info("  - %s", device)

        # Go to the logs subdir of the current device
        logs_dir = logs_subdir_path / device / "logs"

        # Walk through the logs directory and read all log files
        for root, _, files in os.walk(logs_dir):
            # The log app corresponds to the relative path from the logs directory.
            # If the log is a system log, the log app will be empty.
            app = str(Path(root).relative_to(logs_dir)) if root != str(logs_dir) else ""
            out_dir = Path(OUT_DIR) / device / app

            for file_name in files:
                if "log" not in file_name:
                    continue

                # Binary files should be skipped.
                # If the file is binary, an exception will be raised.
                try:
                    log_df = read_log_and_labels(device, app, file_name)
                except UnicodeDecodeError:
                    logger.info("    > %s... skipped (not decodable as text)", file_name)
                    continue

                # Create a directory for the device in the output directory
                out_dir.mkdir(parents=True, exist_ok=True)
                log_df.to_csv(out_dir / f"{file_name}.csv", index=False)

                # Logger methods have no `end=` keyword (unlike print), so the
                # outcome is reported in a single record once it is known.
                logger.info("    > %s... ✔", file_name)

    logger.info("Done.")

In [None]:
# Start from a clean slate when requested
if DELETE_OUT_IF_EXISTS and Path(OUT_DIR).exists():
    logger.info("Deleting existing output directory...")
    shutil.rmtree(OUT_DIR)

# Import only when there is no previous output left on disk
if Path(OUT_DIR).exists():
    logger.info("Output directory already exists. Skipping data import.")
else:
    import_data()

Deleting existing output directory...

Processing logs...
  - morris_mail
    > mail.log.1...✔
    > auth.log.1...✔
    > syslog.1...✔
    > syslog.3...✔
    > mail.log...✔
    > user.log...✔
    > syslog...✔
    > auth.log...✔
    > syslog.4...✔
    > syslog.2...✔
    > user.log.1...✔
    > mainlog.2...✔
    > mainlog.4...✔
    > mainlog.1...✔
    > mainlog...✔
    > mainlog.3...✔
    > horde-access.log...✔
    > horde-error.log...✔
  - inet-dns
    > auth.log.1...✔
    > syslog.1...✔
    > syslog.3...✔
    > dnsmasq.log...✔
    > syslog...✔
    > auth.log...✔
    > syslog.4...✔
    > syslog.2...✔
  - vpn
    > auth.log.1...✔
    > syslog.1...✔
    > syslog.3...✔
    > openvpn.log...✔
    > auth.log...✔
    > syslog.4...✔
    > syslog.2...✔
    > audit.log...✔
    > stats.log...✔
    > suricata-start.log...✔
    > log.pcap.1642684648...    > suricata.log...✔
    > log.pcap.1642964618...    > fast.log...✔
  - monitoring
    > logstash-json.log...✔
    > logstash-plain-2022-01-22-1.log.