# **Evalution of CTI-Bench dataset**

In [1]:
!pip install openai

Collecting openai
  Downloading openai-1.52.0-py3-none-any.whl.metadata (24 kB)
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.27.2-py3-none-any.whl.metadata (7.1 kB)
Collecting jiter<1,>=0.4.0 (from openai)
  Downloading jiter-0.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.2 kB)
Collecting httpcore==1.* (from httpx<1,>=0.23.0->openai)
  Downloading httpcore-1.0.6-py3-none-any.whl.metadata (21 kB)
Collecting h11<0.15,>=0.13 (from httpcore==1.*->httpx<1,>=0.23.0->openai)
  Downloading h11-0.14.0-py3-none-any.whl.metadata (8.2 kB)
Downloading openai-1.52.0-py3-none-any.whl (386 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m386.9/386.9 kB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpx-0.27.2-py3-none-any.whl (76 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.4/76.4 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpcore-1.0.6-py3-none-any.whl (78 kB)
[2K   [90m━

In [2]:
import os
from openai import OpenAI
import openai

In [3]:
from google.colab import userdata
gpt_key = userdata.get('gptkey')
openai.api_key = gpt_key
client = OpenAI(api_key=gpt_key)

In [4]:
chat_completion = client.chat.completions.create(
   messages=[
     {
         "role": "user",
         "content": "Je vais à",
     }
   ],
   model= "gpt-3.5-turbo",
)
print(chat_completion.choices[0].message.content)

 la plage pour profiter du soleil et de l'air marin.


In [None]:
import pandas as pd
def read_prompts(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')
    prompts = []
    for index, row in df.iterrows():
        prompt = f"""
        You are given a multiple-choice question (MCQ) from a Cyber Threat Intelligence (CTI) knowledge benchmark dataset. /
        Your task is to choose the best option among the four provided. /
        Return your answer as a single uppercase letter: A, B, C, or D. /
        **Question:** {row['Question']} /
        **Options:** A) {row['Option A']} B) {row['Option B']} C) {row['Option C']} D) {row['Option D']} /
        **Important:** your answer should contain only the single letter corresponding to the best option, with no additional text. /
        """
        prompts.append(prompt)
    return df, prompts

In [None]:
import time
import requests
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import pandas as pd
import os
def execute_prompts(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    total_correct = 0
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        retries = 0
        while retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                   messages=[
                          {
                            "role": "user",
                            "content": prompt,
                          }
                          ],
                   model= "gpt-3.5-turbo",
                   temperature = 0,
                   max_tokens= 10,
                )
                response= chat_completion.choices[0].message
                if not hasattr(response, 'content'):
                    raise ValueError(f"Invalid operation: The `response.text` quick accessor requires the response to contain a valid `Part`, but none were returned. Please check the `candidate.safety_ratings` to determine if the response was blocked. Prompt number: {i+1}")
                result = response.content.strip()
                results.append(result)
                if result == correct_answers[i]:
                    total_correct += 1
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if retries == max_retries:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['gpt3.5 Response'] = results

    # Calculate the total correct answers
    accuracy = total_correct / len(correct_answers)
    output_file = "/content/output_file.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        try:
            existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        except UnicodeDecodeError:
            existing_df = pd.read_csv(output_file, sep='\t', encoding='latin1')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Gemini's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Print the total correct answers
    print(f'Total correct responses by GPT3.5: {total_correct}/{len(correct_answers)}')
    # Print the final accuracy
    print(f'Final accuracy: {accuracy:.2%}')



In [None]:
df, prompts = read_prompts("/content/cti-mcq.tsv")
execute_prompts(df, prompts)

Total correct responses by GPT3.5: 1403/2500
Final accuracy: 56.12%


Total correct responses by GPT3.5: 1403/2500
Final accuracy: 56.12%

In [None]:
import pandas as pd
def read_prompts_rcm(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')
    prompts = []
    for index, row in df.iterrows():
        prompt = f"""
        You are a cybersecurity expert specializing in cyber threat intelligence. \
        Analyze the following CVE description and map it to the appropriate CWE. \
        Ensure your response contains only the CWE ID. \
        CVEDescription: {row['Description']}
        """
        prompts.append(prompt)
    return df, prompts

In [None]:
import pandas as pd
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import os

def execute_prompts_rcm(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    total_correct = 0
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        success = False
        retries = 0
        while not success and retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                   messages=[
                          {
                            "role": "user",
                            "content": prompt,
                          }
                          ],
                   model= "gpt-3.5-turbo",
                   temperature = 0,
                   max_tokens= 135,
                )

                response= chat_completion.choices[0].message
                if not hasattr(response, 'content'):
                    raise ValueError(f"Invalid operation: The `response.text` quick accessor requires the response to contain a valid `Part`, but none were returned. Please check the `candidate.safety_ratings` to determine if the response was blocked. Prompt number: {i+1}")
                result = response.content.strip()
                print(result)
                results.append(result)
                if result.upper() == correct_answers[i].upper():
                    total_correct += 1
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")

    df['Gpt3.5 Response'] = results

    # Calculate the total correct answers
    accuracy = total_correct / len(correct_answers)
    output_file = "/content/output_file_rcm.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Gemini's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Print the total correct answers
    print(f'Total correct responses by GPT3.5: {total_correct}/{len(correct_answers)}')
    # Print the final accuracy
    print(f'Final accuracy: {accuracy:.2%}')

# Supposons que vous ayez un DataFrame 'df' et une liste de prompts 'prompts'
df, prompts = read_prompts_rcm("/content/cti-rcm.tsv")
execute_prompts_rcm(df, prompts)


CWE-416
Failed to process prompt 1 after 2500 retries.
CWE-287
Failed to process prompt 2 after 2500 retries.
CWE ID: CWE-79
Failed to process prompt 3 after 2500 retries.
CWE-787
Failed to process prompt 4 after 2500 retries.
CWE-79
Failed to process prompt 5 after 2500 retries.
CWE-79
Failed to process prompt 6 after 2500 retries.
CWE ID: CWE-269
Failed to process prompt 7 after 2500 retries.
CWE-918: Server-Side Request Forgery (SSRF)
Failed to process prompt 8 after 2500 retries.
CWE-79
Failed to process prompt 9 after 2500 retries.
CWE-200
Failed to process prompt 10 after 2500 retries.
CWE-79
Failed to process prompt 11 after 2500 retries.
CWE-122
Failed to process prompt 12 after 2500 retries.
CWE ID: CWE-416
Failed to process prompt 13 after 2500 retries.
CWE-78
Failed to process prompt 14 after 2500 retries.
CWE ID: CWE-269
Failed to process prompt 15 after 2500 retries.
CWE-200
Failed to process prompt 16 after 2500 retries.
CWE-434
Failed to process prompt 17 after 2500 retr

Total correct responses by GPT3.5: 680/1000                                    
Final accuracy: 68%

In [None]:
chat_completion = client.chat.completions.create(
   messages=[
     {
         "role": "user",
         "content": "Analyze the following CVE description and calculate the CVSS v3.1 Base Score. \
           Determine the values for each base metric: AV, AC, PR, UI, S, C, I, and A. \
           Summarize each metric's value and provide the final CVSS v3.1 vector string.   \
           Valid options for each metric are as follows: \
           - **Attack Vector (AV)**: Network (N), Adjacent (A), Local (L), Physical (P) \
           - **Attack Complexity (AC)**: Low (L), High (H) \
           - **Privileges Required (PR)**: None (N), Low (L), High (H) \
           - **User Interaction (UI)**: None (N), Required (R) \
           - **Scope (S)**: Unchanged (U), Changed (C) \
           - **Confidentiality (C)**: None (N), Low (L), High (H) \
           - **Integrity (I)**: None (N), Low (L), High (H) \
           - **Availability (A)**: None (N), Low (L), High (H)  \
           Summarize each metric’s value and provide the final CVSS v3.1 vector string. \
           Ensure the final line of your response contains only the CVSS v3 Vector String in the following format: \
           Example format: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H \
           CVE Description: In the Linux kernel through 6.7.1, there is a use-after-free in cec_queue_msg_fh, related to drivers/media/cec/core/cec-adap.c and drivers/media/cec/core/cec-api.c. ",
     }
   ],
   model= "gpt-3.5-turbo",
)
print(chat_completion.choices[0].message.content)

- Attack Vector (AV): Local (L)
- Attack Complexity (AC): Low (L)
- Privileges Required (PR): Low (L)
- User Interaction (UI): None (N)
- Scope (S): Unchanged (U)
- Confidentiality (C): Low (L)
- Integrity (I): Low (L)
- Availability (A): Low (L)

CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L


In [None]:
!pip install cvss

Collecting cvss
  Downloading cvss-3.1-py2.py3-none-any.whl.metadata (3.5 kB)
Downloading cvss-3.1-py2.py3-none-any.whl (30 kB)
Installing collected packages: cvss
Successfully installed cvss-3.1


In [None]:
import pandas as pd

def read_prompts_vsp(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')

    prompts = []
    for index, row in df.iterrows():
        prompt = f"""
        Analyze the following CVE description and calculate the CVSS v3.1 Base Score. \
        Determine the values for each base metric: AV, AC, PR, UI, S, C, I, and A. \
        Summarize each metric’s value and provide the final CVSS v3.1 vector string. \
        Valid options for each metric are as follows: \
        - Attack Vector (AV): Network (N), Adjacent (A), Local (L), Physical (P) \
        - Attack Complexity (AC): Low (L), High (H) \
        - Privileges Required (PR): None (N), Low (L), High (H) \
        - User Interaction (UI): None (N), Required (R) \
        - Scope (S): Unchanged (U), Changed (C) \
        - Confidentiality (C): None (N), Low (L), High (H) \
        - Integrity (I): None (N), Low (L), High (H) \
        - Availability (A): None (N), Low (L), High (H) \

        Ensure your response contains only the CVSS v3 Vector String in the following format: \
        Example format: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H \
        CVE Description: {row['Description']}
        """
        prompts.append(prompt)

    return df, prompts


In [None]:
from cvss import CVSS3
def get_cvss_score(cvss_vector):
    c = CVSS3(cvss_vector)
    cvss_score = c.scores()[0]
    return cvss_score

In [None]:
import pandas as pd
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import os
from cvss import CVSS3  # Assuming you have a CVSS3 library or function to calculate the score

def get_cvss_score(cvss_vector):
    try:
        c = CVSS3(cvss_vector)
        cvss_score = c.scores()[0]
        return cvss_score
    except Exception as e:
        print(f"Error calculating CVSS score for vector {cvss_vector}: {e}")
        return None

def execute_prompts_vsp(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    error = 0
    total = 0
    max_retries = 5  # Limit the number of retries

    for i, prompt in enumerate(prompts):
        retries = 0
        success = False
        while retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt,
                        }
                    ],
                    model="gpt-3.5-turbo",
                    temperature=0,
                    max_tokens=170,
                )

                response = chat_completion.choices[0].message

                # Check if the response content exists
                if not response or not hasattr(response, 'content') or not response.content:
                    raise ValueError(f"No valid response for prompt number {i+1}. Response might be blocked or empty.")

                result = response.content.strip()
                print(f"Response for prompt {i+1}: {result}")
                results.append(result)

                pred = result.upper()
                gt = correct_answers[i].upper()

                try:
                    pred_vector = pred
                    pred_score = get_cvss_score(pred_vector)
                    gt_score = get_cvss_score(gt)
                    if pred_score is not None and gt_score is not None:
                        error += abs(pred_score - gt_score)
                        total += 1
                except Exception as e:
                    print(f"Invalid response at row {i + 1}: {e}")
                    continue

                success = True
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
                retries += 1
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['Gpt3.5 Response'] = results

    output_file = "/content/output_file_vsp.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with the responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Calculate and print the Mean Absolute Deviation (MAD)
    if total > 0:
        mad = error / total
        print(f'Mean Absolute Deviation: {mad}')
    else:
        print('No valid responses to calculate MAD.')

# Example usage
df, prompts = read_prompts_vsp("/content/cti-vsp.tsv")
execute_prompts_vsp(df, prompts)


Response for prompt 1: CVSS:3.1/AV:L/AC:H/PR:N/UI:N/S:U/C:N/I:H/A:N
Response for prompt 2: CVSS:3.1/AV:A/AC:H/PR:L/UI:N/S:U/C:H/I:H/A:H
Response for prompt 3: CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:H/I:H/A:H
Response for prompt 4: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:U/C:H/I:H/A:H
Response for prompt 5: CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H
Response for prompt 6: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:L/A:N
Response for prompt 7: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:U/C:L/I:L/A:L
Response for prompt 8: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H
Response for prompt 9: CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:H/I:H/A:H
Response for prompt 10: CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:N
Response for prompt 11: CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:L/I:L/A:N
Response for prompt 12: CVSS:3.1/AV:L/AC:H/PR:N/UI:N/S:U/C:H/I:H/A:H
Response for prompt 13: CVSS:3.1/AV:L/AC:H/PR:H/UI:N/S:U/C:N/I:N/A:L
Response for prompt 14: CVSS:3.1/AV:N/AC:L/PR:H/UI:R/S:U/C:H/I:H/A:H
Response for prompt 15: CVSS:3.1/AV:L/AC:H/

Mean Absolute Deviation: 1.905999999999998

In [None]:
import pandas as pd
import os
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError


def read_prompts_taa(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')

    prompt_column = df.columns[-1]  # Assuming the prompt is in the last column
    return df, df[prompt_column].tolist()

def execute_prompts_taa(df, prompts):
    results = []
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        success = False
        retries = 0

        while not success and retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt,
                        }
                    ],
                    model="gpt-3.5-turbo",
                    temperature=0.9,
                    max_tokens=21,
                )

                response = chat_completion.choices[0].message

                # Check if the response content exists
                if not response or not hasattr(response, 'content') or not response.content:
                    raise ValueError(f"No valid response for prompt number {i+1}. Response might be blocked or empty.")

                result = response.content.strip()
                print(f"Response for prompt {i+1}: {result}")
                results.append(result)
                success = True
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['chatgpt3.5 Response'] = results
    return df

def save_results(df, output_file):
    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Llama's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

# Example usage
input_file = "/content/cti-taa.tsv"
output_file = "output_file_taa.tsv"

df, prompts = read_prompts_taa(input_file)
df_with_responses = execute_prompts_taa(df, prompts)
save_results(df_with_responses, output_file)


Response for prompt 1: TA505
Response for prompt 2: UNC2447
Response for prompt 3: TA505
Response for prompt 4: TA505
Response for prompt 5: Threat Actor: TA505
Response for prompt 6: Lunar Spider
Response for prompt 7: TA505
Response for prompt 8: TA551
Response for prompt 9: TA573
Response for prompt 10: TA505
Response for prompt 11: UNC2452
Response for prompt 12: UNC1151
Response for prompt 13: Lazarus Group
Response for prompt 14: TA505
Response for prompt 15: TA505
Response for prompt 16: UNC2449
Response for prompt 17: UNC1878
Response for prompt 18: TA505
Response for prompt 19: Lazarus Group
Response for prompt 20: FIN8
Response for prompt 21: TA505
Response for prompt 22: TA505.
Response for prompt 23: FIN8
Response for prompt 24: TA505
Response for prompt 25: MuddyWater
Response for prompt 26: Wizard Spider
Response for prompt 27: TA573
Response for prompt 28: TA505
Response for prompt 29: UNC1878
Response for prompt 30: Lazarus Group
Response for prompt 31: Threat Actor: TA

In [None]:
import pickle
with open('alias_dict.pickle', 'rb') as handle:
    alias_dict = pickle.load(handle)

In [None]:
with open('related_dict.pickle', 'rb') as handle:
    related_dict = pickle.load(handle)

In [None]:
def threat_actor_connection(actor1, actor2, alias_dict, related_dict):
    """
    Determines the connection type between two threat actors based on alias and related group information.

    Args:
        actor1: The first threat actor.
        actor2: The second threat actor.
        alias_dict: A dictionary where keys are threat actors and values are lists of their aliases.
        related_dict: A dictionary where keys are threat actors and values are lists of related threat actors.

    Returns:
        "C" if the actors are connected via an alias chain.
        "P" if the actors are connected via a related group chain.
        "I" if no connection is found.
    """

    actor1 = actor1.strip().lower()
    actor2 = actor2.strip().lower()

    # Normalize dictionaries and ensure bidirectional alias relationships
    alias_dict = {k.strip().lower(): [v.strip().lower() for v in val] for k, val in alias_dict.items()}
    for actor in list(alias_dict): # Iterate over a copy of the keys
        aliases = alias_dict[actor]
        for alias in aliases:
            if actor not in alias_dict.setdefault(alias, []):  # Avoid duplicates
                alias_dict[alias].append(actor)

    related_dict = {k.strip().lower(): [v.strip().lower() for v in val] for k, val in related_dict.items()}
    for actor in list(related_dict):  # Iterate over a copy of the keys
        related_groups = related_dict[actor]
        for related_actor in related_groups:
            if actor not in related_dict.setdefault(related_actor, []):
                related_dict[related_actor].append(actor)

    if is_alias_connected(actor1, actor2, alias_dict):
        return "C"

    if is_related_connected(actor1, actor2, alias_dict, related_dict):
        return "P"

    return "I"


def is_alias_connected(actor1, actor2, alias_dict):
    """
    Checks if two actors are connected through an alias chain using Breadth First Search (BFS).
    """
    visited = set()
    queue = [actor1]

    while queue:
        current_actor = queue.pop(0)
        visited.add(current_actor)

        for alias in alias_dict.get(current_actor, []):
            if alias == actor2:
                return True
            if alias not in visited:
                queue.append(alias)

    return False


def is_related_connected(actor1, actor2, alias_dict, related_dict) :
    """
    Checks if two actors are connected through a chain of aliases and related groups using BFS.
    """
    visited = set()
    queue = [actor1]

    while queue:
        current_actor = queue.pop(0)
        visited.add(current_actor)

        for alias in alias_dict.get(current_actor, []):
            if alias == actor2:
                return True
            if alias not in visited:
                queue.append(alias)

        for related_actor in related_dict.get(current_actor, []):
            if related_actor == actor2:
                return True
            if related_actor not in visited:
                queue.append(related_actor)

    return False

In [None]:
def compute_taa_accuracy(fname, col):
    """
    Returns Correct & Plausible Accuracy
    """
    df = pd.read_csv(fname, sep='\t')
    correct = 0
    plausible = 0
    total = 0

    for idx, row in df.iterrows():
        pred = row[col].lower().strip()
        gt = row['GT'].lower().strip()

        res = threat_actor_connection(gt, pred, alias_dict, related_dict)

        if res == 'C':
            correct += 1
        elif res == 'P':
            plausible += 1
        total += 1

    return correct/total*100, (correct+plausible)/total*100

In [None]:
print('Correct & Plausible Accuracy:', compute_taa_accuracy('/content/gpt3.5_output_file_taa.tsv', 'chatgpt3.5 Response'))

Correct & Plausible Accuracy: (2.0, 10.0)


GPT4

In [11]:
!pip install cvss

Collecting cvss
  Downloading cvss-3.2-py2.py3-none-any.whl.metadata (3.5 kB)
Downloading cvss-3.2-py2.py3-none-any.whl (29 kB)
Installing collected packages: cvss
Successfully installed cvss-3.2


In [12]:
from cvss import CVSS3
def get_cvss_score(cvss_vector):
    c = CVSS3(cvss_vector)
    cvss_score = c.scores()[0]
    return cvss_score

In [9]:
import pandas as pd

def read_prompts_vsp(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')

    prompts = []
    for index, row in df.iterrows():
        prompt = f"""
        Analyze the following CVE description and calculate the CVSS v3.1 Base Score. \
        Determine the values for each base metric: AV, AC, PR, UI, S, C, I, and A. \
        Summarize each metric’s value and provide the final CVSS v3.1 vector string. \
        Valid options for each metric are as follows: \
        - Attack Vector (AV): Network (N), Adjacent (A), Local (L), Physical (P) \
        - Attack Complexity (AC): Low (L), High (H) \
        - Privileges Required (PR): None (N), Low (L), High (H) \
        - User Interaction (UI): None (N), Required (R) \
        - Scope (S): Unchanged (U), Changed (C) \
        - Confidentiality (C): None (N), Low (L), High (H) \
        - Integrity (I): None (N), Low (L), High (H) \
        - Availability (A): None (N), Low (L), High (H) \

        Your response should contains only the CVSS v3 Vector String in the following format : \
        Example format: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H \
        Dont write anything else . \
        CVE Description: {row['Description']}
        """
        prompts.append(prompt)

    return df, prompts


In [None]:
import pandas as pd
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import os
from cvss import CVSS3  # Assuming you have a CVSS3 library or function to calculate the score

def get_cvss_score(cvss_vector):
    try:
        c = CVSS3(cvss_vector)
        cvss_score = c.scores()[0]
        return cvss_score
    except Exception as e:
        print(f"Error calculating CVSS score for vector {cvss_vector}: {e}")
        return None

def execute_prompts_vsp(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    error = 0
    total = 0
    max_retries = 5  # Limit the number of retries

    for i, prompt in enumerate(prompts):
        retries = 0
        success = False
        while retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt,
                        }
                    ],
                    model="gpt-4o-mini",
                    temperature=0,
                    max_tokens=170,
                )

                response = chat_completion.choices[0].message

                # Check if the response content exists
                if not response or not hasattr(response, 'content') or not response.content:
                    raise ValueError(f"No valid response for prompt number {i+1}. Response might be blocked or empty.")

                result = response.content.strip()
                print(f"Response for prompt {i+1}: {result}")
                results.append(result)

                pred = result.upper()
                gt = correct_answers[i].upper()

                try:
                    pred_vector = pred
                    pred_score = get_cvss_score(pred_vector)
                    gt_score = get_cvss_score(gt)
                    if pred_score is not None and gt_score is not None:
                        error += abs(pred_score - gt_score)
                        total += 1
                except Exception as e:
                    print(f"Invalid response at row {i + 1}: {e}")
                    continue

                success = True
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
                retries += 1
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['Gpt4o Response'] = results

    output_file = "/content/output_file_vsp.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with the responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Calculate and print the Mean Absolute Deviation (MAD)
    if total > 0:
        mad = error / total
        print(f'Mean Absolute Deviation: {mad}')
    else:
        print('No valid responses to calculate MAD.')

# Example usage
df, prompts = read_prompts_vsp("/content/cti-vsp.tsv")
execute_prompts_vsp(df, prompts)


Response for prompt 1: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
Response for prompt 2: CVSS:3.1/AV:N/AC:H/PR:L/UI:R/S:C/C:H/I:H/A:N
Response for prompt 3: CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:N
Response for prompt 4: CVSS:3.1/AV:L/AC:H/PR:N/UI:N/S:U/C:H/I:H/A:N
Response for prompt 5: CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:H/I:H/A:N
Response for prompt 6: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:N/A:N
Response for prompt 7: CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:N
Response for prompt 8: CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:N
Response for prompt 9: CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:C/C:H/I:H/A:N
Response for prompt 10: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
Response for prompt 11: CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:L/I:N/A:N
Response for prompt 12: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H
Response for prompt 13: CVSS:3.1/AV:L/AC:H/PR:H/UI:N/S:U/C:N/I:N/A:H
Response for prompt 14: CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H
Response for prompt 15: CVSS:3.1/AV:L/AC:H/

Mean Absolute Deviation: 1.4096000000000017

In [None]:
import pandas as pd
import os
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError


def read_prompts_taa(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')

    prompt_column = df.columns[-1]  # Assuming the prompt is in the last column
    return df, df[prompt_column].tolist()

def execute_prompts_taa(df, prompts):
    results = []
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        success = False
        retries = 0

        while not success and retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt,
                        }
                    ],
                    model="gpt-4o-mini",
                    temperature=0.9,
                    max_tokens=21,
                )

                response = chat_completion.choices[0].message

                # Check if the response content exists
                if not response or not hasattr(response, 'content') or not response.content:
                    raise ValueError(f"No valid response for prompt number {i+1}. Response might be blocked or empty.")

                result = response.content.strip()
                print(f"Response for prompt {i+1}: {result}")
                results.append(result)
                success = True
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['chatgpt4o Response'] = results
    return df

def save_results(df, output_file):
    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Llama's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

# Example usage
input_file = "/content/cti-taa.tsv"
output_file = "output_file_taa.tsv"

df, prompts = read_prompts_taa(input_file)
df_with_responses = execute_prompts_taa(df, prompts)
save_results(df_with_responses, output_file)


Response for prompt 1: Dridex
Response for prompt 2: APT36 (Transparent Tribe)
Response for prompt 3: APT41
Response for prompt 4: APT41
Response for prompt 5: Dridex
Response for prompt 6: APT34
Response for prompt 7: APT41
Response for prompt 8: Qakbot Group (also known as Qbot or Qakbot)
Response for prompt 9: APT38
Response for prompt 10: Lapsus$
Response for prompt 11: UNC4210
Response for prompt 12: APT28 (Fancy Bear)
Response for prompt 13: APT28
Response for prompt 14: Cozy Bear (APT29)
Response for prompt 15: SVR (Russian Foreign Intelligence Service)
Response for prompt 16: FIN7
Response for prompt 17: APT34
Response for prompt 18: APT33
Response for prompt 19: APT28
Response for prompt 20: Andariel
Response for prompt 21: APT37
Response for prompt 22: Lazarus Group
Response for prompt 23: [PLACEHOLDER]
Response for prompt 24: APT35 (Charming Kitten)
Response for prompt 25: Mint Sandstorm (APT35)
Response for prompt 26: Charming Kitten
Response for prompt 27: EDUCATED MANTICO

In [17]:
import pickle
with open('alias_dict.pickle', 'rb') as handle:
    alias_dict = pickle.load(handle)

In [18]:
with open('related_dict.pickle', 'rb') as handle:
    related_dict = pickle.load(handle)

In [None]:
def threat_actor_connection(actor1, actor2, alias_dict, related_dict):
    """
    Determines the connection type between two threat actors based on alias and related group information.

    Args:
        actor1: The first threat actor.
        actor2: The second threat actor.
        alias_dict: A dictionary where keys are threat actors and values are lists of their aliases.
        related_dict: A dictionary where keys are threat actors and values are lists of related threat actors.

    Returns:
        "C" if the actors are connected via an alias chain.
        "P" if the actors are connected via a related group chain.
        "I" if no connection is found.
    """

    actor1 = actor1.strip().lower()
    actor2 = actor2.strip().lower()

    # Normalize dictionaries and ensure bidirectional alias relationships
    alias_dict = {k.strip().lower(): [v.strip().lower() for v in val] for k, val in alias_dict.items()}
    for actor in list(alias_dict): # Iterate over a copy of the keys
        aliases = alias_dict[actor]
        for alias in aliases:
            if actor not in alias_dict.setdefault(alias, []):  # Avoid duplicates
                alias_dict[alias].append(actor)

    related_dict = {k.strip().lower(): [v.strip().lower() for v in val] for k, val in related_dict.items()}
    for actor in list(related_dict):  # Iterate over a copy of the keys
        related_groups = related_dict[actor]
        for related_actor in related_groups:
            if actor not in related_dict.setdefault(related_actor, []):
                related_dict[related_actor].append(actor)

    if is_alias_connected(actor1, actor2, alias_dict):
        return "C"

    if is_related_connected(actor1, actor2, alias_dict, related_dict):
        return "P"

    return "I"


def is_alias_connected(actor1, actor2, alias_dict):
    """
    Checks if two actors are connected through an alias chain using Breadth First Search (BFS).
    """
    visited = set()
    queue = [actor1]

    while queue:
        current_actor = queue.pop(0)
        visited.add(current_actor)

        for alias in alias_dict.get(current_actor, []):
            if alias == actor2:
                return True
            if alias not in visited:
                queue.append(alias)

    return False


def is_related_connected(actor1, actor2, alias_dict, related_dict) :
    """
    Checks if two actors are connected through a chain of aliases and related groups using BFS.
    """
    visited = set()
    queue = [actor1]

    while queue:
        current_actor = queue.pop(0)
        visited.add(current_actor)

        for alias in alias_dict.get(current_actor, []):
            if alias == actor2:
                return True
            if alias not in visited:
                queue.append(alias)

        for related_actor in related_dict.get(current_actor, []):
            if related_actor == actor2:
                return True
            if related_actor not in visited:
                queue.append(related_actor)

    return False

In [None]:
def compute_taa_accuracy(fname, col):
    """
    Returns Correct & Plausible Accuracy
    """
    df = pd.read_csv(fname, sep='\t')
    correct = 0
    plausible = 0
    total = 0

    for idx, row in df.iterrows():
        pred = row[col].lower().strip()
        gt = row['GT'].lower().strip()

        res = threat_actor_connection(gt, pred, alias_dict, related_dict)

        if res == 'C':
            correct += 1
        elif res == 'P':
            plausible += 1
        total += 1

    return correct/total*100, (correct+plausible)/total*100

In [None]:
print('Correct & Plausible Accuracy:', compute_taa_accuracy('/content/gpt4o_taa.tsv', 'chatgpt4o Response'))

Correct & Plausible Accuracy: (22.641509433962266, 39.62264150943396)


Correct & Plausible Accuracy: (22.641509433962266, 39.62264150943396)

In [7]:
import pandas as pd
def read_prompts_rcm(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')
    prompts = []
    for index, row in df.iterrows():
        prompt = f"""
        You are a cybersecurity expert specializing in cyber threat intelligence. \
        Analyze the following CVE description and map it to the appropriate CWE. \
        Ensure your response contains only the CWE ID. \
        CVEDescription: {row['Description']}
        """
        prompts.append(prompt)
    return df, prompts

In [8]:
import pandas as pd
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import os

def execute_prompts_rcm(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    total_correct = 0
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        success = False
        retries = 0
        while not success and retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                   messages=[
                          {
                            "role": "user",
                            "content": prompt,
                          }
                          ],
                   model= "gpt-4o-mini",
                   temperature = 0,
                   max_tokens= 135,
                )

                response= chat_completion.choices[0].message
                if not hasattr(response, 'content'):
                    raise ValueError(f"Invalid operation: The `response.text` quick accessor requires the response to contain a valid `Part`, but none were returned. Please check the `candidate.safety_ratings` to determine if the response was blocked. Prompt number: {i+1}")
                result = response.content.strip()
                print(result)
                results.append(result)
                if result.upper() == correct_answers[i].upper():
                    total_correct += 1
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")

    df['Gpt4o Response'] = results

    # Calculate the total correct answers
    accuracy = total_correct / len(correct_answers)
    output_file = "/content/output_file_rcm.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Gemini's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Print the total correct answers
    print(f'Total correct responses by GPT4o: {total_correct}/{len(correct_answers)}')
    # Print the final accuracy
    print(f'Final accuracy: {accuracy:.2%}')

# Supposons que vous ayez un DataFrame 'df' et une liste de prompts 'prompts'
df, prompts = read_prompts_rcm("/content/cti-rcm.tsv")
execute_prompts_rcm(df, prompts)


KeyError: 'Description'

In [11]:
import pandas as pd
try:
        df = pd.read_csv("/content/CWE_Formatted.tsv", sep='\t', encoding='utf-8')
except UnicodeDecodeError:
        df = pd.read_csv("/content/CWE_Formatted.tsv", sep='\t', encoding='ISO-8859-1')
try:
        df2 = pd.read_csv("/content/cti-rcm.tsv", sep='\t', encoding='utf-8')
except UnicodeDecodeError:
        df2 = pd.read_csv("/content/cti-rcm.tsv", sep='\t', encoding='ISO-8859-1')
results = df['CWE'].tolist()
correct_answers = df2['GT'].tolist()
correct = 0

# Loop over the two columns using a for loop
for i in range(len(correct_answers)):
    if results[i] == correct_answers[i]:
        correct += 1

accuracy= correct/len(correct_answers)

    # Print the total correct answers
print(f'Total correct responses by GPT4o: {correct}/{len(correct_answers)}')
    # Print the final accuracy
print(f'Final accuracy: {accuracy:.2%}')

Total correct responses by GPT4o: 675/1000
Final accuracy: 67.50%


Total correct responses by GPT4o: 675/1000
Final accuracy: 67.50%

In [5]:
import pandas as pd
def read_prompts(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')
    prompts = []
    for index, row in df.iterrows():
        prompt = f"""
        You are given a multiple-choice question (MCQ) from a Cyber Threat Intelligence (CTI) knowledge benchmark dataset. /
        Your task is to choose the best option among the four provided. /
        Return your answer as a single uppercase letter: A, B, C, or D. /
        **Question:** {row['Question']} /
        **Options:** A) {row['Option A']} B) {row['Option B']} C) {row['Option C']} D) {row['Option D']} /
        **Important:** your answer should contain only the single letter corresponding to the best option, with no additional text. /
        """
        prompts.append(prompt)
    return df, prompts

In [13]:
import time
import requests
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import pandas as pd
import os
def execute_prompts(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    total_correct = 0
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        retries = 0
        while retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                   messages=[
                          {
                            "role": "user",
                            "content": prompt,
                          }
                          ],
                   model= "gpt-4o-mini",
                   temperature = 0,
                   max_tokens= 10,
                )
                response= chat_completion.choices[0].message
                if not hasattr(response, 'content'):
                    raise ValueError(f"Invalid operation: The `response.text` quick accessor requires the response to contain a valid `Part`, but none were returned. Please check the `candidate.safety_ratings` to determine if the response was blocked. Prompt number: {i+1}")
                result = response.content.strip()
                print(result)
                results.append(result)
                if result == correct_answers[i]:
                    total_correct += 1
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if retries == max_retries:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['gpt4o Response'] = results

    # Calculate the total correct answers
    accuracy = total_correct / len(correct_answers)
    output_file = "/content/output_file.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        try:
            existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        except UnicodeDecodeError:
            existing_df = pd.read_csv(output_file, sep='\t', encoding='latin1')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Gemini's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Print the total correct answers
    print(f'Total correct responses by GPT4o: {total_correct}/{len(correct_answers)}')
    # Print the final accuracy
    print(f'Final accuracy: {accuracy:.2%}')

df, prompts = read_prompts("/content/cti-mcq.tsv")
execute_prompts(df, prompts)

B
C
C
A
C
B
D
B
B
D
D
D
C
B
C
B
C
B
C
D
C
A
B
C
C
C
A
C
B
A
A
A
A
D
C
D
C
B
B
B
C
D
C
D
C
C
D
B
C
D
C
B
C
C
C
A
A
B
C
C
A
C
B
A
B
B
C
A
C
C
C
C
B
D
B
C
B
B
C
B
A
B
A
A
C
C
C
B
C
C
B
B
C
A
C
C
A
A
A
D
A
B
A
B
A
A
C
A
B
D
B
D
C
A
A
C
B
C
A
C
C
C
D
C
C
D
B
D
B
B
A
C
C
A
D
B
C
B
C
B
A
B
A
B
A
A
B
C
C
C
B
B
C
B
C
B
C
D
C
D
C
C
A
C
C
B
C
B
D
C
B
D
B
B
A
A
B
C
B
C
C
B
C
B
A
C
C
B
B
A
B
C
A
A
B
C
C
B
C
B
D
A
C
A
C
A
C
C
A
A
C
A
C
C
D
C
C
C
C
C
B
B
D
B
B
C
C
C
D
C
B
B
A
B
A
A
B
C
C
B
C
B
A
B
B
A
C
D
C
D
B
B
D
A
D
D
A
C
C
A
B
A
B
A
C
C
B
C
A
B
B
A
B
A
A
A
B
C
C
C
A
C
D
B
A
A
C
C
B
C
B
A
B
C
C
C
D
C
C
B
D
D
B
D
C
A
B
C
C
B
A
C
C
A
C
B
C
A
B
C
B
D
C
B
C
D
A
A
C
A
C
B
A
A
B
C
B
C
A
B
B
A
C
C
C
C
B
A
C
D
C
C
B
B
B
C
C
B
A
B
D
C
C
A
B
B
A
C
D
C
A
B
B
B
C
B
D
D
A
C
A
D
C
C
C
A
D
C
A
C
B
C
C
A
D
B
C
A
A
C
B
A
D
A
B
A
A
C
D
D
B
B
C
B
B
A
D
A
C
C
C
C
D
A
B
B
C
D
D
C
B
C
A
A
C
B
C
C
B
B
D
C
A
D
C
B
D
A
A
B
C
B
C
C
B
C
B
C
A
A
A
A
C
B
C
C
B
B
D
C
D
D
C
A
B
D
B
C
C
A
D
B
D
C
A
C
A
A
C
C
A
D
C
C
B
D
C
B
B
C


Total correct responses by GPT4o: 668/1000
Final accuracy: 66.80%

#**Evaluation de ma dataset**


In [6]:
import time
import requests
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import pandas as pd
import os
def execute_prompts(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    total_correct = 0
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        retries = 0
        while retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                   messages=[
                          {
                            "role": "user",
                            "content": prompt,
                          }
                          ],
                   model= "gpt-4o-mini",
                   temperature = 0,
                   max_tokens= 10,
                )
                response= chat_completion.choices[0].message
                if not hasattr(response, 'content'):
                    raise ValueError(f"Invalid operation: The `response.text` quick accessor requires the response to contain a valid `Part`, but none were returned. Please check the `candidate.safety_ratings` to determine if the response was blocked. Prompt number: {i+1}")
                result = response.content.strip()
                print(result)
                results.append(result)
                if result == correct_answers[i]:
                    total_correct += 1
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if retries == max_retries:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['gpt4o Response'] = results

    # Calculate the total correct answers
    accuracy = total_correct / len(correct_answers)
    output_file = "/content/output_file.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        try:
            existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        except UnicodeDecodeError:
            existing_df = pd.read_csv(output_file, sep='\t', encoding='latin1')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Gemini's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Print the total correct answers
    print(f'Total correct responses by GPT4o: {total_correct}/{len(correct_answers)}')
    # Print the final accuracy
    print(f'Final accuracy: {accuracy:.2%}')

df, prompts = read_prompts("/content/mydataset_mcq.tsv")
execute_prompts(df, prompts)

C
C
C
C
A
C
C
B
C
B
B
B
B
D
B
B
A
D
B
B
B
B
D
B
B
C
C
A
C
C
C
B
B
B
D
B
B
A
B
C
B
B
A
A
C
A
C
C
D
C
A
C
B
A
C
C
A
B
A
B
B
C
B
C
C
D
C
B
A
B
A
C
A
D
A
D
A
D
A
D
A
A
A
C
A
A
B
C
A
C
A
D
B
A
B
C
D
C
B
D
A
A
C
D
C
D
C
C
D
A
D
D
A
A
C
D
D
B
D
C
C
A
A
A
A
B
A
B
A
C
A
B
D
C
A
A
A
D
A
D
A
A
B
C
A
A
D
B
D
C
C
A
B
C
A
C
A
B
B
D
A
B
C
B
A
D
C
D
D
C
A
D
B
C
C
D
B
D
C
C
A
D
B
A
A
B
C
A
B
C
A
A
B
C
B
D
B
D
C
A
A
C
C
B
C
D
C
D
B
C
B
A
D
D
A
A
A
C
A
C
B
A
A
C
C
A
C
C
C
C
A
C
A
C
B
C
C
D
D
C
C
C
C
C
C
C
C
B
A
A
C
A
A
A
C
D
C
C
B
D
A
B
C
C
A
D
C
B
D
D
C
B
D
B
C
C
C
C
C
B
A
C
D
A
B
A
C
C
A
B
C
C
A
A
A
A
A
B
A
C
B
D
C
A
A
A
A
A
B
D
D
C
D
A
D
B
D
A
A
B
B
B
C
A
C
C
A
D
B
A
B
C
D
A
B
D
B
C
C
A
C
C
C
C
C
C
A
B
B
C
C
C
C
B
C
D
C
A
C
B
D
C
C
A
B
C
A
A
C
A
C
A
A
A
B
A
D
A
C
A
A
C
A
D
A
C
A
A
A
A
A
C
A
A
A
C
A
D
B
B
C
C
B
A
A
A
C
C
D
C
A
C
B
B
C
C
B
A
A
A
C
D
D
C
C
B
C
A
A
A
B
A
A
D
A
B
C
A
A
C
C
B
A
B
B
A
B
A
B
A
C
A
C
C
B
C
D
B
B
A
D
C
A
B
D
C
A
A
D
B
B
A
C
C
A
B
A
A
A
A
C
A
B
C
B
A
A
A
C
D
D
A
B
C
B
A
A
A
D
C


Total correct responses by GPT4o: 816/1500
Final accuracy: 54.40%

In [8]:
import pandas as pd
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import os

def execute_prompts_rcm(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    total_correct = 0
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        success = False
        retries = 0
        while not success and retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                   messages=[
                          {
                            "role": "user",
                            "content": prompt,
                          }
                          ],
                   model= "gpt-4o-mini",
                   temperature = 0,
                   max_tokens= 135,
                )

                response= chat_completion.choices[0].message
                if not hasattr(response, 'content'):
                    raise ValueError(f"Invalid operation: The `response.text` quick accessor requires the response to contain a valid `Part`, but none were returned. Please check the `candidate.safety_ratings` to determine if the response was blocked. Prompt number: {i+1}")
                result = response.content.strip()
                print(result)
                results.append(result)
                if result.upper() == correct_answers[i].upper():
                    total_correct += 1
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")

    df['Gpt4o Response'] = results

    # Calculate the total correct answers
    accuracy = total_correct / len(correct_answers)
    output_file = "/content/output_file_rcm.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Gpt's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Print the total correct answers
    print(f'Total correct responses by GPT4o: {total_correct}/{len(correct_answers)}')
    # Print the final accuracy
    print(f'Final accuracy: {accuracy:.2%}')

# Supposons que vous ayez un DataFrame 'df' et une liste de prompts 'prompts'
df, prompts = read_prompts_rcm("/content/mydataset_rcm.tsv")
execute_prompts_rcm(df, prompts)


CWE-319
Failed to process prompt 1 after 2500 retries.
CWE-327
Failed to process prompt 2 after 2500 retries.
CWE-209
Failed to process prompt 3 after 2500 retries.
CWE-200
Failed to process prompt 4 after 2500 retries.
CWE-284
Failed to process prompt 5 after 2500 retries.
CWE-209
Failed to process prompt 6 after 2500 retries.
CWE-209
Failed to process prompt 7 after 2500 retries.
CWE-256
Failed to process prompt 8 after 2500 retries.
CWE-14
Failed to process prompt 9 after 2500 retries.
CWE-20
Failed to process prompt 10 after 2500 retries.
CWE-20
Failed to process prompt 11 after 2500 retries.
CWE-22
Failed to process prompt 12 after 2500 retries.
CWE-22
Failed to process prompt 13 after 2500 retries.
CWE-22
Failed to process prompt 14 after 2500 retries.
CWE-22
Failed to process prompt 15 after 2500 retries.
CWE-22
Failed to process prompt 16 after 2500 retries.
CWE-22
Failed to process prompt 17 after 2500 retries.
CWE-22
Failed to process prompt 18 after 2500 retries.
CWE-22
Fail

Total correct responses by GPT4o: 215/887
Final accuracy: 24.24%

In [14]:
import pandas as pd
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError
import os
from cvss import CVSS3  # Assuming you have a CVSS3 library or function to calculate the score

def get_cvss_score(cvss_vector):
    try:
        c = CVSS3(cvss_vector)
        cvss_score = c.scores()[0]
        return cvss_score
    except Exception as e:
        print(f"Error calculating CVSS score for vector {cvss_vector}: {e}")
        return None

def execute_prompts_vsp(df, prompts):
    results = []
    correct_answers = df['GT'].tolist()
    error = 0
    total = 0
    max_retries = 5  # Limit the number of retries

    for i, prompt in enumerate(prompts):
        retries = 0
        success = False
        while retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt,
                        }
                    ],
                    model="gpt-4o-mini",
                    temperature=0,
                    max_tokens=170,
                )

                response = chat_completion.choices[0].message

                # Check if the response content exists
                if not response or not hasattr(response, 'content') or not response.content:
                    raise ValueError(f"No valid response for prompt number {i+1}. Response might be blocked or empty.")

                result = response.content.strip()
                print(f"Response for prompt {i+1}: {result}")
                results.append(result)

                pred = result.upper()
                gt = correct_answers[i].upper()

                try:
                    pred_vector = pred
                    pred_score = get_cvss_score(pred_vector)
                    gt_score = get_cvss_score(gt)
                    if pred_score is not None and gt_score is not None:
                        error += abs(pred_score - gt_score)
                        total += 1
                except Exception as e:
                    print(f"Invalid response at row {i + 1}: {e}")
                    continue

                success = True
                # Add a delay between requests to avoid hitting the rate limit
                time.sleep(2)  # Adjust the sleep duration as needed
                break  # Exit the retry loop if successful
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
                retries += 1
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['Gpt4o Response'] = results

    output_file = "/content/output_file_vsp.tsv"

    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with the responses
    combined_df.to_csv(output_file, sep='\t', index=False)

    # Calculate and print the Mean Absolute Deviation (MAD)
    if total > 0:
        mad = error / total
        print(f'Mean Absolute Deviation: {mad}')
    else:
        print('No valid responses to calculate MAD.')

# Example usage
df, prompts = read_prompts_vsp("/content/mydataset_vsp.tsv")
execute_prompts_vsp(df, prompts)


Response for prompt 1: CVSS:3.1/AV:N/AC:H/PR:N/UI:N/S:U/C:H/I:H/A:N
Response for prompt 2: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
Response for prompt 3: CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:L/I:N/A:N
Response for prompt 4: CVSS:3.1/AV:L/AC:L/PR:H/UI:N/S:C/C:H/I:H/A:N
Error calculating CVSS score for vector CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:L /I:N/A:N: Unknown value "L " in field "C:L "
Response for prompt 5: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H
Error calculating CVSS score for vector CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:L /I:L/A:N: Unknown value "L " in field "C:L "
Response for prompt 6: CVSS:3.1/AV:N/AC:H/PR:L/UI:N/S:U/C:H/I:H/A:N
Error calculating CVSS score for vector CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C: L/I:L/A:N: Unknown value " L" in field "C: L"
Response for prompt 7: CVSS:3.1/AV:P/AC:L/PR:N/UI:N/S:U/C:L/I:N/A:N
Response for prompt 8: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
Response for prompt 9: CVSS:3.1/AV:L/AC:H/PR:H/UI:N/S:U/C:H/I:H/A:H
Response for prompt 10

Mean Absolute Deviation: 1.8466666666666667

In [16]:
import pandas as pd
import os
import time
from http.client import RemoteDisconnected
from google.api_core.exceptions import TooManyRequests, InternalServerError


def read_prompts_taa(file):
    try:
        df = pd.read_csv(file, sep='\t', encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file, sep='\t', encoding='ISO-8859-1')

    prompt_column = df.columns[-1]  # Assuming the prompt is in the last column
    return df, df[prompt_column].tolist()

def execute_prompts_taa(df, prompts):
    results = []
    max_retries = 2500  # Maximum number of retries for connection errors

    for i, prompt in enumerate(prompts):
        success = False
        retries = 0

        while not success and retries < max_retries:
            try:
                chat_completion = client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt,
                        }
                    ],
                    model="gpt-4o-mini",
                    temperature=0.9,
                    max_tokens=21,
                )

                response = chat_completion.choices[0].message

                # Check if the response content exists
                if not response or not hasattr(response, 'content') or not response.content:
                    raise ValueError(f"No valid response for prompt number {i+1}. Response might be blocked or empty.")

                result = response.content.strip()
                print(f"Response for prompt {i+1}: {result}")
                results.append(result)
                success = True
            except TooManyRequests:
                print(f"Too many requests. Retrying prompt {i+1} after a delay...")
                time.sleep(60)  # Wait for 60 seconds before retrying
            except (ConnectionError, RemoteDisconnected) as e:
                print(f"Connection error: {e}. Retrying prompt {i+1}...")
                retries += 1
                time.sleep(2)  # Adjust the sleep duration as needed
            except ValueError as e:
                print(f"ValueError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt
            except InternalServerError as e:
                print(f"InternalServerError: {e}. Skipping prompt number {i+1}.")
                results.append("ERROR")
                break  # Skip this prompt

        if not success:
            print(f"Failed to process prompt {i+1} after {max_retries} retries.")
            results.append("ERROR")

    df['chatgpt4o Response'] = results
    return df

def save_results(df, output_file):
    if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
        existing_df = pd.read_csv(output_file, sep='\t', encoding='utf-8')
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Save the updated file with Llama's responses
    combined_df.to_csv(output_file, sep='\t', index=False)

# Example usage
input_file = "/content/mydataset_taa.tsv"
output_file = "output_file_taa.tsv"

df, prompts = read_prompts_taa(input_file)
df_with_responses = execute_prompts_taa(df, prompts)
save_results(df_with_responses, output_file)


Response for prompt 1: Charming Kitten
Response for prompt 2: APT33
Response for prompt 3: APT41
Response for prompt 4: APT28
Response for prompt 5: APT28
Response for prompt 6: APT28
Response for prompt 7: APT33
Response for prompt 8: Lazarus Group
Response for prompt 9: SideWinder APT
Response for prompt 10: APT36
Response for prompt 11: APT10
Response for prompt 12: Lazarus Group
Response for prompt 13: Evil Corp
Response for prompt 14: Fancy Bear
Response for prompt 15: Lebanese Cyber Actors
Response for prompt 16: APT33
Response for prompt 17: [PLACEHOLDER]
Response for prompt 18: Lazarus Group
Response for prompt 19: APT28
Response for prompt 20: Lazarus Group
Response for prompt 21: ENERGETIC BEAR
Response for prompt 22: APT28 (Fancy Bear)
Response for prompt 23: APT10
Response for prompt 24: APT34
Response for prompt 25: APT33
Response for prompt 26: Lazarus Group
Response for prompt 27: APT29
Response for prompt 28: Cozy Bear (APT29)
Response for prompt 29: Charming Kitten
Res

In [19]:
def threat_actor_connection(actor1, actor2, alias_dict, related_dict):
    """
    Determines the connection type between two threat actors based on alias and related group information.

    Args:
        actor1: The first threat actor.
        actor2: The second threat actor.
        alias_dict: A dictionary where keys are threat actors and values are lists of their aliases.
        related_dict: A dictionary where keys are threat actors and values are lists of related threat actors.

    Returns:
        "C" if the actors are connected via an alias chain.
        "P" if the actors are connected via a related group chain.
        "I" if no connection is found.
    """

    actor1 = actor1.strip().lower()
    actor2 = actor2.strip().lower()

    # Normalize dictionaries and ensure bidirectional alias relationships
    alias_dict = {k.strip().lower(): [v.strip().lower() for v in val] for k, val in alias_dict.items()}
    for actor in list(alias_dict): # Iterate over a copy of the keys
        aliases = alias_dict[actor]
        for alias in aliases:
            if actor not in alias_dict.setdefault(alias, []):  # Avoid duplicates
                alias_dict[alias].append(actor)

    related_dict = {k.strip().lower(): [v.strip().lower() for v in val] for k, val in related_dict.items()}
    for actor in list(related_dict):  # Iterate over a copy of the keys
        related_groups = related_dict[actor]
        for related_actor in related_groups:
            if actor not in related_dict.setdefault(related_actor, []):
                related_dict[related_actor].append(actor)

    if is_alias_connected(actor1, actor2, alias_dict):
        return "C"

    if is_related_connected(actor1, actor2, alias_dict, related_dict):
        return "P"

    return "I"


def is_alias_connected(actor1, actor2, alias_dict):
    """
    Checks if two actors are connected through an alias chain using Breadth First Search (BFS).
    """
    visited = set()
    queue = [actor1]

    while queue:
        current_actor = queue.pop(0)
        visited.add(current_actor)

        for alias in alias_dict.get(current_actor, []):
            if alias == actor2:
                return True
            if alias not in visited:
                queue.append(alias)

    return False


def is_related_connected(actor1, actor2, alias_dict, related_dict) :
    """
    Checks if two actors are connected through a chain of aliases and related groups using BFS.
    """
    visited = set()
    queue = [actor1]

    while queue:
        current_actor = queue.pop(0)
        visited.add(current_actor)

        for alias in alias_dict.get(current_actor, []):
            if alias == actor2:
                return True
            if alias not in visited:
                queue.append(alias)

        for related_actor in related_dict.get(current_actor, []):
            if related_actor == actor2:
                return True
            if related_actor not in visited:
                queue.append(related_actor)

    return False

In [20]:
def compute_taa_accuracy(fname, col):
    """
    Returns Correct & Plausible Accuracy
    """
    df = pd.read_csv(fname, sep='\t')
    correct = 0
    plausible = 0
    total = 0

    for idx, row in df.iterrows():
        pred = row[col].lower().strip()
        gt = row['GT'].lower().strip()

        res = threat_actor_connection(gt, pred, alias_dict, related_dict)

        if res == 'C':
            correct += 1
        elif res == 'P':
            plausible += 1
        total += 1

    return correct/total*100, (correct+plausible)/total*100

In [22]:
print('Correct & Plausible Accuracy:', compute_taa_accuracy('/content/output_file_taa.tsv', 'chatgpt4o Response'))

Correct & Plausible Accuracy: (29.411764705882355, 61.76470588235294)


Correct & Plausible Accuracy: (29.411764705882355, 61.76470588235294)