# ICS MITRE mapping

This notebook visualizes the mapping between several ICS threat actors, the attacks they conducted, and the detections that can be used to discover those attacks.

This notebook uses data from the MITRE ATT&CK ICS framework using the MITRE STIX data in the [attack-stix-data](https://github.com/mitre-attack/attack-stix-data/tree/master) repository.

## Requirements

The notebook requires the following libraries, and the code was tested with the versions pinned in the install cell below:

* **taxii2-client**: To talk to the TAXII server
* **stix2**: To handle the STIX 2 encapsulated data
* **networkx**: To create and work with graphs
* **pygraphviz**: To visualize graphs
* **pandas**: To create and manipulate dataframes

**NOTE:** pygraphviz depends on the Graphviz system package, which must be installed via the terminal first. On Ubuntu and other Debian-based systems run ```sudo apt-get install graphviz graphviz-dev```.

For other systems consult [pygraphviz install guide](https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt) 

In [1]:
%pip install taxii2-client==2.3.0
%pip install stix2==3.0.1
%pip install networkx==2.6.3
%pip install bokeh==2.3.3
%pip install pandas==1.3.5
%pip install pygraphviz==1.12
%pip install selenium
%pip install matplotlib
%pip install pyvis

Note: you may need to restart the kernel to use updated packages.


## Import relevant modules for the notebook

In [2]:
from taxii2client.v20 import Server
from stix2 import TAXIICollectionSource, Filter
from taxii2client.v20 import Collection
import pandas as pd
import numpy as np
from pyvis.network import Network
import networkx as nx
import random

## Initial setup and retrieving data
The logging level of `taxii2client` is set to `CRITICAL` so that only critical messages are shown, and we use the following URL to work with:
```https://cti-taxii.mitre.org/taxii/```


In [3]:
from taxii2client.v20 import Server

import logging
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)
server = Server("https://cti-taxii.mitre.org/taxii/")

# Retrieve API root
api_root = server.api_roots[0]

# Select ICS ATT&CK data collection
MITRE_COLLECTION_ID = api_root.collections[3].id
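Selecting the collection by position (`collections[3]`) depends on the order in which the server lists its collections. As a hedged alternative, the sketch below looks a collection up by its title instead. The helper name `find_collection_id` is hypothetical, the collection titles and ids in the demo are placeholders, and it only assumes that each collection object exposes `.title` and `.id`.

```python
from types import SimpleNamespace


def find_collection_id(collections, title_fragment):
    """Return the id of the first collection whose title contains title_fragment.

    `collections` is any iterable of objects exposing `.title` and `.id`,
    for example the entries of `api_root.collections`.
    """
    wanted = title_fragment.lower()
    for collection in collections:
        if wanted in collection.title.lower():
            return collection.id
    raise LookupError(f"No collection title contains {title_fragment!r}")


# Stand-in objects with placeholder titles and ids, used only for illustration
demo_collections = [
    SimpleNamespace(title="Enterprise ATT&CK", id="id-enterprise"),
    SimpleNamespace(title="ICS ATT&CK", id="id-ics"),
]
print(find_collection_id(demo_collections, "ICS"))  # prints id-ics
```

In the notebook this could replace the index-based lookup, for example `find_collection_id(api_root.collections, "ICS")`, provided the server's ICS collection title actually contains that text.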

Now get the actual collection, based on the selected collection ID.

In [4]:
from stix2 import TAXIICollectionSource, Filter
from taxii2client.v20 import Collection

ATTACK_STIX_COLLECTIONS = "https://cti-taxii.mitre.org/stix/collections/"
MITRE_COLLECTION = Collection(ATTACK_STIX_COLLECTIONS + MITRE_COLLECTION_ID + "/")
MC_DATA = TAXIICollectionSource(MITRE_COLLECTION)

We also retrieve the mitigation (course of action), detection, attacker group, software, data source, asset and technique data. Each is retrieved as a mapping from unique STIX IDs to human-readable names.\
The function below is a simple helper that queries the data for one STIX type and builds that mapping.

In [5]:
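def generate_mc(stix_type: str):
    """Map STIX IDs to human-readable names for all objects of the given STIX type."""
    # Query the collection for every object of this type
    # (the parameter is renamed from `type` to avoid shadowing the builtin)
    mc_objects = MC_DATA.query(Filter("type", "=", stix_type))
    return {obj['id']: obj['name'] for obj in mc_objects}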

In [6]:
mc_mitigation_mapping = generate_mc('course-of-action')
mc_technique_mapping = generate_mc('attack-pattern')
mc_detection_mapping = generate_mc('x-mitre-data-component')
mc_intrusion_set_mapping = generate_mc('intrusion-set')
mc_malware_mapping = generate_mc('malware')
mc_data_source_mapping = generate_mc('x-mitre-data-source')
mc_asset_mapping = generate_mc('x-mitre-asset')
print(mc_asset_mapping)


{'x-mitre-asset--dcb1d1c1-b195-45bf-b4cf-5b98c5b859a5': 'Routers', 'x-mitre-asset--2b676abd-8263-49ea-81a4-78a7e1f776fe': 'Field I/O', 'x-mitre-asset--14932ed5-1098-4cc1-9f57-159ab7366787': 'Jump Host', 'x-mitre-asset--0804f037-a3b9-4715-98e1-9f73d19d6945': 'Virtual Private Network (VPN) Server', 'x-mitre-asset--69d1b1ef-e918-4cfd-9a98-29debd04cb32': 'Safety Controller', 'x-mitre-asset--68388d4f-8138-420b-be2b-5a7dfe9ff6b4': 'Data Gateway', 'x-mitre-asset--973bc51e-c41e-4cec-ac03-9389c71f3d0d': 'Application Server', 'x-mitre-asset--ecb81a8b-022e-4529-a404-55cffca7d3a3': 'Control Server', 'x-mitre-asset--e2c3336a-dd93-44d6-8246-f93cf132c499': 'Data Historian', 'x-mitre-asset--75f810ad-b678-4c57-b93b-fdc79bba0c04': 'Intelligent Electronic Device (IED)', 'x-mitre-asset--1769c499-55e5-462f-bab2-c39b8cd5ae32': 'Remote Terminal Unit (RTU)', 'x-mitre-asset--986c455b-0f43-42b6-8360-33ac48bd9990': 'Programmable Logic Controller (PLC)', 'x-mitre-asset--3a95f7e4-4877-4967-b2e8-e287976c3e64': 'Hum

## Setting of relevant visualization variables

In the code block below we set the variables that control how the graphs are visualized: the available node shapes, the node colors, and a level counter.

In [7]:
SHAPES = [
    "rect", 
    "rectangle",
    "box", 
    "circle",
    "ellipse",
    "triangle",
    "pentagon",
    "hexagon",
    "octagon",
    "diamond",
    "trapezium",
    "parallelogram",
    "house", 
    "invhouse",
    "Msquare",
    "Dsquare",
    "star",
    "cylinder",
    "note",
    "underline"
    ]
HEX_COLORS = [
    "#FF6666",
    "#66FF66",
    "#6666FF",
    "#FFFF66",
    "#66FFFF",
    "#FF66FF",
    "#B380B3",
    "#66B3B3",
    "#66B366",
    "#B38080",
    "#80808B",
    "#B3B366",
    "#B3B3B3",
    "#E0E0E0",
    "#C08080",
    "#66B3C0",
    "#B380DB",
    "#F0F0FA",
    "#D2C4B3",
    "#F0F0C0"
]
LEVEL_COUNT = 0
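The shape and color lists above are later indexed with a single random number per node type, so two node types can end up with the same style. As a possible alternative, the sketch below pairs each node type with a fixed index. The names `demo_shapes`, `demo_colors`, `NODE_TYPES` and `assign_styles` are assumptions made for this illustration and do not appear in the notebook.

```python
# A small subset of the shapes and colors listed above, for illustration only
demo_shapes = ["box", "circle", "ellipse", "triangle", "hexagon", "diamond"]
demo_colors = ["#FF6666", "#66FF66", "#6666FF", "#FFFF66", "#66FFFF", "#FF66FF"]
NODE_TYPES = [
    "intrusion-set",
    "malware",
    "attack-pattern",
    "x-mitre-data-component",
    "course-of-action",
    "x-mitre-asset",
]


def assign_styles(node_types, shapes, colors):
    """Pair each node type with the shape and color at the same index."""
    if len(node_types) > min(len(shapes), len(colors)):
        raise ValueError("Not enough shapes or colors for the node types")
    return {
        node_type: {"shape": shapes[i], "color": colors[i]}
        for i, node_type in enumerate(node_types)
    }


styles = assign_styles(NODE_TYPES, demo_shapes, demo_colors)
print(styles["malware"])  # {'shape': 'circle', 'color': '#66FF66'}
```

With a fixed assignment the same node type gets the same style on every run, which makes graphs generated at different times easier to compare.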

## Main graphing function

The code block below contains the main functions for generating the desired graphs based on the STIX data.\
The following main functions are present in the code block below:

- `generate_relationships` --> gets all relationships which are deemed relevant (add more if required)
- `generate_network` --> main graph generation function
  - `get_nodes` --> simple helper function which gets all nodes of a specific type such as intrusion-set, attack-pattern, etc.
  - `generate_nodes` --> helper function that actually creates the nodes and adds them to the graph
  - `generate_edges` --> helper function that gets all edges based on the node types which are passed to the method

### Usage

To generate a new graph, call `generate_network`. The only required positional argument is the name under which the graph is stored, which must be a string.\
With the defaults no node types are selected, so to generate a meaningful graph, specify the node types to generate by setting the relevant booleans in the call.\
Additionally, the height of the generated graph can be set. A height greater than 1200 pixels is recommended, as the graph is quite tall.\
The corresponding width is derived automatically using a 21:9 ratio.\
The function saves the visualization to an HTML file and then returns the generated graph for further processing, so assign the return value to a variable.
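The layout arithmetic described above can be sketched in isolation. This is a minimal illustration of the 21:9 width and the evenly spaced columns, not the notebook's exact code: the helper name `layout` is hypothetical, and the column keys are the two-letter codes used for the six node types.

```python
def layout(height, columns=("IS", "MW", "AP", "DT", "MT", "AS")):
    """Derive the graph width from the height (21:9) and spread the columns evenly."""
    width = height / 9 * 21
    column_width = width / len(columns)
    # Each node type gets its own column, starting at x = 0
    x_positions = {name: i * column_width for i, name in enumerate(columns)}
    return width, x_positions


width, x_positions = layout(1200)
print(round(width))              # 2800
print(round(x_positions["AP"]))  # 933
```

For a height of 1200 pixels this gives a width of roughly 2800 pixels and six columns of about 467 pixels each.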

### Limitations
The functions have some limitations. Edges are selected by their source node only, so a node that appears as the destination of an edge can end up in the graph without having been explicitly added (and styled) as a node.
Additionally, when getting the initial set of relations, all (relevant) relations are collected.\
This was done partly to ensure that no data is missing from the dataset; however, it is not fully efficient.\
As this notebook will likely see only limited use, this inefficiency is accepted.
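One way to avoid nodes that exist only because an edge mentions them is to keep an edge only when both of its endpoints belong to a selected node type. The sketch below illustrates this on plain tuples. The helper name `filter_edges` and the shortened IDs are assumptions for this example; it relies only on STIX IDs having the form `type--uuid`.

```python
def filter_edges(edges, selected_types):
    """Keep only edges whose source and destination both have a selected STIX type."""
    # STIX IDs look like "<type>--<uuid>", so the type can be matched as a prefix
    prefixes = tuple(f"{stix_type}--" for stix_type in selected_types)
    return [
        (source, destination)
        for source, destination in edges
        if source.startswith(prefixes) and destination.startswith(prefixes)
    ]


# Placeholder IDs, shortened for readability
demo_edges = [
    ("malware--a", "attack-pattern--b"),
    ("course-of-action--c", "attack-pattern--b"),
    ("attack-pattern--b", "x-mitre-asset--d"),
]
print(filter_edges(demo_edges, ["course-of-action", "attack-pattern"]))
# [('course-of-action--c', 'attack-pattern--b')]
```

The trade-off is that relations pointing outside the selected types are dropped silently, so the resulting graph shows fewer edges than the dataset contains.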


There is currently no data_source mapping, as it is an embedded relationship of the detections; adding it may require more rework of the code than is currently worth it.
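A possible starting point for the data source mapping is sketched below. It assumes that each data component object carries its parent data source in an `x_mitre_data_source_ref` property, as in the ATT&CK STIX data model, and that an ID-to-name dictionary such as `mc_data_source_mapping` is available. The helper name `map_components_to_sources` and the IDs in the demo are placeholders.

```python
def map_components_to_sources(components, data_source_names):
    """Map each data component ID to the name of its parent data source."""
    mapping = {}
    for component in components:
        # Assumed property name; components without it are skipped
        source_ref = component.get("x_mitre_data_source_ref")
        if source_ref is None:
            continue
        # Fall back to the raw ID when the data source name is unknown
        mapping[component["id"]] = data_source_names.get(source_ref, source_ref)
    return mapping


# Placeholder objects standing in for queried STIX data components
demo_components = [
    {
        "id": "x-mitre-data-component--aaa",
        "name": "Network Traffic Content",
        "x_mitre_data_source_ref": "x-mitre-data-source--111",
    },
    {"id": "x-mitre-data-component--bbb", "name": "Process Creation"},
]
demo_source_names = {"x-mitre-data-source--111": "Network Traffic"}
print(map_components_to_sources(demo_components, demo_source_names))
# {'x-mitre-data-component--aaa': 'Network Traffic'}
```

The resulting pairs could be turned into additional edges, although that would still require a seventh column in the layout.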

In [8]:
def generate_relationships():
    
    relationship_data = {'Source':[],'Destination':[],'source_human':[],'target_human':[]}
    data_source = []
    
    MC_RELATIONS = MC_DATA.query(Filter("type","=","relationship"))
    
    for idx, x in enumerate(MC_RELATIONS):
        if 'relationship_type' in x:
            if x['relationship_type'] == 'uses' and 'intrusion-set' in x['source_ref'] and 'malware' in x['target_ref']:
                relationship_data['Source'].append(x['source_ref'])
                relationship_data['Destination'].append(x['target_ref'])
                relationship_data['source_human'].append(mc_intrusion_set_mapping[x['source_ref']])
                relationship_data['target_human'].append(mc_malware_mapping[x['target_ref']])
            elif x['relationship_type'] == 'uses' and 'malware' in x['source_ref'] and 'attack-pattern' in x['target_ref']:
                relationship_data['Source'].append(x['source_ref'])
                relationship_data['Destination'].append(x['target_ref'])
                relationship_data['source_human'].append(mc_malware_mapping[x['source_ref']])
                relationship_data['target_human'].append(mc_technique_mapping[x['target_ref']])
            elif x['relationship_type'] == 'detects':
                relationship_data['Source'].append(x['source_ref'])
                relationship_data['Destination'].append(x['target_ref'])
                relationship_data['source_human'].append(mc_detection_mapping[x['source_ref']])
                relationship_data['target_human'].append(mc_technique_mapping[x['target_ref']])
                data_source.append(x['source_ref'])
            elif x['relationship_type'] == 'mitigates' and 'course-of-action' in x['source_ref'] and 'attack-pattern' in x['target_ref']:
                relationship_data['Source'].append(x['source_ref'])
                relationship_data['Destination'].append(x['target_ref'])
                relationship_data['source_human'].append(mc_mitigation_mapping[x['source_ref']])
                relationship_data['target_human'].append(mc_technique_mapping[x['target_ref']])
            elif x['relationship_type'] == 'targets':
                relationship_data['Source'].append(x['source_ref'])
                relationship_data['Destination'].append(x['target_ref'])
                relationship_data['source_human'].append(mc_technique_mapping[x['source_ref']])
                relationship_data['target_human'].append(mc_asset_mapping[x['target_ref']])
    data_df = pd.DataFrame(data=relationship_data)
    return data_df

def generate_network(
    graph_name: str,
    intrusions = False, 
    malware = False, 
    attack_patterns = False,
    detections = False,
    mitigations = False,
    assets = False,
    data_sources = False,
    height = 1200,
    ):
    Graph = nx.DiGraph()
    relationships = generate_relationships()
    
    global LEVEL_COUNT
    LEVEL_COUNT = 0
     
    def get_nodes(relations, location: str, sub_str: str):
        return relations[relations[location].str.startswith(sub_str)][location].unique()
    
    def generate_nodes(node_list, location:str, position:str, level:int):
        rand_int = random.randint(0,19)
        shape, color = SHAPES[rand_int], HEX_COLORS[rand_int]
        field = "source_human" if location == "Source" else "target_human"
        # Guard against an empty node list, which would otherwise divide by zero
        vertical_shift = height / max(node_list.size, 1)
        for node in node_list:
            node_title = relationships[relationships[location] == node][field].iloc[0]
            Graph.add_node(
                node, 
                label=node_title, 
                title=f"{node_title}, Stix: {node}", 
                color= color, 
                level = level,
                x=x_positions[position], 
                y=y_positions[position],
                shape = shape
            )
            y_positions[position] += vertical_shift
   
    def generate_edges(relations, sub_strs):
        filtered_edges = relations[(relations['Source'].str.contains("|".join(sub_strs)))][['Source', 'Destination']].drop_duplicates()
        print(len(filtered_edges))
        return filtered_edges
        
        
    
    width = (height / 9)*21
    column_width = width / 6
    x_positions = {
        'IS':0*column_width,
        'MW':1*column_width,
        'AP':2*column_width,
        'DT':3*column_width, 
        'MT':4*column_width,
        'AS':5*column_width
        }
    y_positions = {'IS':0,'MW':0,'AP':0,'DT':0, 'MT':0, 'AS':0}
    
    edge_data = {'Source':[],'Destination':[],'source_human':[],'target_human':[]}
    sub_strings = []
    
    if intrusions:
        IS_nodes = get_nodes(relations=relationships, location="Source", sub_str="intrusion-set")
        generate_nodes(node_list=IS_nodes, location="Source", position="IS", level=LEVEL_COUNT)
        LEVEL_COUNT += 1
        sub_strings.append('intrusion-set')    
    if malware:
        MW_nodes = get_nodes(relations=relationships, location="Source", sub_str="malware")
        generate_nodes(node_list=MW_nodes, location="Source", position="MW", level=LEVEL_COUNT)
        LEVEL_COUNT += 1
        sub_strings.append('malware')    
    if attack_patterns:
        AP_nodes = get_nodes(relations=relationships, location="Destination", sub_str="attack-pattern")
        generate_nodes(node_list=AP_nodes, location="Destination", position="AP", level=LEVEL_COUNT)
        LEVEL_COUNT += 1
        sub_strings.append('attack-pattern')    
    if detections:
        DT_nodes = get_nodes(relations=relationships, location="Source", sub_str="x-mitre-data-component")
        generate_nodes(node_list=DT_nodes, location="Source", position="DT", level=LEVEL_COUNT)
        LEVEL_COUNT += 1
        sub_strings.append('x-mitre-data-component')    
    if mitigations:
        MT_nodes = get_nodes(relations=relationships, location="Source", sub_str="course-of-action")
        generate_nodes(node_list=MT_nodes, location="Source", position="MT", level=LEVEL_COUNT)
        LEVEL_COUNT += 1
        sub_strings.append('course-of-action')    
    if assets:
        AS_nodes = get_nodes(relations=relationships, location="Destination", sub_str="x-mitre-asset")
        generate_nodes(node_list=AS_nodes, location="Destination", position="AS", level=LEVEL_COUNT)
        LEVEL_COUNT += 1
        sub_strings.append('x-mitre-asset')    
    
    edges = generate_edges(relations=relationships, sub_strs=sub_strings)
    edge_list = [tuple(x) for x in edges.to_numpy()]
    Graph.add_edges_from(edge_list, color = 'skyblue')
    
    pyvis_network = Network(notebook=True, directed = True, filter_menu=True, height=f"{height}px", width=f"{width}px")
    pyvis_network.from_nx(Graph)
    pyvis_network.toggle_physics(False)
    pyvis_network.save_graph(f"{graph_name}.html")
    
    return Graph


## Example graphs

Below are two function calls: one generates a partial graph, the other a graph that includes all node types. The number printed for each call is the number of edges in that graph.

In [9]:
partial_graph = generate_network('partial', intrusions=False, malware=False, attack_patterns=True, detections=True, mitigations=True, assets=True, height=2400)
full_graph = generate_network('full_DG', intrusions=True, malware=True, attack_patterns=True, detections=True, mitigations=True, assets=True, height=2400)

1055
1222


## Data analysis

**Note:** the cell below uses a graph that contains all possible nodes, so most forms of analysis can be done.\
However, there is no error handling within this notebook, so errors may occur when performing analysis on nodes which are not within the graph.


The code cell below generates a graph with all nodes and edges that can be extracted from the dataset.


### Definition degree

One of the concepts often used in the blocks below is degree.\
The degree of a node is the number of edges coming into or going out of it. This is relevant for this notebook because many nodes have a one-to-many relationship in terms of edges.\
This allows for easy analysis of the coverage of certain detections or mitigations, as well as the impact of specific attack patterns.
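To make the definition concrete, the sketch below counts incoming and outgoing edges for a tiny directed edge list using only the standard library. The node names are made up for the example and the helper name `degrees` is hypothetical. On a directed graph, networkx's `degree()` reports the total, that is incoming plus outgoing edges.

```python
from collections import Counter


def degrees(edges):
    """Return the in-degree, out-degree and total degree for every node."""
    in_degree, out_degree = Counter(), Counter()
    for source, destination in edges:
        out_degree[source] += 1
        in_degree[destination] += 1
    nodes = set(in_degree) | set(out_degree)
    return {
        node: {
            "in": in_degree[node],
            "out": out_degree[node],
            "total": in_degree[node] + out_degree[node],
        }
        for node in nodes
    }


# Made-up node names: two detections/mitigations, two techniques, one asset
demo_edges = [
    ("detection-a", "technique-1"),
    ("mitigation-a", "technique-1"),
    ("technique-1", "asset-x"),
    ("detection-a", "technique-2"),
]
result = degrees(demo_edges)
print(result["technique-1"])  # {'in': 2, 'out': 1, 'total': 3}
print(result["detection-a"])  # {'in': 0, 'out': 2, 'total': 2}
```

For a detection or mitigation the total equals the out-degree, since these nodes only have outgoing edges in this dataset; for a technique the total mixes incoming and outgoing edges.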

In [175]:
df = generate_relationships()
DG = generate_network('analysis', intrusions=True, malware=True, attack_patterns=True, detections=True, mitigations=True, assets=True, height=2400)

1222


### Detection degrees

The code block below retrieves all detections in the dataset, together with the degree of each node.\
This gives us an overview of the detections which offer the most coverage of the TTPs.

In [176]:
# https://networkx.org/documentation/stable/auto_examples/drawing/plot_degree.html
detection_data = {'Detection':[],'Degree':[]}
for idx, deg in DG.degree():
    if idx.startswith('x-mitre-data'):
        human_detection = mc_detection_mapping[idx]
        detection_data['Detection'].append(human_detection)
        detection_data['Degree'].append(deg) 
detections = pd.DataFrame(data=detection_data)
detections = detections.sort_values(by=['Degree'], ascending=False, ignore_index=True)
display(detections)
#  detections.to_string(index=False)

| | Detection | Degree |
|---|---|---|
| 0 | Network Traffic Content | 35 |
| 1 | Application Log Content | 33 |
| 2 | Network Traffic Flow | 23 |
| 3 | Process Creation | 18 |
| 4 | Command Execution | 15 |
| 5 | Device Alarm | 13 |
| 6 | OS API Execution | 8 |
| 7 | Process History/Live Data | 8 |
| 8 | Logon Session Creation | 7 |
| 9 | File Access | 6 |


### Attack pattern degrees

The code block below does something similar to the block above: it gets the degree of each attack-pattern node.\
The intent is to show the number of different kinds of assets each attack pattern targets. Note that `DG.degree()` on the full graph counts every edge touching a technique, so the values also include edges from malware, detections and mitigations.\
This information is relevant, for example, for mapping detections to the assets they cover.

In [178]:
ttp_data = {'Technique':[],'Degree':[]}
for idx, deg in DG.degree():
    if idx.startswith('attack-pattern'):
        # Translate the STIX ID into the technique's human-readable name
        human_technique = mc_technique_mapping[idx]
        ttp_data['Technique'].append(human_technique)
        ttp_data['Degree'].append(deg) 
ttps = pd.DataFrame(data=ttp_data)
# Keep the sorted result so later cells see the same ordering as the display
ttps = ttps.sort_values(by=['Degree'], ascending=False, ignore_index=True)
display(ttps)

| | Technique | Degree |
|---|---|---|
| 0 | Adversary-in-the-Middle | 29 |
| 1 | Valid Accounts | 28 |
| 2 | Device Restart/Shutdown | 28 |
| 3 | Service Stop | 27 |
| 4 | Exploitation of Remote Services | 27 |
| 5 | Remote Services | 26 |
| 6 | System Firmware | 25 |
| 7 | Data Destruction | 24 |
| 8 | Indicator Removal on Host | 23 |
| 9 | Remote System Information Discovery | 23 |


### Mitigation degrees

The code block below performs the same operation as the two blocks above, this time for mitigations.\
The relevance of the data is similar to that of the detection degrees: it gives insight into the mitigations which cover the largest number of TTPs.

In [179]:
pd.set_option('display.max_rows', None)
mitigation_data = {'Mitigation':[],'Degree':[]}
for idx, deg in DG.degree():
    if idx.startswith('course-of-action'):
        human_detection = mc_mitigation_mapping[idx]
        mitigation_data['Mitigation'].append(human_detection)
        mitigation_data['Degree'].append(deg) 
mitigations = pd.DataFrame(data=mitigation_data)
mitigations = mitigations.sort_values(by=['Degree'], ascending=False, ignore_index=True)

### Brief detection and mitigation coverage analysis

The code blocks below return the number of TTPs which are covered when selecting the detections or mitigations with the highest coverage.\
A similar block still needs to be developed to map the coverage of asset types.
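The idea behind the coverage count can be shown on a handful of made-up relations: collect every target reached from the selected sources and count the unique ones. This is a simplified sketch with plain tuples, and the helper name `covered_techniques` is hypothetical.

```python
def covered_techniques(relations, selected_actions):
    """Return the set of targets reached by at least one selected source."""
    selected = set(selected_actions)
    return {target for source, target in relations if source in selected}


# Made-up (source, target) pairs standing in for detects/mitigates relations
demo_relations = [
    ("Detection A", "T1"),
    ("Detection A", "T2"),
    ("Detection B", "T2"),
    ("Mitigation C", "T3"),
]
covered = covered_techniques(demo_relations, ["Detection A", "Detection B"])
print(len(covered))  # 2
```

Because the result is a set, a technique covered by several selected actions is counted only once, which is why the coverage of a selection is usually smaller than the sum of the individual degrees.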

In [180]:
coverage_list = list(detections['Detection'].iloc[:5])
display(list(detections['Detection'].iloc[:5]))

# df[df['target_human'].isin(coverage_list)]
df_coverage = df.query('source_human.isin(@coverage_list)')
print(df_coverage['Destination'].unique().size)


['Network Traffic Content',
 'Application Log Content',
 'Network Traffic Flow',
 'Process Creation',
 'Command Execution']

60


In [182]:
# Coverage is computed over the top 5 mitigations; the top 10 are displayed for reference
mitigation_coverage = list(mitigations['Mitigation'].iloc[:5])
display(list(mitigations['Mitigation'].iloc[:10]))

# df[df['target_human'].isin(coverage_list)]
df_coverage = df.query('source_human.isin(@mitigation_coverage)')
print(df_coverage['Destination'].unique().size)

['Network Segmentation',
 'Network Allowlists',
 'Software Process and Device Authentication',
 'Human User Authentication',
 'Communication Authenticity',
 'Access Management',
 'Filter Network Traffic',
 'Audit',
 'Authorization Enforcement',
 'Out-of-Band Communications Channel']

39


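From the two code blocks we can conclude that the coverage of the top five mitigations (39 techniques) is less broad than that of the top five detections (60 techniques).\
This is an interesting insight, as it suggests that active detections remain at least as important as mitigations in ICS environments, although coverage counts alone do not measure how effective a detection or mitigation is.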

In [187]:
coverage_data = {'Action':[],'Degree':[]}
for idx, deg in DG.degree:
    if idx.startswith('x-mitre-data'):
        human_detection = mc_detection_mapping[idx]
        coverage_data['Action'].append(human_detection)
        coverage_data['Degree'].append(deg) 
for idx, deg in DG.degree:
    if idx.startswith('course-of-action'):
        human_mitigation = mc_mitigation_mapping[idx]
        coverage_data['Action'].append(human_mitigation)
        coverage_data['Degree'].append(deg) 
coverage_df= pd.DataFrame(data=coverage_data)
coverage_df = coverage_df.sort_values(by=['Degree'], ascending=False, ignore_index=True)

The block below shows the coverage of TTPs when combining both detections and mitigations.

In [189]:

coverage_list = list(coverage_df['Action'].iloc[:10])
display(coverage_list)

# df[df['target_human'].isin(coverage_list)]
df_coverage = df.query('source_human.isin(@coverage_list)')
print(df_coverage['Destination'].unique().size)

['Network Traffic Content',
 'Application Log Content',
 'Network Segmentation',
 'Network Traffic Flow',
 'Network Allowlists',
 'Process Creation',
 'Software Process and Device Authentication',
 'Human User Authentication',
 'Communication Authenticity',
 'Command Execution']

64
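The combined top ten consists of exactly the top five detections and the top five mitigations listed earlier, so the three printed counts can be related with inclusion-exclusion. The short calculation below is a worked example that takes the counts printed in this notebook (60, 39 and 64) as given; the variable names are chosen for this illustration.

```python
# Counts printed by the coverage cells in this notebook
detection_cover = 60   # techniques covered by the top 5 detections
mitigation_cover = 39  # techniques covered by the top 5 mitigations
combined_cover = 64    # techniques covered by the combined top 10

# |A or B| = |A| + |B| - |A and B|, rearranged for the overlap
overlap = detection_cover + mitigation_cover - combined_cover
only_mitigated = combined_cover - detection_cover
only_detected = combined_cover - mitigation_cover

print(overlap)         # 35
print(only_mitigated)  # 4
print(only_detected)   # 25
```

Under these numbers, 35 techniques are covered by both groups, and adding the five mitigations to the five detections extends the coverage by only 4 techniques.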


## Things that still need doing

- Troubleshoot the broken export to another folder.