Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added data sources function and field mappings #38

Merged
merged 2 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 72 additions & 3 deletions attackcti/attack_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from stix2 import TAXIICollectionSource, Filter, CompositeDataSource, FileSystemSource
from stix2.utils import get_type_from_id
#from stix2.v20.sdo import *
from taxii2client.v20 import Collection
import json
import os
Expand Down Expand Up @@ -86,6 +87,7 @@ def translate_stix_objects(self, stix_objects):
"x_mitre_difficulty_for_adversary_explanation": "difficulty_explanation",
"x_mitre_tactic_type": "tactic_type",
"x_mitre_impact_type": "impact_type",
"x_mitre_is_subtechnique": "is_subtechnique",
"external_references": "external_references"
}
mitigation_stix_mapping = {
Expand Down Expand Up @@ -190,6 +192,19 @@ def translate_stix_objects(self, stix_objects):
"name": "identity",
"identity_class": "identity_class"
}
data_source_stix_mapping = {
"type": "type",
"id": "id",
"created": "created",
"modified": "modified",
"name": "data_source",
"description": "description",
"created_by_ref": "created_by_ref",
"external_references": "external_references",
"x_mitre_platforms": "software_platform",
"x_mitre_collection_layers": "collection_layers",
"x_mitre_contributors": "contributors"
}

# ******** Helper Functions ********
def handle_list(list_object, object_type):
Expand Down Expand Up @@ -246,6 +261,8 @@ def handle_list(list_object, object_type):
stix_mapping = identity_stix_mapping
elif obj['type'] == "marking-definition":
stix_mapping = marking_stix_mapping
elif obj['type'] == "x-mitre-data-source":
stix_mapping = data_source_stix_mapping
else:
return stix_objects_list

Expand Down Expand Up @@ -438,6 +455,22 @@ def get_enterprise_tactics(self, stix_format=True):
if not stix_format:
enterprise_tactics = self.translate_stix_objects(enterprise_tactics)
return enterprise_tactics

def get_enterprise_data_sources(self, stix_format=True):
""" Extracts all the available data source STIX objects availalbe in the Enterprise ATT&CK matrix. This function filters all STIX objects by the type x-mitre-data-source.

Args:
stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique')

Returns:
List of STIX objects
"""
enterprise_data_sources = self.TC_ENTERPRISE_SOURCE.query(Filter("type", "=", "x-mitre-data-source"))
for ds in enterprise_data_sources:
ds['data_components']= self.get_data_components_by_data_source(ds)
if not stix_format:
enterprise_data_sources = self.translate_stix_objects(enterprise_data_sources)
return enterprise_data_sources

# ******** Pre ATT&CK Domain [DEPRECATED] 11/23/2020 *******
def get_pre(self, stix_format=True):
Expand Down Expand Up @@ -944,6 +977,21 @@ def get_tactics(self, stix_format=True):
if not stix_format:
all_tactics = self.translate_stix_objects(all_tactics)
return all_tactics

def get_data_sources(self, stix_format=True):
""" Extracts all the available data source STIX objects availalbe in the ATT&CK TAXII collections. This function filters all STIX objects by the type x-mitre-data-source and also retrieves data components for each data source object.

Args:
stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique')

Returns:
List of STIX objects

"""
data_sources = self.get_enterprise_data_sources()
if not stix_format:
data_sources = self.translate_stix_objects(data_sources)
return data_sources

# ******** Custom Functions ********
def get_technique_by_name(self, name, case=True, stix_format=True):
Expand Down Expand Up @@ -1593,16 +1641,16 @@ def get_data_component_by_technique(self, stix_object, stix_format=True):
all_data_component_list = self.translate_stix_objects(all_data_component_list)
return all_data_component_list

def get_data_sources(self):
""" Extracts data sources metadata from all technique STIX objects accross all ATT&CK matrices """
def get_data_sources_metadata(self):
""" Extracts data sources metadata from all technique STIX objects accross all ATT&CK matrices. This function uses the x_mitre_data_sources field from attack-pattern objects. This function does NOT retrieve data sources as objects. Data sources as objects are now retrieved by the get_data_sources() function."""
techniques = self.get_techniques()
data_sources = []
for t in techniques:
if 'x_mitre_data_sources' in t.keys():
data_sources += [d for d in t['x_mitre_data_sources'] if d not in data_sources]
return data_sources

def get_techniques_by_datasources(self, *args, stix_format=True):
def get_techniques_by_data_sources(self, *args, stix_format=True):
""" Extracts technique STIX objects by specific data sources accross all ATT&CK matrices

Args:
Expand Down Expand Up @@ -1679,3 +1727,24 @@ def export_groups_navigator_layers(self):
}
with open(('{0}_{1}.json'.format(k,v[0]['group_id'])), 'w') as f:
f.write(json.dumps(actor_layer))

def get_data_components_by_data_source(self, stix_object, stix_format=True):
""" Extracts data component STIX objects referenced by a data source STIX object.

Args:
stix_object (stix object) : STIX Object data source to retrieve data component SITX objects from.
stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique')

Returns:
List of STIX objects

"""

filter_objects = [
Filter('type', '=', 'x-mitre-data-component'),
Filter('x_mitre_data_source_ref', '=', stix_object['id'])
]
data_components = self.TC_ENTERPRISE_SOURCE.query(filter_objects)
if not stix_format:
data_components = self.translate_stix_objects(data_components)
return data_components
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
# Reference:
# https://packaging.python.org/tutorials/packaging-projects/

import setuptools
from setuptools import find_packages, setup

with open('README.md')as f:
long_description = f.read()

setuptools.setup(
setup(
name="attackcti",
version="0.3.4.4",
author="Roberto Rodriguez",
description="ATTACK CTI Libary",
description="MITRE ATTACK CTI Python Libary",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/OTRF/ATTACK-Python-Client",
keywords="threat hunting dfir cti cyber threat intelligence mitre att&ck",
packages=setuptools.find_packages(),
packages=find_packages(),
install_requires=[
'stix2',
'taxii2-client',
Expand Down