## Connecting to Google Drive

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive/ so later cells can read and write files there.
drive.mount("/content/drive/")

## Installing all the dependencies

In [None]:
%%capture
# Install the packages this notebook imports. The %%capture cell magic above
# captures the cell's output, so pip's log is not displayed.
!pip install llama-index
!pip install openai
!pip install pypdf
!pip install --upgrade llama_index

# OpenAI embedding integration for llama-index
!pip install llama-index-embeddings-openai

# HuggingFace embedding integration (e.g. to use arabert as the embedding model)
# !pip install arabert
!pip install llama-index-embeddings-huggingface
# !pip install llama-index-llms-huggingface
!pip install transformers torch

# Optional extras, currently disabled:
# !pip install llama_index.core.node_parser
# %pip install jiwer gradio typing-extensions

## Data and Persist Folder

In [None]:
# Placeholders -- replace each value with your own location before running.
data_folder = 'your data folder'  # prefix used when the result JSON files are written
PERSIST_DIR = 'your vector store index folder'  # persisted index, loaded by get_retriever

QA_CSV = 'CSV for QA'  # CSV file holding the question/answer data

## Setting up the OpenAI API key for the generator

In [None]:
import os
import openai

# Make the OpenAI API key available through the environment.
# setdefault() keeps a key that is already set in the environment instead of
# overwriting it with the placeholder below, so a real key is never clobbered.
# Never commit a real key in this cell; to force a different key, set
# os.environ["OPENAI_API_KEY"] explicitly.
os.environ.setdefault("OPENAI_API_KEY", "your_openai_api_key_here")  # replace with your OpenAI API key


## Getting all the files in the data folder

In [None]:
import os
import glob

def get_files(root_folder, extensions=('.pdf', '.csv', '.txt'), exclude_substring='_'):
    """Recursively collect the data files found under ``root_folder``.

    Args:
        root_folder: Directory to walk; every sub-directory is visited.
        extensions: File-name suffix (or iterable of suffixes) to keep.
            Defaults to the PDF, CSV and text suffixes used by this notebook.
        exclude_substring: Files whose *name* contains this substring are
            skipped. Defaults to ``'_'``, the dataset-specific rule of the
            original notebook; pass ``None`` or ``''`` to keep every file.

    Returns:
        list[str]: ``os.path.join(folder, name)`` for each kept file, in the
        order produced by ``os.walk``.
    """
    # str.endswith needs a tuple; also accept a single suffix given as a string.
    if isinstance(extensions, str):
        suffixes = (extensions,)
    else:
        suffixes = tuple(extensions)

    files = []
    for foldername, _subfolders, filenames in os.walk(root_folder):
        for filename in filenames:
            if not filename.endswith(suffixes):
                continue
            if exclude_substring and exclude_substring in filename:
                continue
            files.append(os.path.join(foldername, filename))
    return files


In [None]:
def read_file(file_path, encoding='utf-8'):
    """Return the full text content of ``file_path``.

    Args:
        file_path: Path of the text file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the platform's locale.

    Returns:
        str: The decoded content of the file.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the content is not valid for ``encoding``.
    """
    # The context manager closes the handle even when read() raises.
    with open(file_path, 'r', encoding=encoding) as file:
        return file.read()

## Retriever

In [None]:
import os.path
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import StorageContext, load_index_from_storage
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.indices.postprocessor import SimilarityPostprocessor
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response.pprint_utils import *
from llama_index.core import Settings

from llama_index.embeddings.openai import OpenAIEmbedding
from openai import OpenAI

# Load aragpt2 model directly
from transformers import AutoModelForCausalLM

def get_retriever(PERSIST_DIR, top_k = 20):
    """Load the persisted index from ``PERSIST_DIR`` and wrap it in a retriever.

    The index is only loaded here; nothing is built when loading fails.

    Args:
        PERSIST_DIR: Directory passed to ``StorageContext.from_defaults`` as
            ``persist_dir``.
        top_k: Value passed to ``VectorIndexRetriever`` as ``similarity_top_k``.

    Returns:
        VectorIndexRetriever: Retriever built on the loaded index.

    Raises:
        ValueError: If the index cannot be loaded. The underlying exception is
            chained as ``__cause__`` so the real reason stays in the traceback.
    """
    print(f"Loading index from storage: {PERSIST_DIR}")
    try:
        storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
        index = load_index_from_storage(storage_context=storage_context)
    except Exception as e:
        print(f"Caught an exception: {e}")
        # Keep the ValueError callers expect, but chain the original error
        # instead of discarding it.
        raise ValueError("Failed to load or create the index.") from e

    if index is None:
        raise ValueError("Failed to load or create the index.")
    print("Index loaded successfully.")

    retriever = VectorIndexRetriever(index=index, similarity_top_k=top_k)
    print(f"--- Successfully created the Retriever from {PERSIST_DIR} ---")
    return retriever

## Prompt

In [None]:
def generate_prompt(context, question):
    """Build the user prompt that embeds the retrieved context and the question.

    Args:
        context: Text inserted between the first two separator lines.
        question: Text inserted after the ``Question:`` label.

    Returns:
        str: The filled-in prompt, ending with a request for a Python list.
    """
    # The template's whitespace is part of the prompt text; keep it unchanged.
    template = '''Answer the question using the following description of the information flow from Intelligent Transport System (ITS).
            Choose your answer carefully analyzing the context entirely.
            -----------------------------------------
            {context}
            -----------------------------------------
            Question: {question}
            -----------------------------------------
            Your answer should be a python list:

    '''
    return template.format(context=context, question=question)


This helper counts the number of tokens in a prompt. Use it to tune the RAG parameters (for example, how many retrieved passages are included) so that the prompt stays within the model's token limit.

In [None]:
import tiktoken
import openai

def count_tokens(text):
    """Return how many tokens ``text`` produces with the gpt-3.5-turbo encoding.

    Args:
        text: String to tokenize.

    Returns:
        int: Length of the token sequence produced by the encoder.
    """
    # Look up the tokenizer tied to the model name, encode, and count.
    return len(tiktoken.encoding_for_model("gpt-3.5-turbo").encode(text))


## RAG

In [None]:
client = openai.OpenAI()


def _format_context_node(node):
    """Render one retrieved node as the text chunk placed in the prompt context."""
    file_name = node.node.metadata['file_path'].split('/')[-1]
    # Strip the extension whatever its length; the previous [:-4] slice assumed
    # a three-character extension and truncated other names.
    stem, dot, _ext = file_name.rpartition('.')
    source_name = stem if (dot and stem) else file_name

    text = '----------\n'
    text += 'Description:\n'
    text += node.node.text
    text += 'Source: '
    text += ' (' + source_name + ')' + "\n\n"
    text += '----------\n'
    return text


def _pack_contexts(nodes, question, top_k, token_limit, reserved_tokens):
    """Group node texts into context strings that each fit the token budget."""
    def exceeds_budget(candidate):
        return count_tokens(candidate + question) + reserved_tokens >= token_limit

    contexts = []
    current = ""
    used = 0
    for node in nodes:
        text = _format_context_node(node)

        if exceeds_budget(current + text):
            if not current:
                # The node does not fit even on its own; it cannot be used.
                continue
            # Close the full context and start a new one with this node. The
            # earlier implementation advanced to the next context but dropped
            # the node that triggered the overflow.
            contexts.append(current)
            current = ""
            if exceeds_budget(text):
                continue

        current += text
        used += 1
        if used == top_k:
            break

    if current:
        contexts.append(current)
    return contexts


def get_response(retriever, question, top_k =10, model = "gpt-4o-mini-2024-07-18",
                 token_limit = 16385, completion_reserve = 2100):
    """Answer ``question`` with retrieved context and an OpenAI chat model.

    The retrieved nodes are packed into one or more context strings that each
    stay under the token budget. One chat completion is requested per context
    and the answers are concatenated.

    Args:
        retriever: Object whose ``retrieve(question)`` returns scored nodes
            exposing ``node.node.text`` and ``node.node.metadata['file_path']``.
        question: Text used both as the retrieval query and as the question
            placed in the prompt.
        top_k: Stop after this many nodes have been added to the contexts.
        model: Chat model name passed to the OpenAI client.
        token_limit: Prompt budget in tokens. Defaults to 16385, the value that
            was previously hard-coded.
            NOTE(review): this looks like a gpt-3.5-turbo limit, while the
            default ``model`` is gpt-4o-mini -- confirm the intended value.
        completion_reserve: Tokens held back on top of the instruction
            template. Defaults to 2100 (the former ``100 + 2000``).

    Returns:
        str: Concatenation of the model's answers, one per context. Empty when
        no node could be placed in a context.
    """
    nodes = retriever.retrieve(question)

    # This template is only measured to reserve room for the instructions; the
    # prompt actually sent is built by generate_prompt below.
    budget_template = f'''You are given the description of some information flow from an Intelligent Transport System (ITS). Based on the description and the source and destination of the flow, you have to find out potential MITRE ATT&CK technique that the attacker might use to intervene the flow. Answer in the required format.
                -------------------------------------------------------------
                Description and Context:\n
                \n
                -------------------------------------------------------------
                Question:
                '''
    # Loop-invariant, so it is counted once rather than once per node.
    reserved_tokens = count_tokens(budget_template) + completion_reserve

    contexts = _pack_contexts(nodes, question, top_k, token_limit, reserved_tokens)

    answers = []
    for context in contexts:
        prompt = generate_prompt(context, question)

        gpt_response = client.chat.completions.create(
        model=model,
        messages= [
            {
                "role":"system",
                "content":'''You are a helpful assisstant. Your name is Transportation Security AI. Your role is to help with transportation security.
                There are some instructions in each prompt. Follow those instructions strictly.'''
            },
            {
                "role":"user",
                "content":prompt
            }
        ],
        temperature=0.0,
        max_tokens=10000,
        top_p=0,
        )
        answers.append(str(gpt_response.choices[0].message.content))

    return "".join(answers)



## Question Answering

### OPENAI Embedding Retriever + OPENAI Generator

In [None]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI

# Build the OpenAI embedding model once and register that same instance as the
# llama-index default, so the model name is set in a single place.
embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.embed_model = embed_model

In [None]:
# Load the persisted index and build the retriever used by the evaluation loop.
retriever = get_retriever(PERSIST_DIR=PERSIST_DIR)

In [None]:
import re
from sklearn.metrics import precision_score, recall_score, f1_score


def parse_mitre_techniques(response):
    """
    Extracts MITRE Technique IDs from a response string and returns them as a Python list.

    Only stand-alone IDs of the form ``T`` + four digits are accepted: an ID
    embedded in a longer token (``T12345``, ``XT1040``) is ignored, while a
    sub-technique such as ``T1566.001`` yields its parent ``T1566``. Each ID is
    reported once, in order of first appearance, because the response may be a
    concatenation of several model answers that repeat the same technique.

    Args:
        response (str): The response string containing MITRE Technique IDs.
            ``None`` or an empty string yields an empty list.

    Returns:
        list: A list of unique extracted MITRE Technique IDs.
    """
    if not response:
        return []

    # "T" must not follow a letter/digit and the 4 digits must not be followed
    # by another digit.
    mitre_pattern = r'(?<![A-Za-z0-9])T\d{4}(?!\d)'

    # Find all matches in the response string
    mitre_techniques = re.findall(mitre_pattern, str(response))

    # dict.fromkeys removes duplicates while preserving first-seen order.
    return list(dict.fromkeys(mitre_techniques))


def calculate_metrics(true_labels, predicted_labels):
    """
    Scores one prediction against its ground truth using set overlap.

    Args:
        true_labels (list): Ground truth technique IDs.
        predicted_labels (list): Predicted technique IDs.

    Returns:
        dict: Keys "tp", "fp", "fn", "precision", "recall" and "f1". A ratio
        whose denominator is zero is reported as 0.
    """

    def ratio(numerator, denominator):
        # Guarded division: 0 when there is nothing to divide by.
        return numerator / denominator if denominator > 0 else 0

    expected = set(true_labels)
    predicted = set(predicted_labels)

    # Duplicates collapse in the sets, so each ID is counted once.
    tp = len(expected.intersection(predicted))
    fp = len(predicted.difference(expected))
    fn = len(expected.difference(predicted))

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)
    f1 = ratio(2 * precision * recall, precision + recall)

    return dict(tp=tp, fp=fp, fn=fn, precision=precision, recall=recall, f1=f1)


In [None]:
import pandas as pd

# Read the evaluation CSV into a DataFrame.
dataset_path = "data folder location"  # placeholder: set this to the path of your CSV file
df = pd.read_csv(dataset_path)

# Ground-truth labels, one entry per row.
ground_truth = df["str_label"]

# Flow descriptions sent to the model (rename with your column name).
prompts = df["TEXT-flow_fn_process_threat"].tolist()
# Filled by the generation loop with one list of predicted IDs per prompt.
responses = list()

# Query the RAG pipeline once per information-flow description and collect the
# parsed technique IDs in `responses`, in the same order as `prompts`.

for prompt in prompts:

    # Wrap the raw flow description in the task instructions and the closed
    # list of candidate technique IDs. The literal's whitespace is part of the
    # text sent to the model.
    final_prompt =  f'''I am trying to do a multilabel classification of information flow description from Intelligent Transportation System (ITS) to MITRE ATT&CK Techniques.
                Here we have information flow name, its source and destination, some functional object description associated with it and the description of the information flow itself. It also has a threat report generated in the STRIDE framework.
                An attacker may attempt to compromise the integrity, confidentiality, or availability of the information flow in many ways.
                Find all the relevant MITRE ATT&CK techniques that the attacker might use to attack the information flow.

                Follow the instructions below carefully.

                1. We have a predefined list of MITRE ATT&CK Techniques that consists of 63 MITRE Techniques. You have to choose only the relevant MITRE ATT&CK Techniques from this list that is relevant to the information flow given.

                2. Understand the entire context, then generate a sublist of MITRE ATT&CK Technique from the given list.

                3. Do not add any other description in your answer.

                4. Only return the Technique IDs in python list format

                Given MITRE Technique List = ['T1495','T1485','T1595','T1134','T1040','T1132','T1098','T1069','T1036','T1562','T1187','T1486','T1119','T1027','T1498','T1654','T1548','T1082','T1552','T1614','T1531','T1204','T1529','T1046','T1489','T1195','T1566','T1659','T1059','T1213','T1133','T1080','T1005','T1078','T1001','T1190','T1203','T1136','T1491','T1033','T1189','T1068','T1652','T1049','T1020','T1041','T1021','T1105','T1518','T1200','T1053','T1557','T1056','T1087','T1565','T1499','T1657','T1559','T1074','T1106','T1560', 'T1556', 'T1589']

                Here is the information flow description:
                {prompt}

                Which are the relevant MITRE ATT&CK Techniques from the given list that the attacker might use to attack the information flow? Return the Technique IDs in python list format.
                '''

    # NOTE(review): get_response uses its `question` argument as the retrieval
    # query too, so the whole instruction text above (not only the flow
    # description) is what gets retrieved against -- confirm this is intended.
    response = get_response(retriever, final_prompt)
    # Keep only the technique IDs found in the model's free-text answer.
    list_response = parse_mitre_techniques(response)
    responses.append(list_response)

# Compare responses to ground truth: score every row, then aggregate.
predictions_with_metrics = []

for truth, pred in zip(ground_truth, responses):
    # A missing label is read by pandas as a float NaN, which re.findall
    # rejects with a TypeError; treat any non-string label as "no techniques".
    p_truth = re.findall(r"T\d+", truth) if isinstance(truth, str) else []
    metrics = calculate_metrics(p_truth, pred)

    predictions_with_metrics.append({
        "true_label": p_truth,
        "predicted_label": pred,
        **metrics
    })


# Compute overall (micro-averaged) metrics: sum the per-row counts first, then
# derive precision, recall and F1 from the totals.
overall_tp = sum(row["tp"] for row in predictions_with_metrics)
overall_fp = sum(row["fp"] for row in predictions_with_metrics)
overall_fn = sum(row["fn"] for row in predictions_with_metrics)

overall_precision = overall_tp / (overall_tp + overall_fp) if (overall_tp + overall_fp) > 0 else 0
overall_recall = overall_tp / (overall_tp + overall_fn) if (overall_tp + overall_fn) > 0 else 0
overall_f1 = 2 * overall_precision * overall_recall / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0



In [None]:
import json
import os

# Save predictions with metrics.
# os.path.join supplies the path separator that plain string concatenation
# omitted whenever data_folder has no trailing slash; a data_folder that
# already ends with "/" produces the same path as before.
predictions_with_metrics_path = os.path.join(data_folder, "filename_to_save_the_predictions.json")
with open(predictions_with_metrics_path, "w") as f:
    json.dump(predictions_with_metrics, f, indent=4)

# Save overall metrics
overall_metrics_path = os.path.join(data_folder, "filename_to_save_the_overall_prediction.json")
with open(overall_metrics_path, "w") as f:
    json.dump({
        "overall_precision": overall_precision,
        "overall_recall": overall_recall,
        "overall_f1": overall_f1
    }, f, indent=4)

print(f"Predictions with metrics saved to {predictions_with_metrics_path}")
print(f"Overall metrics saved to {overall_metrics_path}")