In [None]:
from langchain_core.globals import set_debug, set_verbose

# Turn off LangChain's global verbose and debug flags for this notebook.
set_verbose(False)
set_debug(False)

# Path of the CSV file that the CSVLoader cell reads the logs from.
TEST_LOG_PATH = "test_data/with_template.csv"

# Model name handed to OllamaEmbeddings when building the vector store.
EMBEDDINGS_MODEL = "nomic-embed-text"

# Model name handed to ChatOllama for template generation.
PARSER_MODEL = "qwen2.5-coder:7b"

# Number of template-generation attempts get_template starts its countdown from.
SELF_REFLECTION_STEPS = 3

In [None]:
from langchain.document_loaders import CSVLoader
from langchain_chroma import Chroma
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load the logs from the CSV file.
# The listed columns are requested as per-document metadata; get_template reads
# the "template" entry, and format_logs_for_prompt assumes the page content of
# each loaded document starts with "text: ".
loader = CSVLoader(
    file_path=TEST_LOG_PATH,
    metadata_columns=["line_number", "tactic", "techniques", "template"],
)
data = loader.load()

# Split the logs into chunks (chunk_size=1000, chunk_overlap=100).
# NOTE(review): if a single row is ever split into several chunks, the later
# chunks presumably no longer start with "text: " -- confirm rows stay below
# the chunk size.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
all_splits = text_splitter.split_documents(data)

# Load the embeddings model
local_embeddings = OllamaEmbeddings(model=EMBEDDINGS_MODEL)

# Create the vector store from the split documents, configured with the
# "cosine" HNSW space. get_template queries and updates this store.
vectorstore = Chroma.from_documents(
    documents=all_splits,
    embedding=local_embeddings,
    collection_metadata={"hnsw:space": "cosine"},
)

# Create the parser model used as the LLM step of `chain`.
# The temperature is non-zero; get_template invokes the chain repeatedly when
# a generated template does not match.
parser_model = ChatOllama(
    model=PARSER_MODEL,
    temperature=0.5,
)

In [3]:
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough


def format_logs_for_prompt(log: str, similar_logs: list[Document]) -> str:
    """
    Render the input log and its similar logs as one bracketed, quoted list for the prompt.

    Each log is wrapped in double quotes, the entries are joined with ", " and the
    whole list is enclosed in square brackets, with the input log first.

    Documents loaded from the CSV carry a leading "text: " in their page content,
    while documents that get_template adds to the vector store hold the bare log.
    The prefix is therefore removed only when it is actually present, so bare logs
    are no longer truncated by six characters.

    Args:
        log (str): The input log to be formatted.
        similar_logs (list[Document]): Documents whose page content holds a similar log,
            optionally prefixed with "text: ".

    Returns:
        str: The formatted list, e.g. '["input log", "similar log 1", "similar log 2"]'.

    """
    content_prefix = "text: "

    # removeprefix is a no-op when the page content does not start with the prefix
    similar_texts = [document.page_content.removeprefix(content_prefix) for document in similar_logs]
    quoted_logs = [f'"{entry}"' for entry in (log, *similar_texts)]

    return "[" + ", ".join(quoted_logs) + "]"


# Few-shot chat prompt for template extraction: a system instruction, one worked
# example (human log list -> ai template), and the real request whose {logs}
# placeholder is filled with the output of format_logs_for_prompt.
# NOTE(review): the system message asks for the template "delimited by backticks",
# but the example ai reply below has no backticks, so the model may answer in
# either form.
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You will be provided with a list of logs. You must identify and abstract all the dynamic variables in logs with '<*>' and output ONE static log template that matches all the logs. Datetimes and ip addresses should each be abstracted as a standalone '<*>'.  Print the input logs' template delimited by backticks.",
        ),
        (
            "human",
            'Log list: ["2022-01-21 00:09:11 try to connect to host: 172.16.254.1:5000, finished.", "2022-01-21 00:09:11 try to connect to host: 173.16.254.2:6060, finished."]',
        ),
        # Expected answer for the example above: dynamic parts replaced by '<*>'.
        ("ai", "<*> try to connect to host: <*>, finished."),
        ("human", "Log list: {logs}"),
    ],
)

# Template-generation pipeline. It is invoked with a dict holding "input_log"
# and "similar_logs"; get_template treats the result as a string.
chain = (
    # Add a "logs" key built from the two inputs; it fills the prompt's {logs} slot.
    RunnablePassthrough.assign(logs=lambda inputs: format_logs_for_prompt(inputs["input_log"], inputs["similar_logs"]))
    | prompt
    | parser_model
    | StrOutputParser()
)

In [4]:
import re
import uuid


_WILDCARD = "(.*?)"
_CONTENT_PREFIX = "text: "


def _clean_model_template(raw_output: str) -> str:
    """Strip the whitespace and backtick delimiters the model may wrap around a template."""
    return raw_output.strip().strip("`").strip()


def _template_matches(template: str, text: str) -> bool:
    """Return True when the whole of `text` matches `template`, treating only wildcards as special."""
    if not template:
        return False

    # Accept both wildcard spellings ("<*>" and "(.*?)") and escape everything
    # else, so characters such as ".", "[" or "(" in a log are matched literally
    # and can never produce an invalid regular expression.
    literal_parts = template.replace("<*>", _WILDCARD).split(_WILDCARD)
    pattern = _WILDCARD.join(re.escape(part) for part in literal_parts)

    return re.fullmatch(pattern, text) is not None


def get_template(log: str) -> str:
    """
    Given a log, this function identifies and returns a template that matches the log.

    It first searches the vector store for very similar logs that already have a template
    and returns the first stored template that matches the whole log. Otherwise it retrieves
    sufficiently similar logs and asks the model for a template, retrying up to
    SELF_REFLECTION_STEPS times until the template matches the log and every similar log.

    The resulting template is written to the vector store only for logs it actually matches:
    matching similar logs get their "template" metadata updated, and the input log is added
    as a new document when the template matches it. If no attempt succeeds, the last
    candidate is still returned (best effort); if SELF_REFLECTION_STEPS is not positive,
    an empty string is returned.

    Args:
        log (str): The log for which the template needs to be identified.

    Returns:
        str: The identified template for the given log. Generated templates use "(.*?)"
            for each dynamic part; templates found in the store are returned as stored.

    """
    similarity_question = f'Which logs are most similar to "{log}"?'

    # Check if there are very similar logs that already carry a template
    # Assumption: the returned documents are sorted by most relevant first
    very_similar_logs = vectorstore.similarity_search_with_relevance_scores(
        similarity_question,
        score_threshold=0.7,
        k=10,
        filter={"template": {"$ne": ""}},
    )

    # Reuse the first stored template that matches the current log
    for candidate, _score in very_similar_logs:
        known_template = candidate.metadata.get("template") or ""
        if _template_matches(known_template, log):
            return known_template

    # If there are no very similar logs or their template doesn't match,
    # find sufficiently similar logs
    scored_neighbours = vectorstore.similarity_search_with_relevance_scores(
        similarity_question, k=5, score_threshold=0.5
    )
    similar_logs = [document for document, _score in scored_neighbours]

    # Compare against the bare log text, without the "text: " prefix that
    # documents loaded from the CSV carry in their page content
    similar_texts = [document.page_content.removeprefix(_CONTENT_PREFIX) for document in similar_logs]

    # Perform self-reflection: regenerate the template until it matches
    # both the current log and all the similar logs
    template = ""
    for _attempt in range(SELF_REFLECTION_STEPS):
        raw_template = chain.invoke({"input_log": log, "similar_logs": similar_logs})

        # Replace all of the <*> in the template with (.*?)
        template = _clean_model_template(raw_template).replace("<*>", _WILDCARD)

        if _template_matches(template, log) and all(_template_matches(template, text) for text in similar_texts):
            break

    # Update the template metadata value, but only for the similar logs the
    # template really matches, so existing metadata is never overwritten with
    # a template that does not describe the log
    for similar_log, similar_text in zip(similar_logs, similar_texts):
        if _template_matches(template, similar_text):
            similar_log.metadata["template"] = template
            vectorstore.update_document(document_id=similar_log.id, document=similar_log)

    # Save the new log to the vector store (document ids are strings)
    if _template_matches(template, log):
        vectorstore.add_documents(
            [Document(id=str(uuid.uuid4()), page_content=log, metadata={"template": template})]
        )

    return template

In [6]:
# Example run: derive the template for one sample log line; the returned
# template is displayed as the cell output.
# NOTE: get_template queries the vector store and the parser model, and it
# also writes to the vector store, so re-running this cell is not side-effect free.
get_template("2022-01-21 01:04:19 jhall/192.168.230.165:46011 peer info: IV_TCPNL=1")

'(.*?) (.*?)/(.*?) peer info: (.*?)=(.*?)'