# Check Data

This is a set of quick functions to generate simple dummy datasets. These will be used to adjust and "check" the AI anomaly detection's reliability. Once generated, they will be tested in numerous validation runs.

## Pre-reqs

In [1]:
# Every `!command` runs in its own short-lived subshell, so `!export ...` and
# `!source .env` cannot change the environment of the running kernel.
# Write the values into os.environ instead so code in this kernel can see them.
import os

os.environ["LANGCHAIN_PROJECT"] = "pr-ajar-outrun-25"

# Minimal `.env` loader: KEY=VALUE lines with an optional `export ` prefix.
# Blank lines, `#` comments and lines without `=` are ignored, and one layer
# of matching surrounding quotes is removed from the value.
if os.path.isfile(".env"):
    with open(".env", "r") as _env_file:
        for _raw_line in _env_file:
            _entry = _raw_line.strip()
            if not _entry or _entry.startswith("#") or "=" not in _entry:
                continue
            if _entry.startswith("export "):
                _entry = _entry[len("export "):].lstrip()
            _key, _, _value = _entry.partition("=")
            _value = _value.strip()
            if len(_value) >= 2 and _value[0] == _value[-1] and _value[0] in "\"'":
                _value = _value[1:-1]
            os.environ[_key.strip()] = _value

In [2]:
# Preflight, load
from bigstick import LoadedModel as lm
import src.config as c
from string import ascii_uppercase as ABC
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
import random
import sys

PROJECT = "pr-ajar-outrun-25"

In [3]:
# Define outputs
@dataclass
class OutputReport:
    """Build and persist a JSON report for one job run.

    The written report is a single JSON object holding the heading fields
    (``job_name``, ``script_start_time``, ``script_finish_time`` and any
    custom headings) plus the trial data under the ``"results"`` key.

    Attributes:
        job_name: Label stored in the heading and embedded in the filename.
        write_dest: Directory the report is written to; created on init.
        script_start_time: Start of the run. When omitted it is set to the
            moment the instance is created.
        heading: Heading fields of the report. Always rebuilt in
            ``__post_init__`` from ``job_name`` and ``script_start_time``.
    """

    job_name: str
    write_dest: str = c.RESULTS_PATH
    # None means "now" and is resolved per instance in __post_init__; a literal
    # datetime.now() default would be evaluated only once, at class definition.
    script_start_time: datetime = None
    heading: dict = None

    def __post_init__(self) -> None:
        """Resolve the start time, build the heading and create the output directory."""
        if self.script_start_time is None:
            self.script_start_time = datetime.now()

        self.heading = {
            "job_name": self.job_name,
            "script_start_time": self.script_start_time.strftime("%Y-%m-%d %H:%M:%S"),
        }

        Path(self.write_dest).mkdir(parents=True, exist_ok=True)

    def write(
        self,
        data: dict,
        script_finish_time: datetime = None,
        custom_headings: dict = None,
    ) -> Path:
        """Write data report

        Args:
            data: Trial results, stored under the ``"results"`` key.
            script_finish_time: End of the run; defaults to the time of this call.
            custom_headings: Extra top-level heading fields merged into the
                report. They are also kept on ``self.heading``.

        Returns:
            Path: a Path object of the written report
        """

        # Resolve the default at call time rather than at definition time.
        if script_finish_time is None:
            script_finish_time = datetime.now()

        self.heading["script_finish_time"] = script_finish_time.strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        # `custom_headings` is optional; unpacking None would raise TypeError.
        self.heading = {**self.heading, **(custom_headings or {})}
        report = {**self.heading, "results": data}
        report_file = f"{self.write_dest}/report_{self.job_name}_{self.script_start_time.strftime('%Y%m%d-%H%M%S')}.json"

        with open(report_file, "w") as f:
            json.dump(report, f)

        return Path(report_file)

## Small Dimension Array

In [3]:
# Job label for this section; OutputReport stores it in the report heading and filename.
job_name = "small-dim"

In [18]:
# Every uppercase letter maps to 0 except "F", the single planted outlier.
sample = {**dict.fromkeys(ABC, 0), "F": 1}
print(json.dumps(sample))

_Let's check this a few times._

In [26]:
# Query the model c.TRIALS times with the same prompt and write one report.
results = {}
query_size = []
output = OutputReport(job_name=job_name, script_start_time=datetime.now())
# NOTE(review): this overrides the shared config value, so every later cell
# that reads c.TRIALS sees 3 as well.
c.TRIALS = 3

# The prompt is the same for every trial, so build it once.
# The closing quote on "explanation" was missing in the key list; fixed here.
query = f"""
        Find any anomalies in this data: {json.dumps(sample)}.
        Respond only with JSON containing the following keys and values:
            - "rank": <the rank you assigned to the anomaly>,
            - "line": <the line number of the data>,
            - "data": <the relevant data>,
            - "explanation": <the explanation for your choice>
            
        """

for i in range(0, c.TRIALS):
    query_size.append(sys.getsizeof(query))

    resp = lm(json_mode=True, base_url=f"http://{c.GPU_NODE}:11434").Simple(query=query)
    result = json.loads(resp.model_dump_json())["text"].strip("\n")
    try:
        results[i] = json.loads(result)
    except ValueError as err:
        # json.JSONDecodeError is a ValueError. Record the bad reply instead of
        # aborting, so the trials that already finished still reach the report.
        results[i] = {"error": str(err), "raw": result}

report_loc = output.write(
    data=results,
    script_finish_time=datetime.now(),
    custom_headings={"avg_query_size": round(sum(query_size) / len(query_size))},
)


### Parse the results

In [16]:
# Grade every trial in the report against the single planted value {"F": 1}.
trials = c.TRIALS
# trials = 1000
# report_loc = "results/report_small-1d-array_20241012-115808.json"

# Read through a context manager so the file handle is closed right away.
with open(report_loc, "r") as report_file:
    report_data = json.load(report_file)["results"]

expected_result_data = {"F": 1}
expected_line = 6  # "F" is the sixth key of `sample`
raw_char_match = ["F", ":", "1"]
required_keys = ("data", "explanation", "line")

exact_matches = 0
inexact_matches = 0
inexact_matches_correct_line = 0
non_matches = 0
non_match_records = {}

for i, entry in report_data.items():
    # A reply that parsed to a list/str/number has no keys to inspect.
    if not isinstance(entry, dict):
        non_matches += 1
        non_match_records[i] = entry
        continue

    if all(key in entry for key in required_keys):
        data_ok = entry["data"] == expected_result_data
        line_ok = entry["line"] == expected_line

        if data_ok and line_ok:
            exact_matches += 1
        elif (
            data_ok
            # NOTE(review): plain substring test; any explanation containing a
            # capital "F" passes. Confirm this is strict enough.
            or "F" in entry["explanation"]
            or all(char in str(entry["data"]) for char in raw_char_match)
        ):
            if line_ok:
                inexact_matches_correct_line += 1
            else:
                inexact_matches += 1
        else:
            non_matches += 1
            non_match_records[i] = entry

    elif "anomalies" in entry:
        # NOTE(review): every anomaly inspected before a match adds to
        # non_matches, so the counters can sum to more than the number of
        # trials, and non_match_records[i] keeps only the last miss.
        for idx, anom in enumerate(entry["anomalies"]):
            if isinstance(anom, dict) and all(key in anom for key in required_keys):
                anom_data_ok = anom["data"] == expected_result_data

                if anom_data_ok and anom["line"] == expected_line:
                    exact_matches += 1

                    # We break because one exact match is found
                    # The others are wrong since we only have one PoI
                    break

                elif (
                    anom_data_ok
                    or "F" in anom["explanation"]
                    or all(char in str(anom["data"]) for char in raw_char_match)
                ):
                    inexact_matches += 1

                    # We break because one inexact match is found
                    # The others are wrong since we only have one PoI
                    break
                else:
                    non_matches += 1
                    non_match_records[i] = {idx: anom}
            else:
                non_matches += 1
                non_match_records[i] = {idx: anom}
    else:
        non_matches += 1
        non_match_records[i] = entry


print(
    f"""
    {exact_matches=}
    {inexact_matches=}
    {inexact_matches_correct_line=}
    {non_matches=}
    """
)
print(json.dumps(non_match_records, indent=4))

## Small Dimension Variable Data Length

The small array is performant enough (around 3-5s per query) that I need to see when this performance tapers off.

In [4]:
# Job label for this section; also used as the stem of the generated CSV filename.
job_name = "small-dim-long-data"

In [6]:
# Replacement data of interest
replacement = "/../../../etc/shadow"

# This file is built with jobs/log-generate.sh
with open(f"{c.DATA_PATH}/generated-logs/generated.log", "r") as log_file:
    input_data = log_file.readlines()

# We will just capture part of the log for initial analysis, basically this is a word generator.
# Field 6 is the seventh space-separated token of each line.
entries = [x.split(" ")[6] for x in input_data]

# Replace a random value
ran_replace = random.choice(range(len(input_data)))
print(f"Find the random replacement here: {ran_replace}")

# One "index,entry" row per log line, with the chosen row swapped for the planted value.
with open(f"{c.DATA_PATH}/{job_name}.csv", "w+") as f:
    f.write("index,entry\n")
    for k, v in enumerate(entries):
        value = replacement if k == ran_replace else v
        f.write(f"{k},{value}\n")

In [15]:
# Query the model `trials` times with the generated CSV and write one report.
results = {}
query_size = []
with open(f"{c.DATA_PATH}/{job_name}.csv", "r") as csv_file:
    input_data = csv_file.read()
output = OutputReport(job_name=job_name, script_start_time=datetime.now())

trials = c.TRIALS
# trials = 3

# The prompt is the same for every trial, so build it once.
# The closing quote on "explanation" was missing in the key list; fixed here.
query = f"""
        Find any anomalies in this data: {input_data}.
        Respond only with JSON containing the following keys and values:
            - "rank": <the rank you assigned to the anomaly>,
            - "line": <the line number of the data>,
            - "data": <the relevant data>,
            - "explanation": <the explanation for your choice>
            
        """

for i in range(0, trials):
    resp = lm(json_mode=True, base_url=f"http://{c.GPU_NODE}:11434").Simple(query=query)

    query_size.append(sys.getsizeof(query))

    result = json.loads(resp.model_dump_json())["text"].strip("\n")
    try:
        results[i] = json.loads(result)
    except ValueError as err:
        # json.JSONDecodeError is a ValueError. Record the bad reply instead of
        # aborting, so the trials that already finished still reach the report.
        results[i] = {"error": str(err), "raw": result}

report_loc = output.write(
    data=results,
    script_finish_time=datetime.now(),
    custom_headings={"avg_query_size": round(sum(query_size) / len(query_size))},
)

### Parse results

In [21]:
# Grade every trial in the report against the planted `replacement` value.
# Read through a context manager so the file handle is closed right away.
with open(report_loc, "r") as report_file:
    report_data = json.load(report_file)["results"]

required_keys = ("data", "explanation", "line")

exact_matches = 0
inexact_matches = 0
inexact_matches_correct_line = 0
non_matches = 0
non_match_records = {}

for i, entry in report_data.items():
    # A reply that parsed to a list/str/number has no keys to inspect.
    if not isinstance(entry, dict):
        non_matches += 1
        non_match_records[i] = entry
        continue

    if all(key in entry for key in required_keys):
        data_ok = entry["data"] == replacement
        # NOTE(review): `ran_replace` is the value of the CSV "index" column;
        # the file also has a header row, so a physical line number reported by
        # the model would differ. Confirm which one is expected.
        line_ok = entry["line"] == ran_replace

        if data_ok and line_ok:
            exact_matches += 1
        elif (
            data_ok
            or replacement in entry["explanation"]
            or replacement in str(entry["line"])
        ):
            if line_ok:
                inexact_matches_correct_line += 1
            else:
                inexact_matches += 1
        else:
            non_matches += 1
            non_match_records[i] = entry

    elif "anomalies" in entry:
        # NOTE(review): every anomaly inspected before a match adds to
        # non_matches, so the counters can sum to more than the number of
        # trials, and non_match_records[i] keeps only the last miss.
        for idx, anom in enumerate(entry["anomalies"]):
            if isinstance(anom, dict) and all(key in anom for key in required_keys):
                anom_data_ok = anom["data"] == replacement

                if anom_data_ok and anom["line"] == ran_replace:
                    exact_matches += 1
                    break
                elif (
                    anom_data_ok
                    or replacement in anom["explanation"]
                    or replacement in str(anom["line"])
                ):
                    inexact_matches += 1
                    break
                else:
                    non_matches += 1
                    non_match_records[i] = {idx: anom}
            else:
                non_matches += 1
                non_match_records[i] = {idx: anom}
    else:
        non_matches += 1
        non_match_records[i] = entry

print(
    f"""
    {exact_matches=}
    {inexact_matches=}
    {inexact_matches_correct_line=}
    {non_matches=}
    """
)
if non_match_records:
    print(json.dumps(non_match_records, indent=4))

## Large File with viable logs
Through my testing I've discovered that context seems to be _required_.

In [4]:
# Run parameters for the Apache-log experiment.
job_name = "apache_logs"
start_lines = 100  # first log length (in lines) of the sweep
end_lines = 100  # last log length of the sweep (the loop uses end_lines + 1, so it is inclusive)
increment = 10  # step between tested log lengths
modulo = 60  # NOTE(review): not referenced by any cell visible here; confirm it is still needed
trials = c.TRIALS  # number of model queries per log length

In [5]:
# Replacement data of interest: an Apache access-log entry whose request path
# traverses up to /etc/shadow.
# NOTE(review): the final user-agent field has no closing double quote, so the
# planted line is also malformed as a log entry. Confirm that is intended.
replacement = '93.164.60.142 - - [17/May/2015:12:05:31 +0000] "GET /../../../etc/shadow HTTP/1.1" 200 32 "-" "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36'


def gen_data(lines: int = 100, ran_replace: int = 96):
    """Write a truncated copy of the Apache log with one entry swapped out.

    Reads ``{c.DATA_PATH}/apache_logs/1.txt``, keeps its first ``lines``
    lines, overwrites the entry at index ``ran_replace`` with the
    module-level ``replacement`` string and writes the result to
    ``{c.DATA_PATH}/{job_name}.log``.

    Args:
        lines: Number of log lines to keep.
        ran_replace: Zero-based index of the entry to overwrite.

    Returns:
        tuple: ``(number of lines written, ran_replace)``.

    Raises:
        IndexError: If ``ran_replace`` is outside the kept lines.
    """
    # This file is from Elasticsearch
    # https://github.com/elastic/examples/blob/master/Common%20Data%20Formats/apache_logs/apache_logs
    with open(f"{c.DATA_PATH}/apache_logs/1.txt", "r") as source:
        # Slice end is exclusive: `[:lines]` keeps exactly `lines` entries.
        # The previous `[: lines - 1]` kept one fewer than requested.
        input_data = source.readlines()[: max(lines, 0)]

    if not 0 <= ran_replace < len(input_data):
        raise IndexError(
            f"ran_replace={ran_replace} is outside the {len(input_data)} generated lines"
        )

    # Swap the data:
    input_data[ran_replace] = f"{replacement}\n"

    with open(f"{c.DATA_PATH}/{job_name}.log", "w+") as f:
        f.write("".join(input_data))

    return (len(input_data), ran_replace)

In [6]:
# Save to
report_locs = []

# Materialise the sweep so the progress total is the real number of runs.
# The old round((end - start) / increment) gave 0 for a single-step sweep.
line_counts = range(start_lines, end_lines + 1, increment)
total_runs = len(line_counts)

for counter, i in enumerate(line_counts, start=1):
    query_size = []
    results = {}

    lines, ran_replace = gen_data(lines=i)
    output = OutputReport(job_name=job_name, script_start_time=datetime.now())
    with open(f"{c.DATA_PATH}/{job_name}.log", "r") as log_file:
        input_data = log_file.read()

    # The prompt only depends on the generated log, not on the trial index.
    # The closing quote on "explanation" was missing in the key list; fixed here.
    query = f"""
                Find any anomalies in this data: {input_data}.
                Respond only with JSON containing the following keys and values:
                    - "rank": <the rank you assigned to the anomaly>,
                    - "line": <the line number of the data>/<the total number of lines in the file>,
                    - "data": <the relevant data>,
                    - "explanation": <the explanation for your choice>
                    
                """

    for trial in range(0, trials):
        resp = lm(json_mode=True, base_url=f"http://{c.GPU_NODE}:11434").Simple(
            query=query
        )

        query_size.append(sys.getsizeof(query))

        raw_response = resp.model_dump_json()
        try:
            result = json.loads(raw_response)["text"].strip("\n")
            results[trial] = json.loads(result)
        except ValueError as e:
            # Both decode steps sit inside the try, and the error is stored as
            # text so the record can still be written by json.dump.
            results[trial] = {"error": str(e), "raw": raw_response}

    report_locs.append(
        output.write(
            data=results,
            script_finish_time=datetime.now(),
            custom_headings={
                "avg_query_size": round(sum(query_size) / len(query_size)),
                "trials": len(results),
                "lines": i,
                "interest": ran_replace,
                "replacement": input_data.split("\n")[ran_replace],
            },
        )
    )
    print(f"Completed {counter}/{total_runs}")

Completed 1/0


### Parse the results

In [7]:
# Backload
# search_s = f"report_{job_name}_20241015"
# report_locs = [f for f in Path("./results").rglob(f"{search_s}*.json")]

# Grade every report of the sweep and collect one summary row per log length.
final = {}

# Loop-invariant settings.
replacement_sub = "/etc/shadow"
required_keys = ("data", "explanation", "line")
dt_pat = "%Y-%m-%d %H:%M:%S"

for report_loc in report_locs:
    # Read through a context manager so each handle is closed right away.
    with open(report_loc, "r") as report_file:
        report_data = json.load(report_file)

    # NOTE(review): "interest" is the zero-based index of the swapped entry,
    # while the prompt asks for "line" as "<line>/<total lines>". An equality
    # test against that form cannot succeed; confirm the intended comparison.
    inline = report_data["interest"]

    exact_matches = 0
    inexact_matches = 0
    inexact_matches_correct_line = 0
    non_matches = 0
    # Kept for interactive inspection after the cell has run.
    non_match_records = {}

    for i, entry in report_data["results"].items():
        # A reply that parsed to a list/str/number has no keys to inspect.
        if not isinstance(entry, dict):
            non_matches += 1
            non_match_records[i] = entry
            continue

        if all(key in entry for key in required_keys):
            data_ok = entry["data"] == replacement
            line_ok = entry["line"] == inline

            if data_ok and line_ok:
                exact_matches += 1
            elif (
                data_ok
                or replacement_sub in entry["explanation"]
                or replacement_sub in str(entry["data"])
                or replacement_sub in str(entry["line"])
            ):
                if line_ok:
                    inexact_matches_correct_line += 1
                else:
                    inexact_matches += 1
            else:
                non_matches += 1
                non_match_records[i] = entry

        elif "anomalies" in entry:
            # NOTE(review): every anomaly inspected before a match adds to
            # non_matches, so the counters can sum to more than the number of
            # trials, and non_match_records[i] keeps only the last miss.
            for idx, anom in enumerate(entry["anomalies"]):
                if isinstance(anom, dict) and all(
                    key in anom for key in required_keys
                ):
                    anom_data_ok = anom["data"] == replacement

                    if anom_data_ok and anom["line"] == inline:
                        exact_matches += 1
                        break
                    elif (
                        anom_data_ok
                        or replacement_sub in anom["explanation"]
                        or replacement_sub in str(anom["data"])
                        or replacement_sub in str(anom["line"])
                    ):
                        inexact_matches += 1
                        break
                    else:
                        non_matches += 1
                        non_match_records[i] = {idx: anom}
                else:
                    non_matches += 1
                    non_match_records[i] = {idx: anom}
        else:
            non_matches += 1
            non_match_records[i] = entry

    start = datetime.strptime(report_data["script_start_time"], dt_pat)
    end = datetime.strptime(report_data["script_finish_time"], dt_pat)

    final[report_data["lines"]] = {
        "exact_matches": exact_matches,
        "inexact_matches": inexact_matches,
        "inexact_matches_correct_line": inexact_matches_correct_line,
        "non_matches": non_matches,
        "avg_query_size": report_data["avg_query_size"],
        "runtime": round(end.timestamp() - start.timestamp()),
        "line_of_interest": inline,
    }

with open(
    f"results/final_{job_name}_{datetime.now().strftime('%Y%m%d-%H%M%S')}.json", "w+"
) as f:
    json.dump(final, f)

## Second Check for 1d-array
The principle here is to force the LLM to reevaluate the data. This will take the original process and simply add another check to it.

In [7]:
# Two-pass check: ask for anomalies, then ask the model to review its own answer.
results = {}

# The first prompt is the same for every trial, so build it once.
# The closing quote on "explanation" was missing in the key list; fixed here.
first_query = f"""
        Find any anomalies in this data: {json.dumps(sample)}.
        Respond only with JSON containing the following keys and values:
            - "rank": <the rank you assigned to the anomaly>,
            - "line": <the line number of the data>,
            - "data": <the relevant data>,
            - "explanation": <the explanation for your choice>
            
        """

for i in range(0, c.TRIALS):
    resp_1 = lm(json_mode=True, base_url=f"http://{c.GPU_NODE}:11434").Simple(
        query=first_query
    )

    result = json.loads(resp_1.model_dump_json())["text"].strip("\n")
    # Create the per-trial record here; indexing results[i] before it exists
    # raised KeyError on the first iteration.
    results[i] = {"resp_1": json.loads(result)}

    # Second query
    # `result` is already JSON text, so it is embedded as-is; running it through
    # json.dumps again would hand the model an escaped string literal.
    resp_2 = lm(json_mode=True, base_url=f"http://{c.GPU_NODE}:11434").Simple(
        query=f"""
        {result}
        
        Review this data. It should be formatted as a JSON dataset that contains the following keys:
            - "rank": <the rank you assigned to the anomaly>,
            - "line": <the line number of the data>,
            - "data": <the relevant data>,
            - "explanation": <the explanation for your choice>
        
        Ensure that the formatting is correct.
        Ensure that the values for the keys are correct by comparing to the original entry in this data:
        {json.dumps(sample)}
        """
    )

    # Use the same serialisation call as for resp_1.
    result = json.loads(resp_2.model_dump_json())["text"].strip("\n")
    results[i]["resp_2"] = json.loads(result)

with open(f"results/abc-1d-array_{c.TRIALS}.json", "w+") as f:
    json.dump(results, f)