-
Notifications
You must be signed in to change notification settings - Fork 279
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix #376
- Loading branch information
Showing
4 changed files
with
144 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
|
||
import argparse | ||
from pathlib import Path | ||
|
||
from pymisp.tools import CSVLoader | ||
|
||
from pymisp import MISPEvent | ||
|
||
try: | ||
from keys import misp_url, misp_key, misp_verifycert | ||
from pymisp import ExpandedPyMISP | ||
offline = False | ||
except ImportError as e: | ||
offline = True | ||
print(f'Unable to import MISP parameters, unable to POST on MISP: {e}') | ||
|
||
''' | ||
Example: | ||
* If the CSV file has fieldnames matching the object-relation: | ||
load_csv.py -n file -p /tmp/foo.csv | ||
* If you want to force the fieldnames: | ||
load_csv.py -n file -p /tmp/foo.csv -f SHA1 fileName size-in-bytes | ||
''' | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser(description='Load a CSV file as MISP objects') | ||
parser.add_argument("-n", "--object_name", type=str, required=True, help="Template name of the objects in the CSV.") | ||
parser.add_argument("-p", "--path", required=True, type=Path, help="Path to the CSV file.") | ||
parser.add_argument("-f", "--fieldnames", nargs='*', default=[], help="Fieldnames of the CSV, have to match the object-relation allowed in the template. If empty, the fieldnames of the CSV have to match the template.") | ||
parser.add_argument("-s", "--skip_fieldnames", action='store_true', help="Skip fieldnames in the CSV.") | ||
parser.add_argument("-d", "--dump", action='store_true', help="(Debug) Dump the object in the terminal.") | ||
|
||
# Interact with MISP | ||
misp_group = parser.add_mutually_exclusive_group() | ||
misp_group.add_argument('-i', '--new_event', type=str, help="Info field of the new event") | ||
misp_group.add_argument('-u', '--update_event', type=int, help="ID of the existing event to update") | ||
|
||
args = parser.parse_args() | ||
|
||
if not args.fieldnames: | ||
has_fieldnames = True | ||
else: | ||
has_fieldnames = args.skip_fieldnames | ||
csv_loader = CSVLoader(template_name=args.object_name, csv_path=args.path, | ||
fieldnames=args.fieldnames, has_fieldnames=has_fieldnames) | ||
|
||
objects = csv_loader.load() | ||
if args.dump: | ||
for o in objects: | ||
print(o.to_json()) | ||
else: | ||
if offline: | ||
print('You are in offline mode, quitting.') | ||
else: | ||
misp = ExpandedPyMISP(url=misp_url, key=misp_key, ssl=misp_verifycert) | ||
if args.new_event: | ||
event = MISPEvent() | ||
event.info = args.new_event | ||
for o in objects: | ||
event.add_object(**o) | ||
new_event = misp.add_event(event) | ||
if isinstance(new_event, str): | ||
print(new_event) | ||
elif 'id' in new_event: | ||
print(f'Created new event {new_event.id}') | ||
else: | ||
print('Something went wrong:') | ||
print(new_event) | ||
else: | ||
for o in objects: | ||
new_object = misp.add_object(args.update_event, o) | ||
if isinstance(new_object, str): | ||
print(new_object) | ||
elif new_object.attributes: | ||
print(f'New {new_object.name} object added to {args.update_event}') | ||
else: | ||
print('Something went wrong:') | ||
print(new_event) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
|
||
from pathlib import Path | ||
import csv | ||
from pymisp import MISPObject | ||
|
||
|
||
class CSVLoader(): | ||
|
||
def __init__(self, template_name: str, csv_path: Path, fieldnames: list=[], has_fieldnames=False): | ||
self.template_name = template_name | ||
self.csv_path = csv_path | ||
self.fieldnames = [f.strip().lower() for f in fieldnames] | ||
if not self.fieldnames: | ||
# If the user doesn't pass fieldnames, we assume the CSV has them. | ||
self.has_fieldnames = True | ||
else: | ||
self.has_fieldnames = has_fieldnames | ||
|
||
def load(self): | ||
|
||
objects = [] | ||
|
||
with open(self.csv_path, newline='') as csvfile: | ||
reader = csv.reader(csvfile) | ||
if self.has_fieldnames: | ||
# The file has fieldnames, we either ignore it, or validate its validity | ||
fieldnames = [f.strip().lower() for f in reader.__next__()] | ||
if not self.fieldnames: | ||
self.fieldnames = fieldnames | ||
|
||
if not self.fieldnames: | ||
raise Exception(f'No fieldnames, impossible to create objects.') | ||
else: | ||
# Check if the CSV file has a header, and if it matches with the object template | ||
tmp_object = MISPObject(self.template_name) | ||
allowed_fieldnames = list(tmp_object._definition['attributes'].keys()) | ||
for fieldname in self.fieldnames: | ||
if fieldname not in allowed_fieldnames: | ||
raise Exception(f'{fieldname} is not a valid object relation for {self.template_name}: {allowed_fieldnames}') | ||
|
||
for row in reader: | ||
tmp_object = MISPObject(self.template_name) | ||
for object_relation, value in zip(self.fieldnames, row): | ||
tmp_object.add_attribute(object_relation, value=value) | ||
objects.append(tmp_object) | ||
return objects |