Skip to content

Commit

Permalink
Added PatchPulse object
Browse files Browse the repository at this point in the history
Added object to make patching pulses easier
  • Loading branch information
chrisdoman committed May 4, 2017
1 parent 83f8412 commit 48c878f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 7 deletions.
4 changes: 2 additions & 2 deletions OTXv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ def get_pulse_indicators(self, pulse_id, limit=20):
return self.walkapi(self.create_url(PULSE_DETAILS + str(pulse_id) + "/indicators", limit=limit))


def edit_pulse_indicators(self, pulse_id, body):
def edit_pulse(self, pulse_id, body):
"""
Edits indicators in a pulse
Edits a pulse
:param pulse_id: The pulse you are editing the indicators in
:param body: The complete set of indicators this pulse will now contain
eg; body: {
Expand Down
24 changes: 24 additions & 0 deletions PatchPulse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class PatchPulse(object):
body = {}

def __init__(self, pulse_id):
self.pulse_id = pulse_id

def add(self, key, element):
if key not in self.body:
self.body[key] = {}
self.body[key]["add"] = element

def remove(self, key, element):
if key not in self.body:
self.body[key] = {}
self.body[key]["remove"] = element

def set(self, name, value):
self.body[name] = value

def getBody(self):
return self.body

def getPulseId(self):
return self.pulse_id
4 changes: 2 additions & 2 deletions howto_use_python_otx_api.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2261,7 +2261,7 @@
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
"version": 2.0
},
"file_extension": ".py",
"mimetype": "text/x-python",
Expand All @@ -2273,4 +2273,4 @@
},
"nbformat": 4,
"nbformat_minor": 0
}
}
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
author_email='otx@alienvault.com',
url='https://github.com/AlienVault-Labs/OTX-Python-SDK',
download_url='https://github.com/AlienVault-Labs/OTX-Python-SDK/tarball/1.1.0',
py_modules=['OTXv2', 'IndicatorTypes'],
py_modules=['OTXv2', 'IndicatorTypes','PatchPulse'],
install_requires=['simplejson', 'requests']
)
27 changes: 25 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from utils import generate_rand_string
from OTXv2 import OTXv2, InvalidAPIKey, BadRequest, RetryError
import IndicatorTypes
from PatchPulse import PatchPulse



STRP_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
Expand Down Expand Up @@ -331,19 +333,40 @@ def test_create_pulse_and_update(self):

def test_create_pulse_and_edit(self):
"""
Test: create a pulse then add indicators
Test: create a pulse then add indicators via json directly
"""
indicator_list = [ {'indicator': "one.com", 'type': 'domain'} ]
indicators_to_add = [ {'indicator': "added.com", 'type': 'domain'} ]
add_indicators = { 'indicators': { 'add': indicators_to_add } }
name = "Pyclient-indicators-unittests-modify-pulse"
response = self.otx.create_pulse(name=name, public=False, indicators=indicator_list)
pulse_id = response['id']
response = self.otx.edit_pulse_indicators(pulse_id, add_indicators)
response = self.otx.edit_pulse(pulse_id, add_indicators)
new_indicators = str(response['indicators']['indicators'])
self.assertTrue('added.com' in new_indicators)
return


def test_create_pulse_and_edit_via_patch_pulse(self):
"""
Test: create a pulse then add indicators via a patch pulse object
"""
indicator_list = [ {'indicator': "one.com", 'type': 'domain'} ]
name = "Pyclient-indicators-unittests-modify-pulse-patch-pulse"
response = self.otx.create_pulse(name=name, public=False, indicators=indicator_list)
pulse_id = response['id']

# Edit the pulse using a patch pulse object
# We could also edit indicators etc. here
pp = PatchPulse(pulse_id)
pp.add("tags", ["addtag1", "addtag2"])
pp.set("description","New Description")

response = self.otx.edit_pulse(pulse_id, pp.getBody())
new_tags = str(response['tags'])
self.assertTrue('addtag1' in new_tags)
return

def test_create_pulse_tlp(self):
"""
Test: pulse with each TLP.
Expand Down

0 comments on commit 48c878f

Please sign in to comment.