Skip to content

Commit

Permalink
server: remove host_filter from elastic rebuilders
Browse files Browse the repository at this point in the history
  • Loading branch information
bodik committed Nov 20, 2023
1 parent 1dcc0a0 commit 46c6cad
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 97 deletions.
8 changes: 2 additions & 6 deletions sner/server/storage/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,19 @@ def get_data(svc):
@click.option('--esd', help='elasticsearch url')
@click.option('--tlsauth_key', help='tlsauth key path')
@click.option('--tlsauth_cert', help='tlsauth cert path')
@click.option('--host_filter', type=click.Path(exists=True), help='path to host filter file')
def storage_rebuild_vulnsearch_elastic(**kwargs):
"""synchronize vulnsearch elk index"""

cvesearch = kwargs_or_config(kwargs, 'cvesearch')
esd = kwargs_or_config(kwargs, 'esd')
tlsauth_key = kwargs_or_config(kwargs, 'tlsauth_key')
tlsauth_cert = kwargs_or_config(kwargs, 'tlsauth_cert')
host_filter = Path(kwargs['host_filter']).read_text(encoding='utf-8').splitlines() if kwargs.get('host_filter') else None

if not all([cvesearch, esd]):
current_app.logger.error('configuration required (config or cmdline)')
sys.exit(1)

VulnsearchManager(cvesearch, tlsauth_key, tlsauth_cert).rebuild_elastic(esd, host_filter)
VulnsearchManager(cvesearch, tlsauth_key, tlsauth_cert).rebuild_elastic(esd)


@command.command(name='rebuild-vulnsearch-localdb', help='synchronize localdb vulnsearch')
Expand All @@ -181,20 +179,18 @@ def storage_rebuild_vulnsearch_localdb(**kwargs):
@click.option('--esd', help='elasticsearch url')
@click.option('--tlsauth_key', help='tlsauth key path')
@click.option('--tlsauth_cert', help='tlsauth cert path')
@click.option('--host_filter', type=click.Path(exists=True), help='path to host filter file')
def storage_rebuild_elasticstorage(**kwargs):
"""synchronize storage elk index"""

esd = kwargs_or_config(kwargs, 'esd')
tlsauth_key = kwargs_or_config(kwargs, 'tlsauth_key')
tlsauth_cert = kwargs_or_config(kwargs, 'tlsauth_cert')
host_filter = Path(kwargs['host_filter']).read_text(encoding='utf-8').splitlines() if kwargs.get('host_filter') else None

if not esd:
current_app.logger.error('configuration required (config or cmdline)')
sys.exit(1)

ElasticStorageManager(esd, tlsauth_key, tlsauth_cert).rebuild(host_filter)
ElasticStorageManager(esd, tlsauth_key, tlsauth_cert).rebuild()


@command.command(name='rebuild-versioninfo', help='rebuild versioninfo map')
Expand Down
28 changes: 7 additions & 21 deletions sner/server/storage/elasticstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

from flask import current_app
from marshmallow import fields
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import ARRAY, CIDR

from sner.server.api.schema import PublicHostSchema, PublicNoteSchema, PublicServiceSchema
from sner.server.utils import windowed_query
Expand Down Expand Up @@ -52,30 +50,22 @@ def __init__(self, esd_url, tlsauth_key=None, tlsauth_cert=None):
self.tlsauth_cert = tlsauth_cert
self.rebuild_buflen = current_app.config['SNER_ELASTICSTORAGE_REBUILD_BUFLEN']

def rebuild(self, host_filter=None):
def rebuild(self):
"""sychronize storage do elastic"""

index_time = datetime.now().strftime('%Y%m%d%H%M%S')
indexer = BulkIndexer(self.esd_url, self.tlsauth_key, self.tlsauth_cert, self.rebuild_buflen)
host_filter_expr = (
Host.address.op('<<=')(func.any(func.cast(host_filter, ARRAY(CIDR))))
if host_filter
else None
)
self.rebuild_hosts(indexer, index_time)
self.rebuild_services(indexer, index_time)
self.rebuild_notes(indexer, index_time)

self.rebuild_hosts(indexer, index_time, host_filter_expr)
self.rebuild_services(indexer, index_time, host_filter_expr)
self.rebuild_notes(indexer, index_time, host_filter_expr)

def rebuild_hosts(self, indexer, index_time, host_filter_expr=None):
def rebuild_hosts(self, indexer, index_time):
"""sync hosts to elastic"""

alias = 'storage_host'
index = f'{alias}-{index_time}'
schema = ElasticHostSchema()
query = Host.query
if host_filter_expr is not None:
query = query.filter(host_filter_expr)

for host in windowed_query(query, Host.id):
data_id = md5(f'{host.address}'.encode()).hexdigest()
Expand All @@ -95,15 +85,13 @@ def rebuild_hosts(self, indexer, index_time, host_filter_expr=None):
indexer.flush()
indexer.update_alias(alias, index)

def rebuild_services(self, indexer, index_time, host_filter_expr=None):
def rebuild_services(self, indexer, index_time):
"""sync services to elastic"""

alias = 'storage_service'
index = f'{alias}-{index_time}'
schema = ElasticServiceSchema()
query = Service.query.outerjoin(Host)
if host_filter_expr is not None:
query = query.filter(host_filter_expr)

for service in windowed_query(query, Service.id):
data_id = md5(f'{service.host.address}|{service.proto}|{service.port}'.encode()).hexdigest()
Expand All @@ -117,15 +105,13 @@ def rebuild_services(self, indexer, index_time, host_filter_expr=None):
indexer.flush()
indexer.update_alias(alias, index)

def rebuild_notes(self, indexer, index_time, host_filter_expr=None):
def rebuild_notes(self, indexer, index_time):
"""sync notes to elastic"""

alias = 'storage_note'
index = f'{alias}-{index_time}'
schema = ElasticNoteSchema()
query = Note.query.outerjoin(Host, Note.host_id == Host.id).outerjoin(Service, Note.service_id == Service.id)
if host_filter_expr is not None:
query = query.filter(host_filter_expr)

for note in windowed_query(query, Note.id):
data_id = md5(
Expand Down
11 changes: 3 additions & 8 deletions sner/server/storage/vulnsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import requests
from cpe import CPE
from flask import current_app
from sqlalchemy import func, inspect
from sqlalchemy.dialects.postgresql import ARRAY, CIDR, insert as pg_insert
from sqlalchemy import inspect
from sqlalchemy.dialects.postgresql import insert as pg_insert

from sner.server.extensions import db
from sner.server.storage.elastic import BulkIndexer
Expand Down Expand Up @@ -168,7 +168,7 @@ def cvefor(self, cpe): # pragma: nocover ; mocked
current_app.logger.warning('cvesearch call failed')
return []

def rebuild_elastic(self, elastic_url, host_filter=None):
def rebuild_elastic(self, elastic_url):
"""
rebuilds vulnsearch elastic index from localdb
"""
Expand All @@ -180,14 +180,9 @@ def rebuild_elastic(self, elastic_url, host_filter=None):
if elastic_url
else None
)

esd_indexer.initialize(index)

query = Vulnsearch.query
if host_filter:
host_filter_expr = Vulnsearch.host_address.op('<<=')(func.any(func.cast(host_filter, ARRAY(CIDR)))) if host_filter else None
query = query.filter(host_filter_expr)

column_attrs = inspect(Vulnsearch).mapper.column_attrs
for item in windowed_query(query, Vulnsearch.id):
item = {c.key: getattr(item, c.key) for c in column_attrs}
Expand Down
19 changes: 0 additions & 19 deletions tests/server/storage/test_elasticstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,3 @@ def test_rebuild_elasticstorage(app, tmpworkdir, service, note): # pylint: disa

es_bulk_mock.assert_called()
update_alias_mock.assert_called()


def test_rebuild_elasticstorage_filter(app, host_factory, note_factory): # pylint: disable=unused-argument
"""test sync-storage command"""

host_factory.create(address='127.0.1.1')
host_factory.create(address='2001:db8::11')
host2 = host_factory.create(address='127.0.2.1')
note_factory.create(host=host2, xtype='filteredout')

indexer_mock = Mock()

patch_indexer = patch.object(sner.server.storage.elasticstorage, 'BulkIndexer', indexer_mock)
with patch_indexer:
ElasticStorageManager('https://dummy:80').rebuild(['127.0.1.0/24', '2001:db8::/48'])

indexed_hosts = [x[0][2]['host_address'] for x in indexer_mock.return_value.index.call_args_list]
assert '127.0.2.1' not in indexed_hosts
assert '2001:db8::11' in indexed_hosts
43 changes: 0 additions & 43 deletions tests/server/storage/test_vulnsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,6 @@ def test_rebuild_elastic(app, tmpworkdir, vulnsearch): # pylint: disable=unused
update_alias_mock.assert_called_once()


def test_rebuild_elastic_filter(app, host_factory, vulnsearch_factory): # pylint: disable=unused-argument
"""test vulnsearch rebuild_elastic with filter"""

host1 = host_factory.create(address='127.0.1.1')
vulnsearch_factory(
host_id=host1.id,
service_id=None,
host_address=host1.address,
host_hostname=host1.hostname,
service_proto=None,
service_port=None
)
host2 = host_factory.create(address='127.0.2.1')
vulnsearch_factory(
host_id=host2.id,
service_id=None,
host_address=host2.address,
host_hostname=host2.hostname,
service_proto=None,
service_port=None
)
host3 = host_factory.create(address='2001:db8::11')
vulnsearch_factory(
host_id=host3.id,
service_id=None,
host_address=host3.address,
host_hostname=host3.hostname,
service_proto=None,
service_port=None
)

indexer_mock = Mock()
patch_indexer = patch.object(sner.server.storage.vulnsearch, 'BulkIndexer', indexer_mock)

with patch_indexer:
vmgr = VulnsearchManager('https://dummy:80')
vmgr.rebuild_elastic('https://dummy:80', ['127.0.1.0/24', '2001:db8::/48'])

indexed_hosts = [x[0][2]['host_address'] for x in indexer_mock.return_value.index.call_args_list]
assert '127.0.2.1' not in indexed_hosts
assert '2001:db8::11' in indexed_hosts


def test_rebuild_localdb(app, note_factory): # pylint: disable=unused-argument
"""test rebuild localdb"""

Expand Down

0 comments on commit 46c6cad

Please sign in to comment.