In [1]:
# Get the environment variables
from dotenv import load_dotenv
import os

def _masked_env(name: str) -> str:
    """Return one '#' per character of the environment variable `name`.

    Returns the marker '<not set>' when the variable is missing or empty,
    instead of letting len(None) raise a TypeError.
    """
    value = os.getenv(name)
    if not value:
        return '<not set>'
    return len(value) * '#'


# Load variables from a .env file into the process environment.
load_dotenv()

# Secrets are printed masked; only their length is visible.
print(f"OpenAI API key => {_masked_env('OPENAI_API_KEY')}")
print(f"Langchain API Key => {_masked_env('LANGCHAIN_API_KEY')}")
print(f"Langchain project name => {os.getenv('LANGCHAIN_PROJECT_NAME')}")
print(f"Langchain endpoint => {os.getenv('LANGCHAIN_ENDPOINT')}")
print(f"Langchain tracking => {os.getenv('LANGCHAIN_TRACING_V2')}")

print(f"OpenSearch HOST => {os.getenv('OPENSEARCH_HOST')}")
print(f"OpenSearch PORT => {os.getenv('OPENSEARCH_PORT')}")
print(f"OpenSearch index name => {os.getenv('OPENSEARCH_INDEX_NAME')}")
print(f"OpenSearch account ID => {os.getenv('OPENSEARCH_ACCOUNT_ID')}")
print(f"OpenSearch account password => {_masked_env('OPENSEARCH_ACCOUNT_PASSWORD')}")

OpenAI API key => ########################################################
Langchain API Key => ###################################################
Langchain project name => ex_cti_rag_mitre_002
Langchain endpoint => https://api.smith.langchain.com
Langchain tracking => true
OpenSearch HOST => 15.165.123.179
OpenSearch PORT => 9200
OpenSearch index name => ex_cti_rag_mitre_002
OpenSearch account ID => admin
OpenSearch account password => ###################


In [2]:
# Start LangSmith tracing
from langsmith import traceable
from langchain.callbacks.tracers import LangChainTracer
from langchain.callbacks.manager import CallbackManager

# Tracer that records runs under the project named by LANGCHAIN_PROJECT_NAME.
tracer = LangChainTracer(
    project_name=os.getenv('LANGCHAIN_PROJECT_NAME'),
)
# Kept as a top-level name in case other cells reference it.
callback_manager = CallbackManager([tracer])

# Instantiate the OpenAI LLM with the tracer attached
from langchain_openai import OpenAI
# `callback_manager=` is a deprecated constructor argument; `callbacks=` is the
# supported one and accepts either a handler list or a callback manager.
openai_llm = OpenAI(temperature=0, callbacks=callback_manager)

In [10]:
# Import necessary libraries
from MITREAttackScrapper.cti.groups import MITREAttackCTIGroups
from langchain_core.documents import Document
from typing import List, Dict
from tqdm import tqdm

def _describe_attack_group(attack_group_detail: Dict) -> str:
    """Render one threat-group detail record as a plain-text description.

    Args:
        attack_group_detail: Detail mapping for a single group, as returned by
            MITREAttackCTIGroups.get(). The keys 'id', 'name' and
            'description' are required; every other section is optional and
            is rendered as "<Section>: None" when absent or empty.

    Returns:
        The multi-section description text used as the document body.

    Raises:
        KeyError: If a required key is missing from the record or from one of
            its nested entries.
    """
    parts: List[str] = [
        f"{attack_group_detail['name']} (MITRE ATT&CK Group ID: {attack_group_detail['id']}) "
        f"is a threat group that {attack_group_detail['description']}.\n"
    ]

    # Contributors (section omitted entirely when there are none)
    contributors: List[str] = attack_group_detail.get('contributors', [])
    if contributors:
        parts.append(f"\nContributors: {', '.join(contributors)}\n")

    # Version, creation and modification dates
    version: str = attack_group_detail.get('version', 'N/A')
    created: str = attack_group_detail.get('created', 'N/A')
    last_modified: str = attack_group_detail.get('last_modified', 'N/A')
    parts.append(f"\nVersion: {version}\nCreated: {created}\nLast Modified: {last_modified}\n")

    # Associated groups
    associated_groups: List[Dict[str, str]] = attack_group_detail.get('associated_group_descriptions', [])
    if associated_groups:
        parts.append("\nAssociated Groups:\n")
        for assoc_group in associated_groups:
            parts.append(f"- {assoc_group['name']}: {assoc_group['description']}\n")
    else:
        parts.append("\nAssociated Groups: None\n")

    # Techniques used
    techniques: List[Dict[str, str]] = attack_group_detail.get('techniques_used', [])
    if techniques:
        parts.append("\nTechniques Used:\n")
        for technique in techniques:
            parts.append(
                f"- {technique['main_technique_name']} (ID: {technique['main_technique_id']}): "
                f"{technique['use']} (URL: {technique['main_technique_url']})\n"
            )
            if 'sub_technique_id' in technique and 'sub_technique_name' in technique:
                # The URL key is not part of the presence check above, so read
                # it defensively rather than risk a KeyError.
                sub_technique_url = technique.get('sub_technique_url', 'N/A')
                parts.append(
                    f"  - Sub-Technique: {technique['sub_technique_name']} (ID: {technique['sub_technique_id']}) "
                    f"(URL: {sub_technique_url})\n"
                )
    else:
        parts.append("\nTechniques Used: None\n")

    # Software used
    softwares: List[Dict[str, str]] = attack_group_detail.get('software', [])
    if softwares:
        parts.append("\nSoftware Used:\n")
        for software in softwares:
            parts.append(
                f"- {software['name']} (ID: {software['id']}): {', '.join(software['references'])} (URL: {software['url']})\n"
            )
            for software_technique in software.get('techniques', []):
                parts.append(
                    f"  - Associated Technique: {software_technique['name']} (URL: {software_technique['url']})\n"
                )
    else:
        parts.append("\nSoftware Used: None\n")

    # References
    references: Dict[int, Dict[str, str]] = attack_group_detail.get('references', {})
    if references:
        parts.append("\nReferences:\n")
        for ref_id, ref_detail in references.items():
            parts.append(f"[{ref_id}] {ref_detail['text']} (URL: {ref_detail['url']})\n")
    else:
        parts.append("\nReferences: None\n")

    # Join once at the end instead of growing a string with repeated +=.
    return "".join(parts)


def _build_attack_group_document(attack_group_detail: Dict) -> Document:
    """Wrap one threat-group detail record in a Document.

    The page content is the text from _describe_attack_group(); the metadata
    carries the group's 'id', 'name', 'url' and its references mapping (under
    the key "reference").
    """
    return Document(
        page_content=_describe_attack_group(attack_group_detail),
        metadata={
            "id": attack_group_detail['id'],
            "name": attack_group_detail['name'],
            "url": attack_group_detail['url'],
            "reference": attack_group_detail.get('references', {}),
        },
    )


# Initialize an empty list to store documents
documents: List[Document] = []

# Get the list of MITRE ATT&CK Group IDs
attack_group_id_list: List[str] = [attack_group['id'] for attack_group in MITREAttackCTIGroups.get_list()]

# Fetch the detail record of every listed group and convert it to a Document.
# The loop variable has its own name so it is never reassigned inside the body.
for listed_group_id in tqdm(attack_group_id_list):
    attack_group_detail = MITREAttackCTIGroups.get(listed_group_id)
    documents.append(_build_attack_group_document(attack_group_detail))

# The 'documents' list now contains the detailed documents for each MITRE ATT&CK group

100%|██████████| 152/152 [01:43<00:00,  1.47it/s]


hi
