In [95]:
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import pandas as pd
from stix2.v21 import (ThreatActor, Identity, AttackPattern, Campaign, IntrusionSet, Relationship, ExternalReference, Bundle, Grouping)
from efficient_apriori import apriori
import re
from pyattck import Attck
import requests
import json
from stix2 import MemoryStore, Filter

In [23]:
# Settings Values - for Apriori Algorithm
# TODO - 2: Do we need to adjust these numbers, in order to refine our algorithm metrics??
# Minimum rule confidence; passed to apriori() as min_confidence in AprioriMining.
confidenceLevel = 0.70
# Minimum itemset support; passed to apriori() as min_support in AprioriMining.
supportLevel = 0.05
# When True, ExtractSeedTTPs strips sub-technique suffixes (e.g. ".001") from seed ids.
# NOTE(review): aprioriList is built with AbstractTTPs() regardless of this flag - confirm intended.
abstract = False

In [58]:
def NameToCode(gName):
    """Translate an ATT&CK group name into its group id (G code).

    The name -> id table is built from ``Attck().enterprise.actors`` on the
    first call and cached on the function object, so repeated lookups (for
    example one per CSV row in GenerateTestSet) do not re-instantiate
    ``Attck`` each time.

    Args:
        gName: Group name to look up; compared for exact equality.

    Returns:
        The id of the first actor whose ``name`` equals ``gName``, or an
        empty string when no actor matches.
    """
    group_to_gcode = getattr(NameToCode, "_group_to_gcode", None)
    if group_to_gcode is None:
        group_to_gcode = {}
        for group in Attck().enterprise.actors:
            # setdefault keeps the first actor seen for a name, matching the
            # previous "return on first match" behaviour.
            group_to_gcode.setdefault(group.name, group.id)
        NameToCode._group_to_gcode = group_to_gcode
    return group_to_gcode.get(gName, "")


'G0016'

In [84]:
def GenerateTestSet():
    """Load the evaluation set of groups and their TTPs from the CSV dataset.

    Reads the ``mitre_attack_name`` and ``mitre_attack_ttps`` columns of
    ``datasets/Categorized_Adversary_TTPs.csv``. The TTP column is parsed as
    the text form of a list (e.g. ``"['T1566', 'T1204']"``).

    Returns:
        dict mapping a group id (as returned by NameToCode) to a list of TTP
        id strings. A row whose TTP cell is empty (``"[]"``) yields an empty
        list rather than ``['']``, so callers can filter it with a length
        check.

    NOTE(review): NameToCode returns "" for names it cannot resolve, so all
    unresolved groups share the "" key and overwrite one another.
    """
    df = pd.read_csv("datasets/Categorized_Adversary_TTPs.csv").loc[:, ['mitre_attack_name', 'mitre_attack_ttps']] # sample dataset of attacks
    test_threat_profiles = {}
    for group_name, raw_ttps in df.values:
        gcode = NameToCode(group_name.strip())
        # Drop the surrounding brackets/quotes, then split on commas and
        # discard blank tokens (an empty list would otherwise become ['']).
        tokens = raw_ttps.strip("'][").replace("'", "").split(',')
        test_threat_profiles[gcode] = [token.strip() for token in tokens if token.strip()]

    return test_threat_profiles
    
def GenerateAprioriLists():
    """Collect the TTP transactions used as input for apriori rule mining.

    Three sources are combined, and every TTP id is wrapped in single quotes
    (``"'T1566'"``) because the rest of the notebook matches on that form:

    * ``datasets/TidalCyberData/otx_running.json`` - element 7 of every row
      after the first, when it is a non-empty list.
    * ``datasets/TidalCyberData/Tidal OSINT Technique Extraction.json`` - the
      ``'Tidal Extracted Techniques'`` field of every row, when non-empty.
    * The techniques of every malware and tool in the pyattck enterprise data.

    Returns:
        list of lists of quoted TTP ids, keeping only transactions that
        contain at least two TTPs.
    """
    ttpLists = []

    # Data from Scott at Tidal Cyber
    with open("datasets/TidalCyberData/otx_running.json") as otx_file:
        otx_rows = json.load(otx_file)[1:]  # the first entry is skipped
    for row in otx_rows:
        techniques = row[7]
        # Check the type before the length so a non-list value is skipped
        # instead of raising inside len().
        if isinstance(techniques, list) and len(techniques) > 0:
            ttpLists.append(["'" + ttp + "'" for ttp in techniques])

    with open("datasets/TidalCyberData/Tidal OSINT Technique Extraction.json") as osint_file:
        osint_rows = json.load(osint_file)
    for row in osint_rows:
        extracted = row['Tidal Extracted Techniques']
        if len(extracted) > 0:
            ttpLists.append(["'" + ttp + "'" for ttp in extracted])

    # More data is gained by using attck data from tool and malware TTPs
    attack = Attck()
    for software in attack.enterprise.malwares + attack.enterprise.tools:
        ttpLists.append(["'" + ttp.id + "'" for ttp in software.techniques])

    # Drop transactions with fewer than two TTPs. The previous code filtered
    # an undefined name (aprList) here and raised NameError.
    return [ttps for ttps in ttpLists if len(ttps) > 1]

In [26]:
def AbstractTTPs(ttpList):
    """Collapse sub-technique ids to their parent technique ids.

    Every ``.<digits>`` run is removed from each TTP string. The outer list
    is updated in place (each inner list is replaced by a new one) and the
    same outer list object is returned.
    """
    sub_technique_suffix = re.compile(r'\.[0-9]+')
    for position, ttps in enumerate(ttpList):
        ttpList[position] = [sub_technique_suffix.sub('', ttp) for ttp in ttps]
    return ttpList

In [27]:
# takes a list of lists and returns a list of rules sorted by size
def AprioriMining(aprList, maxRuleSize=4):
    """Mine association rules from TTP transactions with apriori.

    Uses the notebook-level ``supportLevel`` and ``confidenceLevel`` as the
    minimum support and confidence.

    Args:
        aprList: List of transactions, each a list of TTP strings.
        maxRuleSize: Largest allowed ``len(lhs) + len(rhs)`` for a returned
            rule. Defaults to 4, the limit that was previously hard-coded.

    Returns:
        list of rule objects ordered by ascending size (1:1 mappings first).
        Rules of equal size keep the order in which apriori produced them.
    """
    def rule_size(rule):
        return len(rule.lhs) + len(rule.rhs)

    # max_length stops the miner from building itemsets larger than any rule
    # we would keep; previously they were mined and then thrown away.
    _, rules = apriori(
        aprList,
        min_support=supportLevel,
        min_confidence=confidenceLevel,
        max_length=maxRuleSize,
    )

    # sorted() is stable, so the result is deterministic for ties and no
    # numpy object array is needed.
    return sorted((rule for rule in rules if rule_size(rule) <= maxRuleSize), key=rule_size)

In [28]:
# Downloads latest MITRE framework from the branch
def get_data_from_branch(domain, branch="master", timeout=60):
    """Download the ATT&CK STIX data for a domain from mitre-attack/attack-stix-data.

    Args:
        domain: 'enterprise-attack', 'mobile-attack' or 'ics-attack'.
        branch: Repository branch to read from; typically 'master'.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A MemoryStore loaded with the ``objects`` array of the downloaded bundle.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    url = f"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/{branch}/{domain}/{domain}.json"
    response = requests.get(url, timeout=timeout)
    # Fail here with a clear HTTP error instead of a JSON decode error later.
    response.raise_for_status()
    return MemoryStore(stix_data=response.json()["objects"])

# Notebook-level ATT&CK store; CreateRelationships queries it by external id.
# Running this cell performs an HTTP download of the enterprise-attack bundle.
src = get_data_from_branch("enterprise-attack")

In [29]:
# Get TTPs from seed, where seed is an APT group code: GXXXX
def ExtractSeedTTPs(seed):
    """Return the technique ids attributed to an ATT&CK group.

    Args:
        seed: Group id (G code) to look up among the pyattck enterprise actors.

    Returns:
        list of technique id strings. When the notebook-level ``abstract``
        flag is true, sub-technique suffixes (``.<digits>``) are removed.

    Raises:
        KeyError: If no actor has the given id.
    """
    # The previous code indexed an undefined name (actors); look the actor up
    # in the pyattck data instead.
    actor = next((candidate for candidate in Attck().enterprise.actors if candidate.id == seed), None)
    if actor is None:
        raise KeyError(f"Unknown ATT&CK group code: {seed!r}")

    seeds = [ttp.id for ttp in actor.techniques]

    # If we are using abstracted TTPs then remove sub technique
    if abstract:
        seeds = [re.sub(r'\.[0-9]+', '', ttp_id) for ttp_id in seeds]
    return seeds

In [30]:
def ExportBundle(bundle, filename):
    """Write a bundle's serialized form to a file.

    Args:
        bundle: Object exposing ``serialize()`` that returns the text to write.
        filename: Destination path; an existing file is overwritten.

    The file is written as UTF-8 so the JSON does not depend on the platform
    default encoding. The ``with`` block closes the file, so no explicit
    ``close()`` is needed.
    """
    with open(filename, "w", encoding="utf-8") as f:
        f.write(bundle.serialize())

In [100]:
# Each rule has a confidence, lift, support, lhs, rhs, conviction, rule power factor (RPF),

# Takes a list of seed TTP ids and generates hypothesized relationships and attack pattern objects
# Returns a tuple of: (profile, attackPatterns, relationships)
def CreateRelationships(seeds, aprioriLists, rules=None):
    """Expand seed TTPs into a hypothesized profile using association rules.

    Starting from the seeds, every rule whose left-hand side contains the TTP
    being processed and is fully covered by the TTPs known so far is applied.
    Its right-hand-side TTPs are added to the queue, and a relationship from
    the current TTP to each of them is recorded.

    Args:
        seeds: List of TTP id strings without quotes (e.g. ``"T1566"``). The
            list is not modified.
        aprioriLists: Transactions passed to AprioriMining when ``rules`` is
            not supplied.
        rules: Optional pre-mined rules. Pass the result of AprioriMining to
            avoid re-mining on every call.

    Returns:
        Tuple ``(profile, attackPatterns, relationships)``:

        * ``profile``: dict of TTP id -> 1 for seeds, otherwise the rounded
          confidence of the rule behind the most recently recorded
          relationship targeting that TTP.
        * ``attackPatterns``: dict of TTP id -> object returned by ``src``.
        * ``relationships``: dict of (source STIX id, target STIX id) ->
          Relationship whose ``relationship_type`` holds the rounded rule
          confidence as a string.

    Raises:
        IndexError: If ``src`` has no object for a TTP id.
    """
    def lookup_technique(ttp_id):
        # First object in src whose external reference carries this id.
        matches = src.query([ Filter("external_references.external_id", "=", ttp_id) ])
        if not matches:
            raise IndexError(f"No STIX object found for technique id {ttp_id!r}")
        return matches[0]

    profile = {} # stores the TTP and its weight for easy logging
    seen = set() # quoted TTP names already pulled in through a rule
    attackPatterns = {} # stores attack pattern objects keyed by TTP id
    relationships = {} # stores generated relationship objects

    # Perform Association Rule Mining unless the caller supplied the rules
    if rules is None:
        rules = AprioriMining(aprioriLists)

    # Work on a copy so the caller's list is left untouched
    queue = list(seeds)

    # Add seeds to activity-attack-graph as nodes
    for seed in queue:
        profile[seed] = 1
        attackPatterns[seed] = lookup_technique(seed)

    # Rules use quoted names, so track the known TTPs in that form
    seedTotals = {"'" + seed + "'" for seed in queue}

    # Use a queue to iterate through and create a tree of TTPs
    head = 0
    while head < len(queue):
        current = queue[head]
        quoted_current = "'" + current + "'"
        for rule in rules:
            # Check to see if the left hand side of a rule is satisfied
            if quoted_current in rule.lhs and seedTotals.issuperset(rule.lhs):
                # If the lhs is satisfied, then loop through each TTP in the rhs
                for ttpName in rule.rhs:
                    ttp_id = ttpName[1:-1]
                    # If this TTP hasn't been visited already then fetch its STIX object
                    if ttpName not in seen:
                        ttp = lookup_technique(ttp_id)
                        attackPatterns[ttp_id] = ttp
                        seedTotals.add(ttpName)
                        seen.add(ttpName)
                        queue.append(ttp['external_references'][0]['external_id'])

                    source_id = attackPatterns[current]['id']
                    target_id = attackPatterns[ttp_id]['id']

                    # make sure the exact relationship does not already exist
                    if (source_id, target_id) in relationships:
                        continue

                    # If the opposite relationship exists, keep only the one with the
                    # higher confidence. Otherwise the graph gets two arrows between
                    # the same 2 TTPs and their confidence labels overlap.
                    reverse = relationships.get((target_id, source_id))
                    if reverse is not None:
                        # if existing relationship has higher confidence, move on
                        if float(reverse['relationship_type']) > rule.confidence:
                            continue
                        # otherwise delete it and let the new relationship take its place
                        del relationships[(target_id, source_id)]

                    relationships[(source_id, target_id)] = Relationship(source_id, str(round(rule.confidence, 3)), target_id)
                    # Seeds (weight 1) are never downgraded to a rule confidence
                    if not (ttp_id in profile and profile[ttp_id] >= 1):
                        profile[ttp_id] = round(rule.confidence, 3)

        # advance to progress the queue
        head += 1

    return (profile, attackPatterns, relationships)

In [32]:
# Transactions for rule mining, with sub-technique ids collapsed to their parent technique.
aprioriList = AbstractTTPs(GenerateAprioriLists()) # generate lists for apriori

In [43]:
# Build the hypothesized profile, attack patterns and relationships from the observed TTPs.
intelSeed = ["T1566", "T1204"] # Use-case: Observed TTPs
profile, attackPatterns, relationships = CreateRelationships(intelSeed, aprioriList)

In [45]:
# Displays the results of the Apriori Algorithm
print(relationships)

# This is the hypothesized profile; compared against the known APT profiles in a later cell
apt_x = profile

{('attack-pattern--7385dfaf-6886-4229-9ecd-6fd678040830', 'attack-pattern--8c32eb4d-805f-4fc5-bf60-c4d476c131b5'): Relationship(type='relationship', spec_version='2.1', id='relationship--0938da37-9b6a-4ace-9ab1-90793415914b', created='2023-04-01T00:58:46.645Z', modified='2023-04-01T00:58:46.645Z', relationship_type='0.77', source_ref='attack-pattern--7385dfaf-6886-4229-9ecd-6fd678040830', target_ref='attack-pattern--8c32eb4d-805f-4fc5-bf60-c4d476c131b5', revoked=False), ('attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a', 'attack-pattern--8c32eb4d-805f-4fc5-bf60-c4d476c131b5'): Relationship(type='relationship', spec_version='2.1', id='relationship--ba591998-4098-42bf-9a21-9b533074bbe7', created='2023-04-01T00:58:46.649001Z', modified='2023-04-01T00:58:46.649001Z', relationship_type='0.794', source_ref='attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a', target_ref='attack-pattern--8c32eb4d-805f-4fc5-bf60-c4d476c131b5', revoked=False), ('attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4

In [46]:
# Bundle STIX Objects for Visualization
# Combines every attack pattern and relationship produced by CreateRelationships into one Bundle.
bundle = Bundle(list(attackPatterns.values())+list(relationships.values()), allow_custom=True)

In [47]:
# Export bundle for visualization here: https://github.com/yukh1402/cti-stix-diamond-activity-attack-graph
# Writes the serialized bundle to test.json, relative to the notebook's working directory.
ExportBundle(bundle, "test.json")

In [37]:
# Phase 2: Make APT Threat Profiles
attack = Attck()

# Dictionary to hold all the threat profiles.
# The key is a given APT id and the value is a dict of {TTP id: 1}.
# Technique ids are cut to their first five characters (e.g. "T1566.001" -> "T1566").
threat_profiles = {}

for actor in attack.enterprise.actors:
    threat_profiles[actor.id] = {technique.id[0:5]: 1 for technique in actor.techniques}

print(threat_profiles["G0085"])


# Extended threat profiles: each known profile's TTPs are used as seeds for CreateRelationships.
# Only the returned profile is kept. The result is not unpacked into profile / attackPatterns /
# relationships, so the notebook-level values from the intel-seed cell are not overwritten.
extended_threat_profiles = {}
for apt, t_codes in tqdm(threat_profiles.items(), desc="Extending threat profiles"):
    ttp_list = list(t_codes)
    extended_threat_profiles[apt] = CreateRelationships(ttp_list, aprioriList)[0]

print(extended_threat_profiles["G0085"])


{'T1114': 1, 'T1078': 1, 'T1566': 1, 'T1056': 1, 'T1204': 1, 'T1059': 1, 'T1564': 1, 'T1071': 1, 'T1090': 1}


KeyboardInterrupt: 

In [None]:
# Phase 2: WEIGHTED JACCARD SIMILARITY
# Side note: Find a way to relate techniques and sub-techniques. 

# TODO: Create a function that compares two threat profiles and produces a Weighted Jaccard. Just returns a number. 
def weighted_jac(profile1, profile2):
    """Compute the weighted Jaccard similarity of two threat profiles.

    Each profile maps a TTP id to a numeric weight. The similarity is the sum
    of per-key minimum weights over the shared keys, divided by the sum of
    per-key maximum weights over all keys (a missing key counts as 0).

    Args:
        profile1: dict of TTP id -> weight.
        profile2: dict of TTP id -> weight.

    Returns:
        The similarity as a number. 0.0 is returned when the weighted union
        is 0 (for example both profiles are empty) instead of raising
        ZeroDivisionError.
    """
    keys1 = set(profile1)
    keys2 = set(profile2)

    # Shared keys contribute their smaller weight, all keys their larger one
    weighted_intersection = sum(min(profile1[k], profile2[k]) for k in keys1 & keys2)
    weighted_union = sum(max(profile1.get(k, 0), profile2.get(k, 0)) for k in keys1 | keys2)

    if weighted_union == 0:
        # Nothing to compare; treat as no similarity
        return 0.0

    return weighted_intersection / weighted_union


# AAG APT
#p1 = {'T1566': 0.836, 'T1204': 0.944, 'T1059': 0.752, 'T1105': 0.74, 'T1027': 0.751, 'T1071': 0.778, 'T1082': 0.819, 'T1547': 0.745, 'T1140': 0.745, 'T1057': 0.753, 'T1083': 0.78, 'T1070': 0.7}
# Known APT: G0085
#p2 = {'T1114': 1, 'T1078': 1, 'T1566': 1, 'T1056': 1, 'T1204': 1, 'T1059': 1, 'T1564': 1, 'T1071': 1, 'T1090': 1}

# weighted_jac(p1, p2)

In [None]:
# TODO: Create another function that calls the previous function on all of the threat profiles and our threat profile of interest.
# We could do some sort of ranking like list the APTs with the highest Weighted Jaccard first. 
def compare_apt(h_threatprofile, threat_profiles):
    """Score a hypothesized profile against every known threat profile.

    Returns a dict with the same keys as ``threat_profiles``. Each value is
    the weighted Jaccard similarity between that profile and
    ``h_threatprofile``, expressed as a percentage rounded to two decimals.
    """
    return {
        apt_id: round(weighted_jac(known_profile, h_threatprofile) * 100, 2)
        for apt_id, known_profile in threat_profiles.items()
    }

In [None]:
# Similarity (in percent) of the hypothesized profile apt_x to every extended threat profile.
compare_apt_dict = compare_apt(apt_x, extended_threat_profiles)

# Rank the APTs from most to least similar (descending by percentage).
sorted_dict = dict(sorted(compare_apt_dict.items(), key=lambda x: x[1], reverse=True))
print(sorted_dict)


In [None]:
# Bar chart of the similarity ranking (one bar per APT)
import matplotlib.pyplot as plt

# Ranked similarities computed in the previous cell
data = sorted_dict

# Split the ranking into bar labels and bar heights
labels = list(data.keys())
values = list(data.values())

# Draw on an explicit Axes instead of the implicit pyplot state
fig, ax = plt.subplots()
ax.bar(labels, values)

# Title and axis labels
ax.set_title('APT THANGS')
ax.set_xlabel('APTs')
ax.set_ylabel('Similarity (%)')

# Render the figure
plt.show()


In [56]:
# Persist the extended threat profiles as JSON text
import json

serialized_profiles = json.dumps(extended_threat_profiles)
with open("Extended_Threat_Profiles", "w") as outfile:
    outfile.write(serialized_profiles)
    
# Ahhhhh purrrrrr (pian)

In [None]:
## Here we evaluate our models
test_set = GenerateTestSet() # this is just G code and associated TTPs, need to make profiles
test_profiles = {}

# Create profiles using the test data and make it into a dictionary.
for gcode, tcodes in tqdm(test_set.items()):
    # Abstract to the parent technique id (first five characters) and drop blank entries.
    # An empty TTP cell can arrive as [''], which a plain length check does not catch.
    tcodes = [tc.strip()[0:5] for tc in tcodes if tc.strip()]
    # Skip groups with no TTPs, and the "" key NameToCode yields for unresolved names.
    if not gcode or not tcodes:
        continue
    # Keep only the profile; not unpacking avoids overwriting the notebook-level
    # profile / attackPatterns / relationships variables.
    test_profiles[gcode] = CreateRelationships(tcodes, aprioriList)[0]
    # tqdm.write prints without breaking the progress bar
    tqdm.write(gcode)


  1%|▋                                                                                 | 1/124 [00:17<36:33, 17.84s/it]

G0118


  2%|█▎                                                                                | 2/124 [00:19<17:17,  8.51s/it]

G0003


  2%|█▉                                                                                | 3/124 [00:20<10:20,  5.12s/it]

G0036
