# STIX 2.1 Domain Objects extractor

It extracts SDOs from successive blocks of text, building on the lists of SDOs found in the previous blocks.

In [None]:
import openai
import uuid
import json
import re
import os
from dotenv import load_dotenv
from stix2 import parse, exceptions, Bundle

### OpenAI Client

In [None]:
# Load environment variables from a .env file, if one is found
load_dotenv()

# Sampling temperature passed to the chat completion requests in this notebook
temperature = 0.7

# OpenAI client authenticated with the key read from the environment
client = openai.OpenAI(api_key=os.environ.get('OPENAI_API_KEY'))

### Utilities

In [None]:
# Extract the text from the input file, splitting it into blocks of fixed size
def extract_text_from_file(filename, TOKEN_LIMIT=15000):
    """
    Read a UTF-8 text file and split its content into fixed-size blocks.

    Note: despite the parameter name, sizes are measured in characters
    (``len`` of the decoded string), not in model tokens.

    Args:
        filename: Path of the text file to read.
        TOKEN_LIMIT: Maximum number of characters per block; must be positive.

    Returns:
        list[str]: The content as consecutive blocks. Content that fits within
        the limit (including an empty file) yields a single-element list.

    Raises:
        ValueError: If TOKEN_LIMIT is not positive.
    """
    # A zero step makes range() fail and a negative one silently yields no
    # blocks, so reject both up front with a clear message.
    if TOKEN_LIMIT <= 0:
        raise ValueError("TOKEN_LIMIT must be a positive integer.")

    with open(filename, "r", encoding="utf-8") as txt_file:
        content = txt_file.read()

    # Size of the content (character count)
    num_tokens = len(content)

    # Split the text if it exceeds the limit
    if num_tokens <= TOKEN_LIMIT:
        text = [content]
    else:
        text = [content[i:i+TOKEN_LIMIT] for i in range(0, num_tokens, TOKEN_LIMIT)]

    # Print results. The result is always a list, so the single-block case is
    # detected by its length rather than by its type.
    if len(text) == 1:
        print(f"Text stored in a single variable with {num_tokens} tokens.")
    else:
        print(f"Text split into {len(text)} blocks of {TOKEN_LIMIT} tokens each.")

    return text

# Add UUIDs to 'id' fields in STIX objects, ensuring each object has a unique identifier.
def add_uuid_to_ids(stix_data):
    """
    Assign a fresh ``<type>--<uuid4>`` identifier to every STIX object.

    Each dictionary that has a non-empty 'type' gets its 'id' set, whether or
    not an 'id' key was already present, so no typed object is left without an
    identifier. Entries that are not dictionaries, or that have no 'type', are
    left untouched because no identifier can be built for them.

    Args:
        stix_data: Iterable of STIX objects (dictionaries); modified in place.

    Returns:
        The same ``stix_data`` object that was passed in.
    """
    for item in stix_data:
        if not isinstance(item, dict):
            continue
        object_type = item.get('type')
        if not object_type:
            # Without a type there is nothing to prefix the UUID with.
            continue
        item['id'] = f"{object_type}--{uuid.uuid4()}"
    return stix_data

### Variables

In [None]:
# Threat report to process
filename = "../text-reports/mandiant-apt1.txt"

# Containers for the parsed SDOs of a block and for the per-block model responses
stix_sdos, model_responses = [], []

# Report content, split into blocks
text = extract_text_from_file(filename)

## Requesting extraction of SDOs from gpt-4

In [None]:
print("Generating STIX SDOs with GPT-4")

# The system prompt is identical for every text block, so it is built once
# up front rather than on each loop iteration.
# NOTE(review): the task line mentions "the already present SDOs in the
# provided list", but only the text block is sent in the user message - confirm
# whether earlier results were meant to be included.
# NOTE(review): the prompt asks for is_family as a quoted string; the STIX 2.1
# specification defines it as a boolean - confirm this is intentional.
system_prompt_sdo = (
        # Role
        "You are a high skilled CTI analyst who focuses on STIX 2.1 Domain Objects (SDOs), with a strong drive for accuracy and validity.\n"
        # Task
        "You are tasked with creating new STIX 2.1 Domain Objects (SDOs) from the provided threat intelligence text and expanding the already present SDOs in the provided list.\n"
        "You should follow this process:\n"
        "1. Extract any Named Entity (NE) in the text that aligns with the definition of STIX 2.1 Domain Objects.\n"
        "2. Collect all related information to the NE that should be present in an SDO from the text.\n"
        "3. Create a new SDO following the specified format and include it in the list.\n"
        # Specifics
        "Possible SDOs include: Attack Pattern, Campaign, Course of Action, Identity, Indicator, Intrusion Set, Malware, Observed Data, Report, Threat Actor, Tool, Vulnerability, Infrastructure, Sighting, Note, Opinion, Grouping, Incident, Location, Malware Analysis.\n"
        "Create relevant SDOs in JSON format, strictly adhering to the STIX 2.1 specification.\n"
        "For id property write just SDO_type-- following this example: \"id\": \"malware--\"\n"
        "The is_family field indicates whether the malware is a family (if true) or an instance (if false). The values true or false are always enclosed in quotes.\n"
        "Don't use created_by_ref and source_ref.\n"
        "Timestamp must be in ISO 8601 format.\n"
        "The values of \"created\" and \"modified\" should be both today's date.\n"
        "The labels property in malware is used to categorize or tag the malware object with descriptive terms (e.g., \"trojan\", \"backdoor\", \"ransomware\"), Must contain at least one string.\n"
        "The threat-actor labels property should be an array of strings representing categories or descriptive terms for the threat actor.\n"
        # Context
        "The goal is to extract a STIX 2.1 bundle from the text provided, your role is to extract the SDOs found in the text which will be bundled with the relationships and observable found by others.\n"
        "By accurately extracting the SDOs, you contribute importantly to the creation of the bundle, so keep in mind to be very accurate and reliable.\n"
        # Example
        "This is an example of an SDOs:\n"
        """[
        {
            "type": "threat-actor",
            "id": "threat-actor--a63b9ab7-7253-432d-98ea-5381207c74af",
            "name": "APT42",
            "description": "APT42 is an Iranian state-sponsored cyber espionage actor targeting Western and Middle Eastern NGOs, media organizations, academia, legal services, and activists. It operates on behalf of the Islamic Revolutionary Guard Corps Intelligence Organization (IRGC-IO).",
            "labels": [
                "state-sponsored",
                "cyber-espionage",
                "Iranian"
            ],
            "created": "2024-05-01T00:00:00Z",
            "modified": "2024-05-01T00:00:00Z"
        },
        {
            "type": "intrusion-set",
            "id": "intrusion-set--7e9242ea-dcab-49cf-8e6b-22919d2b361c",
            "name": "APT42 Operations",
            "description": "APT42 uses enhanced social engineering schemes to gain access to victim networks, including cloud environments, by harvesting credentials and using them to gain initial access. It exfiltrates data of strategic interest to Iran, using built-in features and open-source tools to avoid detection.",
            "labels": [
                "intrusion-set",
                "APT42"
            ],
            "created": "2024-05-01T00:00:00Z",
            "modified": "2024-05-01T00:00:00Z"
        },
        {
            "type": "malware",
            "id": "malware--d10904bd-10b9-4045-b514-f6c5f7d1de50",
            "name": "NICECURL",
            "description": "NICECURL is a backdoor written in VBScript that can download additional modules to be executed, including data mining and arbitrary command execution.",
            "is_family": "true",
            "labels": [
                "backdoor",
                "VBScript"
            ],
            "created": "2024-05-01T00:00:00Z",
            "modified": "2024-05-01T00:00:00Z"
        }
        ]\n"""
        # Notes
        "Ensure the output is a valid JSON array ([...]) containing only the updated SDOs list.\n"
        "The output should be correctly formatted as json with identation 4.\n"
        "Return only the JSON array, without any additional text, commentary, or code block delimiters (e.g., json).\n"
        "Always ensure that the output is complete, the JSON array should never be truncated.\n"
)

# Query the model once per text block and collect the parsed SDO lists.
for i, text_block in enumerate(text):
    print("\n***************************************")
    print(f"* EXTRACTING FROM TEXT BLOCK {i}/{len(text)-1} ... *")
    print("***************************************\n")

    user_prompt = f"Text:\n{text_block}"

    print("\nModel Response:")

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": system_prompt_sdo},
            {"role": "user", "content": user_prompt}
        ],
        temperature=temperature
    )

    # The message content can be missing; treat that as an empty reply so the
    # check below reports it clearly.
    raw_stix_sdo = response.choices[0].message.content or ""

    print(raw_stix_sdo)

    # Keep only the span from the first '[' to the last ']' so that any text or
    # code fence the model adds before or after the array does not break parsing.
    array_start = raw_stix_sdo.find('[')
    array_end = raw_stix_sdo.rfind(']')
    if array_start == -1 or array_end < array_start:
        raise ValueError(f"No JSON array found in the model response for text block {i}.")

    # Parse once; the same list is kept as the latest result and recorded in
    # the per-block history.
    stix_sdos = json.loads(raw_stix_sdo[array_start:array_end + 1])
    model_responses.append(stix_sdos)

In [None]:
# System prompt instructing the model to merge the per-block SDO lists into a
# single de-duplicated list.
combine_responses_prompt = (
             # Role
             "You are a high skilled CTI analyst who focuses on STIX 2.1 Domain Objects (SDOs), with a strong drive for accuracy and validity.\n"
             # Task
             "You are tasked with combining the provided lists of SDOs into one final list.\n"
             "You should follow this process:\n"
             "1. All SDOs with the same name should be merged into one single object if they refer to the same Named Entity (eg. APT1).\n"
             "2. Enhance each of these objects with all the information found across all lists that relate to the Named Entity.\n"
             "3. All Named Entities should be present in some form.\n"
             "4. Every object in the input lists should contain unique informations that should not be discarded.\n"
             "5. Provide a JSON array containing unique SDOs that are enriched with all the information found in the input lists.\n"
             # Specifics
             "Possible SDOs include: Attack Pattern, Campaign, Course of Action, Identity, Indicator, Intrusion Set, Malware, Observed Data, Report, Threat Actor, Tool, Vulnerability, Infrastructure, Sighting, Note, Opinion, Grouping, Incident, Location, Malware Analysis.\n"
             "Create relevant SDOs in JSON format, strictly adhering to the STIX 2.1 specification.\n"
             "For id property write just SDO_type-- following this example: \"id\": \"malware--\"\n"
             "The is_family field indicates whether the malware is a family (if true) or an instance (if false). The values true or false are always enclosed in quotes.\n"
             "Don't use created_by_ref and source_ref.\n"
             "Timestamp must be in ISO 8601 format.\n"
             "The values of \"created\" and \"modified\" should be both today's date.\n"
             "The labels property in malware is used to categorize or tag the malware object with descriptive terms (e.g., \"trojan\", \"backdoor\", \"ransomware\"), Must contain at least one string.\n"
             "The threat-actor labels property should be an array of strings representing categories or descriptive terms for the threat actor.\n"
             # Context
             "The goal is to extract a STIX 2.1 bundle from the text provided, your role is to extract the SDOs found in the text which will be bundled with the relationships and observable found by others.\n"
             "By accurately extracting the SDOs, you contribute importantly to the creation of the bundle, so keep in mind to be very accurate and reliable.\n"
             # Example
             "This is an example of an SDOs:\n"
             """[
             {
                 "type": "threat-actor",
                 "id": "threat-actor--a63b9ab7-7253-432d-98ea-5381207c74af",
                 "name": "APT42",
                 "description": "APT42 is an Iranian state-sponsored cyber espionage actor targeting Western and Middle Eastern NGOs, media organizations, academia, legal services, and activists. It operates on behalf of the Islamic Revolutionary Guard Corps Intelligence Organization (IRGC-IO).",
                 "labels": [
                     "state-sponsored",
                     "cyber-espionage",
                     "Iranian"
                 ],
                 "created": "2024-05-01T00:00:00Z",
                 "modified": "2024-05-01T00:00:00Z"
             },
             {
                 "type": "intrusion-set",
                 "id": "intrusion-set--7e9242ea-dcab-49cf-8e6b-22919d2b361c",
                 "name": "APT42 Operations",
                 "description": "APT42 uses enhanced social engineering schemes to gain access to victim networks, including cloud environments, by harvesting credentials and using them to gain initial access. It exfiltrates data of strategic interest to Iran, using built-in features and open-source tools to avoid detection.",
                 "labels": [
                     "intrusion-set",
                     "APT42"
                 ],
                 "created": "2024-05-01T00:00:00Z",
                 "modified": "2024-05-01T00:00:00Z"
             },
             {
                 "type": "malware",
                 "id": "malware--d10904bd-10b9-4045-b514-f6c5f7d1de50",
                 "name": "NICECURL",
                 "description": "NICECURL is a backdoor written in VBScript that can download additional modules to be executed, including data mining and arbitrary command execution.",
                 "is_family": "true",
                 "labels": [
                     "backdoor",
                     "VBScript"
                 ],
                 "created": "2024-05-01T00:00:00Z",
                 "modified": "2024-05-01T00:00:00Z"
             }
             ]\n"""
             # Notes
             "Ensure the output is a valid JSON array ([...]) containing only the updated SDOs list.\n"
             "Return only the JSON array, without any additional text, commentary, or code block delimiters (e.g., json).\n"
             "Ensure the output is not truncated."
)

# Send each per-block list as real JSON (one list per line) rather than as a
# Python repr, which uses single quotes and is not valid JSON.
sdo_list = "Lists of SDOs:\n" + "".join(
    f"{json.dumps(block_sdos, ensure_ascii=False)}\n" for block_sdos in model_responses
)

print(sdo_list)

print("\n*************************************")
print("* COMBINING STIX DOMAIN OBJECTS ... *")
print("*************************************\n")

response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": combine_responses_prompt},
            {"role": "user", "content": sdo_list},
        ],
        temperature=temperature
)
# The message content can be missing; treat that as an empty reply so the
# check below reports it clearly.
combined_sdos = response.choices[0].message.content or ""

print(combined_sdos)

# Same tolerance as the per-block extraction: parse only the span from the
# first '[' to the last ']' so surrounding text or code fences are ignored.
array_start = combined_sdos.find('[')
array_end = combined_sdos.rfind(']')
if array_start == -1 or array_end < array_start:
    raise ValueError("No JSON array found in the combined model response.")

stix_sdo_data = json.loads(combined_sdos[array_start:array_end + 1])
# Every element must be an object, otherwise ids cannot be assigned to it.
if not isinstance(stix_sdo_data, list) or not all(isinstance(sdo, dict) for sdo in stix_sdo_data):
    raise ValueError("STIX SDOs must be a list of dictionaries.")

# Replace the placeholder ids produced by the model with real UUID-based ids.
stix_data = add_uuid_to_ids(stix_sdo_data)
print("\n*********************************")
print("* EXTRACTED STIX Domain Objects *")
print("*********************************\n")
print(json.dumps(stix_data, indent=4))

## Creating the STIX bundle

In [None]:
def create_stix_bundle(sdo_data, sco_data, sro_data):
    """
    Serialize SDO, SCO and SRO objects into one STIX 2.1 bundle.

    The three inputs are concatenated in the order SDO, SCO, SRO and placed
    under the "objects" key of a bundle that receives a random
    ``bundle--<uuid4>`` id.

    Args:
        sdo_data (list): List of valid SDO objects.
        sco_data (list): List of valid SCO objects.
        sro_data (list): List of valid SRO objects.

    Returns:
        str: The bundle as a JSON string indented with 4 spaces.
    """
    # Gather every object into a single sequence, domain objects first
    bundle_objects = sdo_data + sco_data + sro_data
    bundle_id = f"bundle--{uuid.uuid4()}"

    # Keyword order fixes the key order of the serialized bundle
    return json.dumps(
        dict(type="bundle", id=bundle_id, objects=bundle_objects),
        indent=4,
    )


# Create STIX bundle from the merged SDO list. `stix_data` is the combined
# result and already carries generated ids, so it is passed through as is;
# using only the last per-block response would drop the SDOs of earlier blocks.
stix_bundle = create_stix_bundle(stix_data, [], [])

# Output the result
print("STIX Bundle:")
print(stix_bundle)

# Write the STIX bundle to the file
filename = "../bundles/APT1_merged.json"
# Make sure the target directory exists so the result of the run is not lost
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w', encoding="utf-8") as file:
    file.write(stix_bundle)
print(f"STIX bundle written to file: {filename}")