Skip to content
This repository has been archived by the owner on Oct 10, 2019. It is now read-only.

Commit

Permalink
Merge pull request #48 from automationator/master
Browse files Browse the repository at this point in the history
Adds support to filter indicators by NOT intel source
  • Loading branch information
automationator committed Mar 18, 2019
2 parents d721cff + 0126e9d commit 995f19e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
6 changes: 6 additions & 0 deletions services/web/project/api/routes/indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,12 @@ def read_indicators():
modified_before = datetime.date.min
filters.add(Indicator.modified_time < modified_before)

# NOT Source filter (IntelReference)
if 'not_sources' in request.args:
not_sources = request.args.get('not_sources').split(',')
for ns in not_sources:
filters.add(~Indicator.references.any(IntelReference.source.has(IntelSource.value == ns)))

# Source filter (IntelReference)
if 'sources' in request.args:
sources = request.args.get('sources').split(',')
Expand Down
36 changes: 29 additions & 7 deletions services/web/project/tests/api/test_indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,26 @@ def test_read_with_filters(client):

time.sleep(1)

indicator3_request, indicator3_response = create_indicator(client, 'Email', 'abcd@abcd.com', 'admin',
campaigns=['LOLcats'],
case_sensitive=False,
confidence='LOW',
impact='LOW',
intel_reference='http://blahblah2.com',
intel_source='VirusTotal',
status='New',
substring=False,
tags=['nanocore'])
assert indicator3_request.status_code == 201

time.sleep(1)

# Filter with bulk mode enabled.
request = client.get('/api/indicators?bulk=true')
response = gzip.decompress(request.data)
response = json.loads(response.decode('utf-8'))
assert request.status_code == 200
assert len(response) == 2
assert len(response) == 3

# Filter by case_sensitive
request = client.get('/api/indicators?case_sensitive=true')
Expand All @@ -806,7 +820,7 @@ def test_read_with_filters(client):
request = client.get('/api/indicators?created_before={}'.format(datetime.datetime.now()))
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 2
assert len(response['items']) == 3
request = client.get('/api/indicators?created_before={}'.format(indicator2_response['created_time']))
response = json.loads(request.data.decode())
assert request.status_code == 200
Expand All @@ -817,11 +831,11 @@ def test_read_with_filters(client):
request = client.get('/api/indicators?created_after={}'.format(datetime.datetime.min))
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 2
assert len(response['items']) == 3
request = client.get('/api/indicators?created_after={}'.format(indicator1_response['created_time']))
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 1
assert len(response['items']) == 2
assert response['items'][0]['value'] == 'asdf@asdf.com'

# Filter by confidence
Expand All @@ -842,7 +856,7 @@ def test_read_with_filters(client):
request = client.get('/api/indicators?modified_before={}'.format(datetime.datetime.now()))
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 2
assert len(response['items']) == 3
request = client.get('/api/indicators?modified_before={}'.format(indicator2_response['modified_time']))
response = json.loads(request.data.decode())
assert request.status_code == 200
Expand All @@ -853,11 +867,11 @@ def test_read_with_filters(client):
request = client.get('/api/indicators?modified_after={}'.format(datetime.datetime.min))
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 2
assert len(response['items']) == 3
request = client.get('/api/indicators?modified_after={}'.format(indicator1_response['modified_time']))
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 1
assert len(response['items']) == 2
assert response['items'][0]['value'] == 'asdf@asdf.com'

# Filter by status
Expand Down Expand Up @@ -902,6 +916,14 @@ def test_read_with_filters(client):
assert len(response['items']) == 1
assert response['items'][0]['value'] == '1.1.1.1'

# Filter by NOT intel source
request = client.get('/api/indicators?not_sources=OSINT')
response = json.loads(request.data.decode())
assert request.status_code == 200
assert len(response['items']) == 2
assert response['items'][0]['value'] == 'asdf@asdf.com'
assert response['items'][1]['value'] == 'abcd@abcd.com'

# Filter by multiple
request = client.get('/api/indicators?tags=phish&type=IP')
response = json.loads(request.data.decode())
Expand Down

0 comments on commit 995f19e

Please sign in to comment.