Migrate sources to cluster (#1008)
* Added data_sources to clusters

* Moved sources to cluster config

* Updated unit tests

* Changed CLUSTERS env var to actual cluster

* Updated tests, docs and references to sources TLK

* Docs update

* Python 2-3 compat issue

* Pylint fixes

* Realized validate sources is an internal function

* Added data_sources TLK to example json blocks

* Switched from using tuples

* Quote cleanup, moved more code into validate_sources

* More cleanup

* Get env variable at load time not runtime

* Fix cluster env errors, detect duplicate data sources

* Removed no-op I forgot about from testing

* Fixed docs errors, corrected dupe source checking

* Fixed confusion with missing sources versus invalid sources

* Fixed some namespacing and import order from rebase

* Fixed more namespacing

* Fixed global loading sources config in test classifier

* Added multi cluster support for testing

* Fixed some bad decisions

* Name changes

* Added break for finding cluster in cli handler

* Wording cleanup, error on missing data_sources, other cleanup

* Fixed a missing ,
blakemotl committed Oct 16, 2019
1 parent 07b6913 commit 15e06c1
Showing 20 changed files with 210 additions and 217 deletions.
47 changes: 47 additions & 0 deletions conf/clusters/prod.json
@@ -1,5 +1,52 @@
{
"id": "prod",
"data_sources": {
"kinesis": {
"prefix_cluster1_streamalert": [
"cloudwatch",
"ghe",
"osquery"
]
},
"s3": {
"prefix.cluster.sample.bucket": [
"cloudtrail",
"carbonblack",
"fleet"
]
},
"sns": {
"prefix_cluster_sample_topic": [
"binaryalert"
]
},
"stream_alert_app": {
"prefix_cluster_box_admin_events_sm-app-name_app": [
"box"
],
"prefix_cluster_duo_admin_sm-app-name_app": [
"duo"
],
"prefix_cluster_duo_auth_sm-app-name_app": [
"duo"
],
"prefix_cluster_gsuite_admin_sm-app-name_app": [
"gsuite"
],
"prefix_cluster_onelogin-events-app-name_app": [
"onelogin"
],
"prefix_cluster_slack_access_sm-app-name_app": [
"slack"
],
"prefix_cluster_slack_integration_sm-app-name_app": [
"slack"
],
"prefix_cluster_aliyun_actiontrail_sm-app-name_app": [
"aliyun"
]
}
},
"modules": {
"cloudwatch_monitoring": {
"enabled": true,
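The commit notes above mention detecting duplicate data sources. A minimal sketch of such a check over a cluster's ``data_sources`` block, assuming the structure shown in this file (the ``find_duplicate_sources`` helper is illustrative, not part of the project):

.. code-block:: python

from collections import Counter

def find_duplicate_sources(data_sources):
    """Return resource names declared under more than one service.

    ``data_sources`` is the per-cluster mapping shown above:
    {service: {resource_name: [log, ...], ...}, ...}
    """
    counts = Counter(
        resource
        for resources in data_sources.values()
        for resource in resources
    )
    return [resource for resource, count in counts.items() if count > 1]

# Example: the same stream name declared under both kinesis and sns
data_sources = {
    'kinesis': {'prefix_cluster1_streamalert': ['cloudwatch', 'osquery']},
    'sns': {'prefix_cluster1_streamalert': ['binaryalert']},
}
print(find_duplicate_sources(data_sources))  # ['prefix_cluster1_streamalert']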
69 changes: 0 additions & 69 deletions conf/sources.json

This file was deleted.

2 changes: 1 addition & 1 deletion docs/source/app-configuration.rst
@@ -135,7 +135,7 @@ Once the above is completed, a logger statement similar to the following will co
StreamAlertCLI [INFO]: Successfully added 'duo_prod_collector' app integration to 'conf/clusters/prod.json' for service 'duo_auth'.
Your configuration files (``conf/clusters/<cluster>.json`` and ``conf/sources.json``) have now been updated and are ready to be deployed.
Your configuration file ``conf/clusters/<cluster>.json`` has now been updated and is ready to be deployed.
3. Deploy the new App and the Classifier
````````````````````````````````````````````
44 changes: 19 additions & 25 deletions docs/source/conf-datasources.rst
@@ -1,14 +1,16 @@
Datasource Configuration
========================
.. note:: As of release 3.0.0, data source configuration has moved
from ``sources.json`` into the ``data_sources`` top level key for each of your clusters.

For background on supported datasource types, read `datasources <datasources.html>`_.
For background on supported data source types, read `data sources <datasources.html>`_.

Overview
--------

Datasources defined in ``conf/sources.json`` control which datasources can send to and be analyzed by StreamAlert.
Data sources are defined under the ``data_sources`` top level key in each cluster file in the ``conf/clusters`` directory; they control which data sources can send to and be analyzed by StreamAlert.

Each datasource (``kinesis``, ``s3``, or ``sns``) contains a mapping of specific resource names (kinesis stream names, s3 bucket IDs) along with a list of logs coming from that source.
Each data source (``kinesis``, ``s3``, or ``sns``) contains a mapping of specific resource names (kinesis stream names, s3 bucket IDs) along with a list of logs coming from that source.

Log schemas are defined in one or more files in the ``conf/schemas`` directory.

@@ -21,39 +23,31 @@ Example:
.. code-block:: json
{
"kinesis": {
"abc_corporate_streamalert": {
"logs": [
"data_sources": {
"kinesis": {
"abc_corporate_streamalert": [
"box",
"pan"
]
},
"abc_production_stream_streamalert": {
"logs": [
],
"abc_production_stream_streamalert": [
"inspec",
"osquery"
]
}
},
"s3": {
"abc.webserver.logs": {
"logs": [
"nginx"
]
},
"abc.hids.logs": {
"logs": [
"s3": {
"abc.webserver.logs": [
"nginx"
],
"abc.hids.logs": [
"carbonblack"
]
}
},
"sns": {
"abc_sns_topic": {
"logs": [
},
"sns": {
"abc_sns_topic": [
"logstash"
]
}
}
}
Once datasources are defined, associated ``logs`` must have defined `schemas <conf-schemas.html>`_
Once data sources are defined, associated ``logs`` must have defined `schemas <conf-schemas.html>`_
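As the note at the top of this file says, release 3.0.0 moves this configuration out of ``sources.json``. A minimal migration sketch, assuming the old and new layouts shown in this diff (the ``migrate_sources`` helper and the hard-coded file paths are illustrative only):

.. code-block:: python

import json

def migrate_sources(old_sources):
    """Convert the old sources.json layout to the new data_sources layout.

    Old: {service: {resource: {"logs": [...]}}}
    New: {service: {resource: [...]}}
    """
    return {
        service: {resource: entry['logs'] for resource, entry in resources.items()}
        for service, resources in old_sources.items()
    }

with open('conf/sources.json') as f:
    old_sources = json.load(f)

with open('conf/clusters/prod.json') as f:
    cluster = json.load(f)

cluster['data_sources'] = migrate_sources(old_sources)

with open('conf/clusters/prod.json', 'w') as f:
    json.dump(cluster, f, indent=2, sort_keys=True)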
16 changes: 7 additions & 9 deletions docs/source/firehose.rst
@@ -15,22 +15,20 @@ Configuration

When the Kinesis Firehose module is enabled, a dedicated Delivery Stream is created for each log type.

For example, if the ``sources.json`` defines the following:
For example, if the ``data_sources`` for a cluster named ``prod``, defined in ``conf/clusters/prod.json``, contains the following:

.. code-block:: json
{
"kinesis": {
"example_prod_streamalert": {
"logs": [
"data_sources": {
"kinesis": {
"example_prod_streamalert": [
"cloudwatch",
"osquery"
]
}
},
"s3": {
"example.prod.streamalert.cloudtrail": {
"logs": [
},
"s3": {
"example.prod.streamalert.cloudtrail": [
"cloudtrail"
]
}
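Because one Delivery Stream is created per log type, the set of streams for the example above follows from the unique log names across all of the cluster's data sources. A short sketch of that derivation (the actual Firehose stream naming is not shown in this excerpt and is omitted):

.. code-block:: python

def unique_log_types(data_sources):
    """Collect the distinct log types declared across every service and resource."""
    return sorted({
        log
        for resources in data_sources.values()
        for logs in resources.values()
        for log in logs
    })

data_sources = {
    'kinesis': {'example_prod_streamalert': ['cloudwatch', 'osquery']},
    's3': {'example.prod.streamalert.cloudtrail': ['cloudtrail']},
}
print(unique_log_types(data_sources))  # ['cloudtrail', 'cloudwatch', 'osquery']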
8 changes: 4 additions & 4 deletions docs/source/getting-started.rst
@@ -168,14 +168,14 @@ Open ``conf/clusters/prod.json`` and change the ``stream_alert`` module to look
}
5. Tell StreamAlert which `log schemas <conf-schemas.html>`_ will be sent to this input.
Open ``conf/sources.json`` and change the ``sns`` section to look like this:
Open ``conf/clusters/prod.json`` and change the ``data_sources`` section to look like this:

.. code-block:: json
{
"sns": {
"streamalert-test-data": {
"logs": [
"data_sources": {
"sns": {
"streamalert-test-data": [
"cloudwatch"
]
}
3 changes: 2 additions & 1 deletion docs/source/rules.rst
@@ -182,7 +182,8 @@ logs

``logs`` define the log schema(s) supported by the rule.

Log `sources <conf-datasources.html>`_ are defined in ``conf/sources.json`` and their `schemas <conf-schemas.html>`_ are defined in one or more files in the ``conf/schemas`` directory.
Log `sources <conf-datasources.html>`_ are defined under the ``data_sources`` field for a cluster defined in ``conf/clusters/<cluster>.json``
and their `schemas <conf-schemas.html>`_ are defined in one or more files in the ``conf/schemas`` directory.

.. note:: Either ``logs`` or ``datatypes`` must be specified for each rule

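For illustration, a rule restricted to one of the log types declared in a cluster's ``data_sources`` might look like the sketch below; the ``rule`` decorator import path and the record fields are assumptions, not taken from this diff:

.. code-block:: python

from streamalert.shared.rule import rule  # import path assumed

@rule(logs=['osquery'])  # only records classified under the 'osquery' schema reach this rule
def osquery_process_events(record):
    """Fire on osquery process_events results (field names assumed)."""
    return record.get('name') == 'process_events'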
2 changes: 1 addition & 1 deletion docs/source/testing.rst
@@ -117,7 +117,7 @@ Key Type Required Description
This should be one of: ``kinesis``, ``s3``, ``sns``, or ``stream_alert_app``.
``source`` ``string`` Yes The name of the Kinesis Stream or S3 bucket, SNS topic or StreamAlert App
function where the data originated from. This value should match a source
provided in ``conf/sources.json``
provided in the ``data_sources`` field defined within a cluster in ``conf/clusters/<cluster>.json``
``trigger_rules`` ``list`` No A list of zero or more rule names that this test record should trigger.
An empty list implies this record should not trigger any alerts
``validate_schema_only`` ``boolean`` No Whether or not the test record should go through the rule processing engine.
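A sketch of a test event using only the keys documented above, expressed as a Python dict for illustration; the remaining fields of a real test file (such as the record data itself) are not shown in this excerpt and are omitted:

.. code-block:: python

test_record = {
    'service': 'kinesis',                     # one of: kinesis, s3, sns, stream_alert_app
    'source': 'prefix_cluster1_streamalert',  # must match a resource in the cluster's data_sources
    'trigger_rules': ['osquery_process_events'],  # rules this record is expected to trigger
    'validate_schema_only': False,            # see the description above; False since rules should run
}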
8 changes: 5 additions & 3 deletions streamalert/classifier/classifier.py
@@ -14,6 +14,7 @@
limitations under the License.
"""
from collections import OrderedDict
import os
import logging

from streamalert.classifier.clients import FirehoseClient, SQSClient
@@ -50,7 +51,7 @@ def __init__(self):

# Setup the normalization logic
Normalizer.load_from_config(self.config)

self._cluster = os.environ['CLUSTER']
self._payloads = []
self._failed_record_count = 0
self._processed_size = 0
@@ -86,7 +87,8 @@ def _load_logs_for_resource(self, service, resource):
bool: True if the resource's log sources loaded properly
"""
# Get all logs for the configured service/entity (s3, kinesis, or sns)
resources = self._config['sources'].get(service)

resources = self._config['clusters'][self._cluster]['data_sources'].get(service)
if not resources:
LOGGER.error('Service [%s] not declared in sources configuration', service)
return False
@@ -103,7 +105,7 @@
return OrderedDict(
(source, self.config['logs'][source])
for source in self.config['logs'].keys()
if source.split(':')[0] in source_config['logs']
if source.split(':')[0] in source_config
)

@classmethod
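Putting the classifier change together: the cluster name is now read from the ``CLUSTER`` environment variable at load time, and log schemas for a resource are resolved from that cluster's ``data_sources``. A standalone sketch paraphrasing the diff above (not the full class):

.. code-block:: python

from collections import OrderedDict
import os

def load_logs_for_resource(config, service, resource):
    """Resolve the log schemas declared for a service/resource pair in the current cluster."""
    cluster = os.environ['CLUSTER']  # read once at load time in the real classifier
    resources = config['clusters'][cluster]['data_sources'].get(service)
    if not resources:
        return None  # service not declared for this cluster

    source_config = resources.get(resource)
    if not source_config:
        return None  # resource not declared under this service

    # data_sources now maps each resource directly to a list of log names,
    # so membership is checked against that list rather than a nested 'logs' key
    return OrderedDict(
        (log, config['logs'][log])
        for log in config['logs']
        if log.split(':')[0] in source_config
    )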
