In [1]:
import typing
import dspy
import os 
import stix2
import dotenv
from Modules.BasicHtmlToTextParser import BasicHtmlToTextParser
from ThreatReportScraper.Scraper import Scraper
from pydantic import Field, BaseModel

# Load environment variables from ./.env (DEEPINFRA_API_KEY is read from the environment further down).
# The result is bound to `_` so the cell does not display it.
_ = dotenv.load_dotenv("./.env")

In [2]:
# Short identifiers of the language models that get_deepinfra_llm knows how to build.
llm_literal = typing.Literal[
    "llama_3_2_1b_instruct",
    "llama_3_2_3b_instruct",
    "llama_3_1_8b_instruct",
    "llama_3_1_70b_instruct",
    "qwen_2_5_7b_instruct",
    "qwen_2_5_72b_instruct",
]

# Names of the optimization variants; load_dspy_module uses them as a path component.
optimization_literal = typing.Literal[
    "FS-O1",
    "FS-O2",
    "ZERO-O1",
    "ZERO-O2",
]


def get_deepinfra_llm(llm: llm_literal, temperature: float = 0.1, max_tokens: int = 1024, cache=False, cache_in_memory=False) -> dspy.LM: 
    """Build a ``dspy.LM`` client for one model served through DeepInfra's OpenAI-compatible endpoint.

    Only the requested model is instantiated; the identifier-to-model mapping is
    kept as plain strings so that a call does not create clients it never returns.

    Args:
        llm: Short model identifier (one of the ``llm_literal`` values).
        temperature: Sampling temperature forwarded to ``dspy.LM``.
        max_tokens: Token limit forwarded to ``dspy.LM``.
        cache: Forwarded to ``dspy.LM`` as ``cache``.
        cache_in_memory: Forwarded to ``dspy.LM`` as ``cache_in_memory``.

    Returns:
        The ``dspy.LM`` instance for the requested model.

    Raises:
        KeyError: If ``llm`` is not a known identifier.
    """
    model_ids = {
        "llama_3_2_1b_instruct": "openai/meta-llama/Llama-3.2-1B-Instruct",
        "llama_3_2_3b_instruct": "openai/meta-llama/Llama-3.2-3B-Instruct",
        "llama_3_1_8b_instruct": "openai/meta-llama/Meta-Llama-3.1-8B-Instruct",
        "llama_3_1_70b_instruct": "openai/meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        "qwen_2_5_7b_instruct": "openai/Qwen/Qwen2.5-7B-Instruct",
        "qwen_2_5_72b_instruct": "openai/Qwen/Qwen2.5-72B-Instruct",
    }

    # Resolve the identifier first so an unknown value fails before any client is built.
    model_id = model_ids[llm]

    return dspy.LM(
        model=model_id,
        # The API key is taken from the environment; it is None when the variable is unset.
        api_key=os.environ.get("DEEPINFRA_API_KEY"),
        base_url="https://api.deepinfra.com/v1/openai",
        temperature=temperature,
        max_tokens=max_tokens,
        cache=cache,
        cache_in_memory=cache_in_memory,
    )


def load_dspy_module(module: typing.Literal["MalwareExtractor", "ThreatActorExtractor", "TargetsExtractor", "AttackPatternExtractor"], optimization: optimization_literal, llm: llm_literal, base_path="./Modules"):
    """Load a saved DSPy program from ``<base_path>/<module>/<optimization>/<llm>``.

    Args:
        module: Name of the extractor whose saved program is loaded.
        optimization: Optimization variant; second path component.
        llm: Model identifier; last path component.
        base_path: Directory under which the saved programs are looked up.

    Returns:
        Whatever ``dspy.load`` returns for that path.
    """
    saved_program_path = f"{base_path}/{module}/{optimization}/{llm}"
    return dspy.load(saved_program_path)

In [3]:
# Model identifier used by each extractor. They are kept separate so each extractor
# can be pointed at a different model; all four currently use the same one.
malware_extractor_llm_id: llm_literal = "llama_3_1_70b_instruct"
threat_actor_extractor_llm_id: llm_literal = "llama_3_1_70b_instruct"
attack_pattern_extractor_llm_id: llm_literal = "llama_3_1_70b_instruct"
targets_extractor_llm_id: llm_literal = "llama_3_1_70b_instruct"


# Load the saved "FS-O1" program of every extractor for its configured model.
# The same *_llm_id values are used again later to select the LM for each call.
malware_extractor = load_dspy_module("MalwareExtractor", optimization="FS-O1", llm=malware_extractor_llm_id)
threat_actor_extractor = load_dspy_module("ThreatActorExtractor", optimization="FS-O1", llm=threat_actor_extractor_llm_id)
attack_pattern_extractor = load_dspy_module("AttackPatternExtractor", optimization="FS-O1", llm=attack_pattern_extractor_llm_id)
targets_extractor = load_dspy_module("TargetsExtractor", optimization="FS-O1", llm=targets_extractor_llm_id)

In [None]:
# Scraper is given the path of a Firefox profile directory
# (presumably it drives a Firefox browser with that profile — TODO confirm in ThreatReportScraper).
scraper = Scraper("./ThreatReportScraper/firefox_profile/")
# Parser used in the next cell to convert the scraped HTML into text.
htmlParser = BasicHtmlToTextParser()

In [None]:
# Fetch the threat report page as HTML.
# NOTE(review): default_wait_for_page_load is set to a near-zero value; presumably this is a
# wait time for the page load — confirm the unit and that the page content is complete with it.
threat_report_html = scraper.scrape("https://www.threatfabric.com/blogs/cerberus-a-new-banking-trojan-from-the-underworld", default_wait_for_page_load=0.00000001)
# Convert the HTML into the text that is passed to every extractor below.
threat_report_txt = htmlParser.forward(threat_report_html=threat_report_html)

In [None]:
# One extracted (source, relationship, attack pattern) triple.
# Documented with "#" comments on purpose: pydantic copies a model's docstring into the
# description of its JSON schema, so a docstring would not be a pure documentation change.
class AttackPatternTriple(BaseModel):
    # Name of the source entity; later cells lower-case it and look it up among the extracted names.
    source: str = Field()
    # Kind of the source entity.
    source_type: typing.Literal["malware", "threat_actor", "campaign", "course_of_action", "indicator", "intrusion_set"] = Field()
    # Relationship from the source to the attack pattern.
    relationship: typing.Literal["uses", "mitigates", "indicates"] = Field()
    # Name of the attack pattern; used as the name of the STIX AttackPattern object built later.
    target_attack_pattern: str = Field()
    # Always "attack_pattern" for this triple type.
    target_type: typing.Literal["attack_pattern"] = Field()

# One extracted (source, "targets", target) triple.
# Documented with "#" comments on purpose: pydantic copies a model's docstring into the
# description of its JSON schema, so a docstring would not be a pure documentation change.
class TargetTriple(BaseModel):
    # Name of the source entity; later cells lower-case it and look it up among the extracted names.
    source: str = Field()
    # Kind of the source entity.
    source_type: typing.Literal["attack_pattern", "campaign", "intrusion_set", "malware", "threat_actor"] = Field()
    # Only the "targets" relationship is modelled by this triple type.
    relationship: typing.Literal["targets"] = Field()
    # Name of the targeted entity; used as the name/region of the STIX object built later.
    target: str = Field()
    # Kind of the targeted entity; selects which STIX object class is created for it.
    target_type: typing.Literal["identity", "location", "vulnerability", "infrastructure", "tool"] = Field()


    

def enforce_stix_attack_patterns(attack_pattern_triples: typing.List[AttackPatternTriple]) -> typing.List[AttackPatternTriple]:
    """Filter attack-pattern triples down to the accepted source/relationship pairs.

    A triple is kept when its relationship is the one expected for its source type:
    ``uses`` for malware, threat_actor, campaign and intrusion_set; ``mitigates`` for
    course_of_action; ``indicates`` for indicator. Everything else is dropped.

    Args:
        attack_pattern_triples: Triples to check.

    Returns:
        A new list with the accepted triples, in their original order.
    """
    # The single relationship each source type may have towards an attack pattern.
    expected_relationship = {
        "malware": "uses",
        "threat_actor": "uses",
        "campaign": "uses",
        "intrusion_set": "uses",
        "course_of_action": "mitigates",
        "indicator": "indicates",
    }

    return [
        candidate
        for candidate in attack_pattern_triples
        if candidate.source_type in expected_relationship
        and expected_relationship[candidate.source_type] == candidate.relationship
    ]



def enforce_stix_targets(target_triples: typing.List[TargetTriple]) -> typing.List[TargetTriple]:
    """Keep only the triples whose ``targets`` relationship is defined in STIX 2.1.

    A triple is kept when its relationship is ``"targets"`` and its target type is
    allowed for its source type. Triples with any other source type are dropped.

    Unlike the previous implementation, ``attack_pattern -> tool`` is rejected:
    STIX 2.1 relates attack patterns to tools with ``uses``, not ``targets``.

    Args:
        target_triples: Triples to check.

    Returns:
        A new list with the accepted triples, in their original order.
    """
    # Target types each source type may be linked to with "targets".
    allowed_target_types = {
        "attack_pattern": {"identity", "location", "vulnerability"},
        "campaign": {"identity", "location", "vulnerability"},
        "intrusion_set": {"identity", "location", "vulnerability"},
        "malware": {"identity", "infrastructure", "location", "vulnerability"},
        "threat_actor": {"identity", "location", "vulnerability"},
    }

    return [
        triple
        for triple in target_triples
        if triple.relationship == "targets"
        # Unknown source types map to an empty collection and are therefore dropped.
        and triple.target_type in allowed_target_types.get(triple.source_type, ())
    ]

In [None]:
# Containers for the STIX objects built in this and the following cells.
# The dicts map a lower-cased name to its STIX object. Later cells append to the lists,
# so re-run this cell before re-running them to avoid accumulating duplicates.
stix_malwares, stix_threat_actors, stix_relationships = dict(), dict(), []
stix_attack_patterns, stix_targets = [], []

with dspy.settings.context(lm=get_deepinfra_llm(malware_extractor_llm_id, cache=True, cache_in_memory=True)):
    # Call the module itself rather than .forward(), like the other extractors do,
    # so the call goes through dspy.Module.__call__.
    malware_names = malware_extractor(threat_report=threat_report_txt).malware_names

    # Lower-case and de-duplicate. sorted() keeps the order stable between runs, which
    # matters because this list is passed on to later extractors as input.
    malware_names = sorted({malware_name.lower() for malware_name in malware_names})

    for malware_name in malware_names:
        stix_malwares[malware_name] = stix2.Malware(name=malware_name, is_family=False)


with dspy.settings.context(lm=get_deepinfra_llm(threat_actor_extractor_llm_id, cache=True, cache_in_memory=True)):
    threat_actors = threat_actor_extractor(threat_report=threat_report_txt).threat_actors

    # Same normalisation as for the malware names.
    threat_actors = sorted({threat_actor.lower() for threat_actor in threat_actors})

    for threat_actor in threat_actors:
        stix_threat_actors[threat_actor] = stix2.ThreatActor(name=threat_actor)

In [None]:
with dspy.settings.context(lm=get_deepinfra_llm(attack_pattern_extractor_llm_id, cache=True, cache_in_memory=True)):
    # The extractor receives the report text plus the malware and threat-actor names found earlier.
    # NOTE(review): malware_names was already de-duplicated when it was built, so the
    # list(set(...)) here adds no further de-duplication.
    attack_patterns = attack_pattern_extractor(threat_report=threat_report_txt, mentioned_malwares=list(set(malware_names)), mentioned_threat_actors=threat_actors).attack_pattern_triples
    
    # Drop triples whose source type / relationship combination is not accepted.
    attack_patterns = enforce_stix_attack_patterns(attack_patterns)

In [None]:
# Turn every accepted attack-pattern triple into a STIX AttackPattern plus a Relationship
# from the already-created malware / threat-actor object to it.
for attack_pattern_triple in attack_patterns:
    source_name = attack_pattern_triple.source.lower()
    source_type = attack_pattern_triple.source_type

    if source_type in ("campaign", "course_of_action", "indicator", "intrusion_set"):
        # No STIX objects are created for these source types in this notebook.
        continue
    elif source_type == "malware":
        source_ref = stix_malwares.get(source_name)
    elif source_type == "threat_actor":
        # Look the object up in stix_threat_actors (name -> STIX object); `threat_actors`
        # is only the list of names and cannot be indexed with a string.
        source_ref = stix_threat_actors.get(source_name)
    else:
        raise Exception("Unhandled literal value")

    if source_ref is None:
        # The source was not among the extracted malware / threat actors.
        continue

    attack_pattern = stix2.AttackPattern(name=attack_pattern_triple.target_attack_pattern)
    rel = stix2.Relationship(source_ref, attack_pattern_triple.relationship, attack_pattern)

    stix_attack_patterns.append(attack_pattern)
    stix_relationships.append(rel)

In [None]:
with dspy.settings.context(lm=get_deepinfra_llm(targets_extractor_llm_id, cache=True, cache_in_memory=True)):
    # The extractor receives the report text plus the malware and threat-actor names found earlier.
    targets = targets_extractor(threat_report=threat_report_txt, mentioned_malwares=malware_names, mentioned_threat_actors=threat_actors).targets_triples
    # Drop triples whose source type / target type combination is not accepted.
    targets = enforce_stix_targets(targets)

In [None]:
# Turn every accepted target triple into a STIX target object plus a Relationship
# from the already-created malware / threat-actor object to it.
for targets_triple in targets:

    # 1. Resolve the source to an existing STIX object.
    source_name = targets_triple.source.lower()
    source_type = targets_triple.source_type

    if source_type in ("campaign", "attack_pattern", "intrusion_set"):
        # No STIX objects are created for these source types in this notebook.
        continue
    elif source_type == "malware":
        source_ref = stix_malwares.get(source_name)
    elif source_type == "threat_actor":
        # Look the object up in stix_threat_actors (name -> STIX object); `threat_actors`
        # is only the list of names and cannot be indexed with a string.
        source_ref = stix_threat_actors.get(source_name)
    else:
        raise Exception("Unhandled literal value")

    if source_ref is None:
        # The source was not among the extracted malware / threat actors.
        continue

    # 2. Create the STIX object for the target.
    target_type = targets_triple.target_type
    target_name = targets_triple.target

    if target_type == "identity":
        target_ref = stix2.Identity(name=target_name)
    elif target_type == "infrastructure":
        target_ref = stix2.Infrastructure(name=target_name)
    elif target_type == "location":
        # The extracted text is used both as the region and as the display name.
        target_ref = stix2.Location(region=target_name, name=target_name)
    elif target_type == "tool":
        target_ref = stix2.Tool(name=target_name)
    elif target_type == "vulnerability":
        target_ref = stix2.Vulnerability(name=target_name)
    else:
        raise Exception("Unhandled literal value")

    # 3. Link source and target.
    rel = stix2.Relationship(source_ref, targets_triple.relationship, target_ref)

    stix_targets.append(target_ref)
    stix_relationships.append(rel)

In [None]:
# Collect every STIX object created above (malware, threat actors, attack patterns,
# targets and relationships) into one bundle.
stix_bundle = stix2.Bundle(list(stix_malwares.values()), list(stix_threat_actors.values()), stix_attack_patterns, stix_targets, stix_relationships)

# Print the serialized bundle.
print(stix_bundle.serialize())

In [None]:
# Additional pass over the target triples that links malware to "tool" and "location" targets.
# NOTE(review): this cell runs after stix_bundle was created and printed, so what it appends
# is not part of that bundle; the earlier loop over `targets` also creates relationships for
# malware sources — confirm whether this cell is still needed.
for target_triple in targets:
    target_type = target_triple.target_type

    if target_type == "tool":
        target = stix2.Tool(name=target_triple.target)
    elif target_type == "location":
        target = stix2.Location(region=target_triple.target)
    else:
        continue

    if target_triple.source_type != "malware":
        continue

    source_name = target_triple.source.lower()
    if source_name not in stix_malwares:
        # The source was not among the extracted malware names.
        continue

    source_ref = stix_malwares[source_name]

    # Point the relationship at the target created above. The previous code used
    # `attack_pattern.id`, a leftover variable from the attack-pattern loop.
    rel = stix2.Relationship(source_ref=source_ref.id,
                             relationship_type=target_triple.relationship,
                             target_ref=target.id)

    stix_relationships.append(rel)
    stix_targets.append(target)




+ Warum das Ganze? --> High-Quality-CTI mit Datengrundlage
+ Kostengünstig (sofern weitgehend automatisiert)
+ 


+ mentioned_malware und mentioned_threat_actor stammen aus dem Dataset (cheat??) (es ist aber auch unbekannt, wie andere Ansätze das machen)
+ Alle Ansätze basieren auf vollständigem Workflow vom Threat-Report (URL/HTML) --> txt --> STIX-Bundle
+ Datasets enthalten den Threat-Report oft nur als bereinigten Klartext; dort ist der Workflow also nur txt --> STIX-Bundle (nicht vollständig automatisiert, zumindest nicht in der Eval berücksichtigt)


+ Man könnte erweiterte Strategien zeigen
+ Hier sind Relation Extraction und Object Extraction kombiniert; man kann das aber natürlich weiter aufdröseln!
+ Ist ebenfalls eine Kombination von Ansätzen mit hoher Precision und Ansätzen mit hohem Recall sinnvoll?