# ATT&CK GPT
References:
* https://python.langchain.com/en/latest/modules/indexes/getting_started.html

## Create ATT&CK Groups Knowledge Base

### Import Modules and Define Variables

In [1]:
from attackcti import attack_client
import os
import logging

logging.getLogger('taxii2client').setLevel(logging.CRITICAL)

In [2]:
# Define a few variables
current_directory = os.path.dirname("__file__")
knowledge_directory = os.path.join(current_directory, "knowledge")
db_directory = os.path.join(current_directory, "db")
templates_directory = os.path.join(current_directory, "templates")
group_template = os.path.join(templates_directory, "group.md")

### Initialize ATT&CK Client

In [3]:
lift = attack_client()

### Get ATT&CK Groups Knowledge
Gettings technique STIX objects used by all groups accross all ATT&CK matrices..

In [4]:
techniques_used_by_groups = lift.get_techniques_used_by_all_groups()
techniques_used_by_groups[0]

{'type': 'intrusion-set',
 'id': 'intrusion-set--b7f627e2-0817-4cd5-8d50-e75f8aa85cc6',
 'created_by_ref': 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5',
 'created': '2023-02-23T15:31:38.829Z',
 'modified': '2023-04-17T21:49:16.371Z',
 'name': 'LuminousMoth',
 'description': '[LuminousMoth](https://attack.mitre.org/groups/G1014) is a Chinese-speaking cyber espionage group that has been active since at least October 2020. [LuminousMoth](https://attack.mitre.org/groups/G1014) has targeted high-profile organizations, including government entities, in Myanmar, the Philippines, Thailand, and other parts of Southeast Asia. Some security researchers have concluded there is a connection between [LuminousMoth](https://attack.mitre.org/groups/G1014) and [Mustang Panda](https://attack.mitre.org/groups/G0129) based on similar targeting and TTPs, as well as network infrastructure overlaps.(Citation: Kaspersky LuminousMoth July 2021)(Citation: Bitdefender LuminousMoth July 2021)',
 'aliases': ['L

### Create ATT&CK Groups Documents

In [5]:
import copy
from jinja2 import Template

# Create Group docs
all_groups = dict()
for technique in techniques_used_by_groups:
    if technique['id'] not in all_groups:
        group = dict()
        group['group_name'] = technique['name']
        group['group_id'] = technique['external_references'][0]['external_id']
        group['created'] = technique['created']
        group['modified'] = technique['modified']
        group['description'] = technique['description']
        group['aliases'] = technique['aliases']
        if 'x_mitre_contributors' in technique:
            group['contributors'] = technique['x_mitre_contributors']
        group['techniques'] = []
        all_groups[technique['id']] = group
    technique_used = dict()
    technique_used['matrix'] = technique['matrix']
    technique_used['domain'] = technique['x_mitre_domains']
    technique_used['platform'] = technique['platform']
    technique_used['tactics'] = technique['tactic']
    technique_used['technique_id'] = technique['technique_id']
    technique_used['technique_name'] = technique['technique']
    technique_used['use'] = technique['relationship_description']
    if 'data_sources' in technique:
        technique_used['data_sources'] = technique['data_sources']
    all_groups[technique['id']]['techniques'].append(technique_used)

if not os.path.exists(knowledge_directory):
   print("[+] Creating knowledge directory..")
   os.makedirs(knowledge_directory)

print("[+] Creating markadown files for each group..")
markdown_template = Template(open(group_template).read())
for key in list(all_groups.keys()):
    group = all_groups[key]
    print("  [>>] Creating markdown file for {}..".format(group['group_name']))
    group_for_render = copy.deepcopy(group)
    markdown = markdown_template.render(metadata=group_for_render, group_name=group['group_name'], group_id=group['group_id'])
    file_name = (group['group_name']).replace(' ','_')
    open(f'{knowledge_directory}/{file_name}.md', encoding='utf-8', mode='w').write(markdown)

[+] Creating markadown files for each group..
  [>>] Creating markdown file for LuminousMoth..
  [>>] Creating markdown file for Metador..
  [>>] Creating markdown file for CURIUM..
  [>>] Creating markdown file for EXOTIC LILY..
  [>>] Creating markdown file for Moses Staff..
  [>>] Creating markdown file for SideCopy..
  [>>] Creating markdown file for Aoqin Dragon..
  [>>] Creating markdown file for Earth Lusca..
  [>>] Creating markdown file for POLONIUM..
  [>>] Creating markdown file for LAPSUS$..
  [>>] Creating markdown file for Ember Bear..
  [>>] Creating markdown file for BITTER..
  [>>] Creating markdown file for Aquatic Panda..
  [>>] Creating markdown file for Confucius..
  [>>] Creating markdown file for LazyScripter..
  [>>] Creating markdown file for TeamTNT..
  [>>] Creating markdown file for Andariel..
  [>>] Creating markdown file for Ferocious Kitten..
  [>>] Creating markdown file for IndigoZebra..
  [>>] Creating markdown file for BackdoorDiplomacy..
  [>>] Creat

## Generate Knowledge Base Embeddings

### Load Documents

In [6]:
import glob
from langchain.document_loaders import UnstructuredMarkdownLoader
import tiktoken

In [7]:
# variables
group_files = glob.glob(os.path.join(knowledge_directory, "*.md"))

# Loading Markdown files
md_docs = []
print("[+] Loading Group markdown files..")
for group in group_files:
    print(f' [*] Loading {os.path.basename(group)}')
    loader = UnstructuredMarkdownLoader(group)
    md_docs.extend(loader.load())

print(f'[+] Number of .md documents processed: {len(md_docs)}')

tokenizer = tiktoken.get_encoding('cl100k_base')

# create the length function
def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

tiktoken.encoding_for_model('gpt-3.5-turbo')
token_counts = [tiktoken_len(doc.page_content) for doc in md_docs]

print(f"""[+] Token Counts:
Min: {min(token_counts)}
Avg: {int(sum(token_counts) / len(token_counts))}
Max: {max(token_counts)}""")

[+] Loading Group markdown files..
 [*] Loading admin@338.md
 [*] Loading Ajax_Security_Team.md
 [*] Loading ALLANITE.md
 [*] Loading Andariel.md
 [*] Loading Aoqin_Dragon.md
 [*] Loading APT-C-36.md
 [*] Loading APT1.md
 [*] Loading APT12.md
 [*] Loading APT16.md
 [*] Loading APT17.md
 [*] Loading APT18.md
 [*] Loading APT19.md
 [*] Loading APT28.md
 [*] Loading APT29.md
 [*] Loading APT3.md
 [*] Loading APT30.md
 [*] Loading APT32.md
 [*] Loading APT33.md
 [*] Loading APT37.md
 [*] Loading APT38.md
 [*] Loading APT39.md
 [*] Loading APT41.md
 [*] Loading Aquatic_Panda.md
 [*] Loading Axiom.md
 [*] Loading BackdoorDiplomacy.md
 [*] Loading BITTER.md
 [*] Loading BlackOasis.md
 [*] Loading BlackTech.md
 [*] Loading Blue_Mockingbird.md
 [*] Loading BRONZE_BUTLER.md
 [*] Loading Carbanak.md
 [*] Loading Chimera.md
 [*] Loading Cleaver.md
 [*] Loading Cobalt_Group.md
 [*] Loading Confucius.md
 [*] Loading CopyKittens.md
 [*] Loading CURIUM.md
 [*] Loading Darkhotel.md
 [*] Loading DarkHyd

### Split Documents

In [8]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from tqdm.auto import tqdm
import hashlib

In [9]:
# Chunking Text
print('[+] Initializing RecursiveCharacterTextSplitter..')
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,  # number of tokens overlap between chunks
    length_function=tiktoken_len,
    separators=['\n\n', '\n', ' ', '']
)

print('[+] Splitting documents in chunks..')
chunks = text_splitter.split_documents(md_docs)
print(f'[+] Number of chunks: {len(chunks)}')

[+] Initializing RecursiveCharacterTextSplitter..
[+] Splitting documents in chunks..
[+] Number of chunks: 534


In [10]:
print('[+] Splitting text in chunks..')
json_documents = []
chunks_documents = []
m = hashlib.md5()
for doc in tqdm(md_docs):
    doc_name = os.path.basename(doc.metadata['source'])
    m.update(doc_name.encode('utf-8'))
    uid = m.hexdigest()[:12]
    chunks = text_splitter.split_text(doc.page_content)
    for i, chunk in enumerate(chunks):
        # Add JSON object to array
        json_documents.append({
            'id': f'{uid}-{i}',
            'text': chunk,
            'source': doc_name
        })
        # Create docs
        document = Document(page_content=chunk, metadata={"source": doc_name})
        chunks_documents.append(document)

print(f'[+] Final Documents count: {len(json_documents)}')

[+] Splitting text in chunks..


  0%|          | 0/134 [00:00<?, ?it/s]

[+] Final Documents count: 534


### Export Knowledge Base as JSONL File (Optional)

In [11]:
import json

In [12]:
if not os.path.exists(db_directory):
   print('[+] Creating database directory..')
   os.makedirs(db_directory)

print(f'[+] Exporting groups as .jsonl file..')
with open(f'{os.path.join(db_directory, "attack-groups.jsonl")}', 'w') as f:
    for doc in json_documents:
        f.write(json.dumps(doc) + '\n')

[+] Exporting groups as .jsonl file..


### Generate Embeddings

In [2]:
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
import openai
import os
from dotenv import load_dotenv

current_directory = os.path.dirname("__file__")
db_directory = os.path.join(current_directory, "db")

In [3]:
# Get your key: https://platform.openai.com/account/api-keys
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

In [4]:
db_file = os.path.join(db_directory,"db.pkl")

if not os.path.exists(db_file):
    print("[+] Starting embedding..")
    embeddings = OpenAIEmbeddings()

    # Send text chunks to OpenAI Embeddings API
    print("[+] Sending chunks to OpenAI Embeddings API..")
    db = FAISS.from_documents(chunks_documents, embeddings)

### Create Database Pickle File (Optional)

In [7]:
import pickle
import os

current_directory = os.path.dirname("__file__")
db_directory = os.path.join(current_directory, "db")

In [8]:
db_file = os.path.join(db_directory,"db.pkl")

if os.path.exists(db_file):
    with open(db_file, "rb") as f:
        db = pickle.load(f)
else:
    # Save vectorstore
    print("[+] Create Pickle file..")
    with open(db_file, "wb") as f:
        pickle.dump(db, f)

print(type(db))

<class 'langchain.vectorstores.faiss.FAISS'>


## Query ATT&CK Groups Knowledge Base

### Initialize OpenAI (Optional)

In [9]:
from dotenv import load_dotenv
import openai

# Get your key: https://platform.openai.com/account/api-keys
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

### Define Vector Store Retriever
The retriever interface is a generic interface that makes it easy to combine documents with language models. This interface exposes a get_relevant_documents method which takes in a query (a string) and returns a list of documents.

In [10]:
retriever = db.as_retriever(search_kwargs={"k":10})

### Get Relevant Documents

In [11]:
query = "What are some phishing techniques used by threat actors?"

In [12]:
print("[+] Getting relevant documents for query..")
relevant_docs = retriever.get_relevant_documents(query)
relevant_docs

[+] Getting relevant documents for query..


[Document(page_content='APT39 leveraged spearphishing emails with malicious attachments to initially compromise victims.(Citation: FireEye APT39 Jan 2019)(Citation: Symantec Chafer February 2018)(Citation: FBI FLASH APT39 September 2020)|', metadata={'source': 'APT39.md'}),
 Document(page_content='Lazarus Group has been observed targeting organizations using spearphishing documents with embedded malicious payloads. (Citation: Novetta Threat Research Group February 2016) Highly targeted spear phishing campaigns have been conducted against a U.S. electric grid company. (Citation: Eduard Kovacs March 2018)|', metadata={'source': 'Lazarus_Group.md'}),
 Document(page_content='TA505 has used spearphishing emails with malicious attachments to initially compromise victims.(Citation: Proofpoint TA505 Sep 2017)(Citation: Proofpoint TA505 June 2018)(Citation: Proofpoint TA505 Jan 2019)(Citation: Cybereason TA505 April 2019)(Citation: ProofPoint SettingContent-ms July 2018)(Citation: Proofpoint TA

### Question Answering

In [13]:
from langchain.chains.question_answering import load_qa_chain
from langchain.llms import OpenAI

In [14]:
chain = load_qa_chain(OpenAI(temperature=0), chain_type="stuff")
chain.run(input_documents=relevant_docs, question=query)

' Threat actors have used spearphishing emails with malicious attachments, links to HTML application files embedded with malicious code, links to malicious zip files, and messages with links designed to steal credentials or track victims.'