Skip to content

Commit

Permalink
Use MISPEvent for STIX converter
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Oct 26, 2016
1 parent 4ef5053 commit 77f0193
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 55 deletions.
63 changes: 31 additions & 32 deletions threatintel/converters/buildSTIXAttribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,16 @@ def buildAttribute(attr, pkg, ind):
"""

# Extract
cat = attr["category"] # TODO: Not used
type_ = attr["type"]
value = attr["value"]
type_ = attr.type
value = attr.value

if type_ == "ip-src":
# An IP address. Add it as an Address Object.
addr = address_object.Address(address_value=value)
addr.is_source = True
addr.is_destination = False
obs = stix.indicator.Observable(addr)
obs.title = attr["comment"] or "IP Source"
obs.title = attr.comment or "IP Source"
ind.add_observable(obs)

elif type_ == "ip-dst":
Expand All @@ -59,30 +58,30 @@ def buildAttribute(attr, pkg, ind):
addr.is_source = False
addr.is_destination = True
obs = stix.indicator.Observable(addr)
obs.title = attr["comment"] or "IP Destination"
obs.title = attr.comment or "IP Destination"
ind.add_observable(obs)

elif type_ == "domain":
# A domain. Add as a DomainName Object.
dn = domain_name_object.DomainName()
dn.value = value
obs = stix.indicator.Observable(dn)
obs.title = attr["comment"] or "Domain"
obs.title = attr.comment or "Domain"
ind.add_observable(obs)

elif type_ == "hostname":
# A hostname. Add as Hostname Object.
hst = hostname_object.Hostname()
hst.hostname_value = value
obs = stix.indicator.Observable(hst)
obs.title = attr["comment"] or "Hostname"
obs.title = attr.comment or "Hostname"
ind.add_observable(obs)

elif type_ in ["url", "uri"]:
# UR(i|l). I guess we'll use a URI object (as URLs ⊆ URIs).
url = uri_object.URI(value)
obs = stix.indicator.Observable(url)
obs.title = attr["comment"] or "URI"
obs.title = attr.comment or "URI"
ind.add_observable(obs)

elif type_ in ["md5", "sha1", "sha256", "sha512"]:
Expand All @@ -98,15 +97,15 @@ def buildAttribute(attr, pkg, ind):
elif type_ == "sha512":
f.sha512 == value
obs = stix.indicator.Observable(f)
obs.title = attr["comment"] or "Hash (Simple)"
obs.title = attr.comment or "Hash (Simple)"
ind.add_observable(f)

elif type_ == "filename":
# Just a filename. Add it to a File Object, then add that.
f = file_object.File()
f.file_name = value
obs = stix.indicator.Observable(f)
obs.title = attr["comment"] or "Filename"
obs.title = attr.comment or "Filename"
ind.add_observable(f)

elif type_ in ["filename|md5", "filename|sha1", "filename|sha256", "filename|sha512"]:
Expand All @@ -124,7 +123,7 @@ def buildAttribute(attr, pkg, ind):
f.sha512 = hsh
f.file_name = fname
obs = stix.indicator.Observable(f)
obs.title = attr["comment"] or "File"
obs.title = attr.comment or "File"
ind.add_observable(f)

elif type_ in ["ssdeep", "authentihash", "imphash"]:
Expand All @@ -135,7 +134,7 @@ def buildAttribute(attr, pkg, ind):
hsh.fuzzy_hash_value = value
f.add_hash(hsh)
obs = stix.indicator.Observable(f)
obs.title = attr["comment"] or "Hash (Fuzzy)"
obs.title = attr.comment or "Hash (Fuzzy)"
ind.add_observable(f)

elif type_ == "threat-actor":
Expand All @@ -144,7 +143,7 @@ def buildAttribute(attr, pkg, ind):
ta = stix.common.ThreatActor()
ta.title = value
if "comment" in attr:
ta.description = attr["comment"]
ta.description = attr.comment
pkg.add_threat_actor(ta)

elif type_ == "campaign-name":
Expand All @@ -159,7 +158,7 @@ def buildAttribute(attr, pkg, ind):
# Arbritary link value.
lnk = link_object.Link(value)
obs = stix.indicator.Observable(lnk)
obs.title = attr["comment"] or "Link"
obs.title = attr.comment or "Link"
ind.add_observable(obs)

elif type_ == "email-src":
Expand All @@ -170,7 +169,7 @@ def buildAttribute(attr, pkg, ind):
esrc.from_ = value
emsg.header = esrc
obs = stix.indicator.Observable(emsg)
obs.title = attr["comment"] or "Email Source Address"
obs.title = attr.comment or "Email Source Address"
ind.add_observable(obs)

elif type_ == "email-subject":
Expand All @@ -181,7 +180,7 @@ def buildAttribute(attr, pkg, ind):
esub.subject = value
emsg.header = esub
obs = stix.indicator.Observable(emsg)
obs.title = attr["comment"] or "Email Subject Line"
obs.title = attr.comment or "Email Subject Line"
ind.add_observable(obs)

elif type_ == "email-attachment":
Expand All @@ -191,7 +190,7 @@ def buildAttribute(attr, pkg, ind):
att.append(value)
emsg.attachments = att
obs = stix.indicator.Observable(emsg)
obs.title = attr["comment"] or "Email Attachment"
obs.title = attr.comment or "Email Attachment"
ind.add_observable(obs)

elif type_ in ["email-dst", "target-email"]:
Expand All @@ -203,7 +202,7 @@ def buildAttribute(attr, pkg, ind):
esub.to = value
emsg.header = esub
obs = stix.indicator.Observable(emsg)
obs.title = attr["comment"] or "Email Destination Address"
obs.title = attr.comment or "Email Destination Address"
ind.add_observable(obs)

elif type_ == "attachment":
Expand All @@ -217,7 +216,7 @@ def buildAttribute(attr, pkg, ind):
mut = mutex_object.Mutex()
mut.name = value
obs = stix.indicator.Observable(mut)
obs.title = attr["comment"] or "Mutex"
obs.title = attr.comment or "Mutex"
ind.add_observable(obs)

elif type_ == "x509-fingerprint-sha1":
Expand All @@ -241,7 +240,7 @@ def buildAttribute(attr, pkg, ind):
regs.append(reg)
whois.registrants = regs
obs = stix.indicator.Observable(whois)
obs.title = attr["comment"] or "WHOIS Email"
obs.title = attr.comment or "WHOIS Email"
ind.add_observable(obs)

elif type_ == "whois-registrant-name":
Expand All @@ -254,7 +253,7 @@ def buildAttribute(attr, pkg, ind):
regs.append(reg)
whois.registrants = regs
obs = stix.indicator.Observable(whois)
obs.title = attr["comment"] or "WHOIS Registrant Name"
obs.title = attr.comment or "WHOIS Registrant Name"
ind.add_observable(obs)

elif type_ == "whois-creation-date":
Expand All @@ -263,7 +262,7 @@ def buildAttribute(attr, pkg, ind):
whois = whois_object.WhoisEntry()
whois.creation_date = value
obs = stix.indicator.Observable(whois)
obs.title = attr["comment"] or "WHOIS Creation Date"
obs.title = attr.comment or "WHOIS Creation Date"
ind.add_observable(obs)

elif type_ == "whois-registrar":
Expand All @@ -273,7 +272,7 @@ def buildAttribute(attr, pkg, ind):
reg.name = value
whois.registrar_info = reg
obs = stix.indicator.Observable(whois)
obs.title = attr["comment"] or "WHOIS Registrar"
obs.title = attr.comment or "WHOIS Registrar"
ind.add_observable(obs)

elif type_ == "pdb":
Expand All @@ -287,23 +286,23 @@ def buildAttribute(attr, pkg, ind):

# Add the IP
addr = address_object.Address(address_value=ip)
obs = stix.indicator.Observable(addr, title=attr["comment"])
obs.title = attr["comment"] or "IP Address"
obs = stix.indicator.Observable(addr, title=attr.comment)
obs.title = attr.comment or "IP Address"
ind.add_observable(obs)

# Now add the domain
dn = domain_name_object.DomainName()
dn.value = dom
obs = stix.indicator.Observable(dn)
obs.title = attr["comment"] or "Domain"
obs.title = attr.comment or "Domain"
ind.add_observable(obs)

elif type_ == "vulnerability":
# It's a CVE. Easy enough to deal with.
vuln = stix.exploit_target.Vulnerability()
vuln.cve_id = value
et = stix.exploit_target.ExploitTarget()
et.title = attr["comment"] or "Vulnerability"
et.title = attr.comment or "Vulnerability"
et.add_vulnerability(vuln)
pkg.add_exploit_target(et)

Expand Down Expand Up @@ -345,7 +344,7 @@ def buildAttribute(attr, pkg, ind):
vals.append(val)
regentry.values = vals
obs = stix.indicator.Observable(regentry)
obs.title = attr["comment"] or "Registry Key"
obs.title = attr.comment or "Registry Key"
ind.add_observable(obs)

elif type_ == "pattern-in-traffic":
Expand All @@ -357,7 +356,7 @@ def buildAttribute(attr, pkg, ind):
net = network_packet_object.NetworkPacket()
net.internet_layer = lay
obs = stix.indicator.Observable(net)
obs.title = attr["comment"] or "Pattern In Traffic"
obs.title = attr.comment or "Pattern In Traffic"
ind.add_observable(obs)

elif type_ == "user-agent":
Expand All @@ -374,15 +373,15 @@ def buildAttribute(attr, pkg, ind):
ses = http_session_object.HTTPSession()
ses.http_request_response = resp
obs = stix.indicator.Observable(ses)
obs.title = attr["comment"] or "User Agent"
obs.title = attr.comment or "User Agent"
ind.add_observable(obs)

elif type_ == "named pipe":
# Pipe pipe pipe pipe pipe pipe
p = pipe_object.Pipe()
p.name = value
obs = stix.indicator.Observable(p)
obs.title = attr["comment"] or "Named Pipe"
obs.title = attr.comment or "Named Pipe"
ind.add_observable(obs)

elif type_ == "AS":
Expand All @@ -392,7 +391,7 @@ def buildAttribute(attr, pkg, ind):
except ValueError:
as_.name = value
obs = stix.indicator.Observable(as_)
obs.title = attr["comment"] or "Autonomous System"
obs.title = attr.comment or "Autonomous System"
ind.add_observable(obs)

else:
Expand Down
30 changes: 7 additions & 23 deletions threatintel/converters/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
# Imports
# Sys imports
import logging
import json
from tempfile import NamedTemporaryFile

from pymisp import mispevent

# Stix imports
from stix.core import STIXPackage
from stix.core import STIXHeader
from stix.indicator import Indicator

# Local imports
from threatintel.errors import MISPLoadError, STIXLoadError
from threatintel.errors import STIXLoadError
from threatintel.converters import buildSTIXAttribute
from threatintel.converters import buildMISPAttribute

Expand All @@ -29,17 +30,8 @@ def MISPtoSTIX(mispJSON):
:returns stix: A STIX stix with as much of the original
data as we could convert.
"""
if not isinstance(mispJSON, dict):
# It's likely not a loaded JSON. Attempt to load it.
try:
mispJSON = json.loads(mispJSON)
except json.decoder.JSONDecodeError:
# We couldn't make head nor tail of it
try:
mispJSON = json.load(mispJSON)
except:
raise MISPLoadError("COULD NOT LOAD MISP JSON!")

misp_event = mispevent.MISPEvent()
misp_event.load(mispJSON)
# We should now have a proper MISP JSON loaded.

# Create a base stix
Expand All @@ -49,21 +41,13 @@ def MISPtoSTIX(mispJSON):
stix.stix_header = STIXHeader()

# Try to use the event title as the stix title
if "info" in mispJSON:
stix.stix_header.title = mispJSON["Event"]["info"]
else:
# We don't have an easy name for it
stix.stix_header.title = "MISP Export"
# Best we can do really

# Get the event Attributes
attributes = mispJSON["Event"]["Attribute"]
stix.stix_header.title = misp_event.info

# We're going to store our observables inside an indicator
indicator = Indicator()

# Go through each attribute and transfer what we can.
for one_attrib in attributes:
for one_attrib in misp_event.attributes:
# Build an attribute from the JSON. Is all nice.
buildSTIXAttribute.buildAttribute(one_attrib, stix, indicator)
stix.add_indicator(indicator)
Expand Down

0 comments on commit 77f0193

Please sign in to comment.