diff --git a/conf/clusters/prod.json b/conf/clusters/prod.json
index a8fc79813..3274e018e 100644
--- a/conf/clusters/prod.json
+++ b/conf/clusters/prod.json
@@ -1,5 +1,52 @@
 {
   "id": "prod",
+  "data_sources": {
+    "kinesis": {
+      "prefix_cluster1_streamalert": [
+        "cloudwatch",
+        "ghe",
+        "osquery"
+      ]
+    },
+    "s3": {
+      "prefix.cluster.sample.bucket": [
+        "cloudtrail",
+        "carbonblack",
+        "fleet"
+      ]
+    },
+    "sns": {
+      "prefix_cluster_sample_topic": [
+        "binaryalert"
+      ]
+    },
+    "stream_alert_app": {
+      "prefix_cluster_box_admin_events_sm-app-name_app": [
+        "box"
+      ],
+      "prefix_cluster_duo_admin_sm-app-name_app": [
+        "duo"
+      ],
+      "prefix_cluster_duo_auth_sm-app-name_app": [
+        "duo"
+      ],
+      "prefix_cluster_gsuite_admin_sm-app-name_app": [
+        "gsuite"
+      ],
+      "prefix_cluster_onelogin-events-app-name_app": [
+        "onelogin"
+      ],
+      "prefix_cluster_slack_access_sm-app-name_app": [
+        "slack"
+      ],
+      "prefix_cluster_slack_integration_sm-app-name_app": [
+        "slack"
+      ],
+      "prefix_cluster_aliyun_actiontrail_sm-app-name_app": [
+        "aliyun"
+      ]
+    }
+  },
   "modules": {
     "cloudwatch_monitoring": {
       "enabled": true,
diff --git a/conf/sources.json b/conf/sources.json
deleted file mode 100644
index 3c80b878f..000000000
--- a/conf/sources.json
+++ /dev/null
@@ -1,69 +0,0 @@
-{
-  "kinesis": {
-    "prefix_cluster1_streamalert": {
-      "logs": [
-        "cloudwatch",
-        "ghe",
-        "osquery"
-      ]
-    }
-  },
-  "s3": {
-    "prefix.cluster.sample.bucket": {
-      "logs": [
-        "cloudtrail",
-        "carbonblack",
-        "fleet"
-      ]
-    }
-  },
-  "sns": {
-    "prefix_cluster_sample_topic": {
-      "logs": [
-        "binaryalert"
-      ]
-    }
-  },
-  "stream_alert_app": {
-    "prefix_cluster_box_admin_events_sm-app-name_app": {
-      "logs": [
-        "box"
-      ]
-    },
-    "prefix_cluster_duo_admin_sm-app-name_app": {
-      "logs": [
-        "duo"
-      ]
-    },
-    "prefix_cluster_duo_auth_sm-app-name_app": {
-      "logs": [
-        "duo"
-      ]
-    },
-    "prefix_cluster_gsuite_admin_sm-app-name_app": {
-      "logs": [
-        "gsuite"
-      ]
-    },
-    "prefix_cluster_onelogin-events-app-name_app": {
-      "logs": [
-        "onelogin"
-      ]
-    },
-    "prefix_cluster_slack_access_sm-app-name_app": {
-      "logs": [
-        "slack"
-      ]
-    },
-    "prefix_cluster_slack_integration_sm-app-name_app": {
-      "logs": [
-        "slack"
-      ]
-    },
-    "prefix_cluster_aliyun_actiontrail_sm-app-name_app": {
-      "logs": [
-        "aliyun"
-      ]
-    }
-  }
-}
diff --git a/docs/source/app-configuration.rst b/docs/source/app-configuration.rst
index 65ca3b8c8..dae6e7a08 100644
--- a/docs/source/app-configuration.rst
+++ b/docs/source/app-configuration.rst
@@ -135,7 +135,7 @@ Once the above is completed, a logger statement similar to the following will co
 
   StreamAlertCLI [INFO]: Successfully added 'duo_prod_collector' app integration to 'conf/clusters/prod.json' for service 'duo_auth'.
 
-Your configuration files (``conf/clusters/.json`` and ``conf/sources.json``) have now been updated and are ready to be deployed.
+Your configuration file ``conf/clusters/.json`` has now been updated and is ready to be deployed.
 
 3. Deploy the new App and the Classifier
 ````````````````````````````````````````````
diff --git a/docs/source/conf-datasources.rst b/docs/source/conf-datasources.rst
index 1f8188d58..074ba0c1b 100644
--- a/docs/source/conf-datasources.rst
+++ b/docs/source/conf-datasources.rst
@@ -1,14 +1,16 @@
 Datasource Configuration
 ========================
+.. note:: As of release 3.0.0, data source configuration has moved
+  from sources.json into the ``data_sources`` top level key for each of your clusters.
 
-For background on supported datasource types, read `datasources `_.
+For background on supported data source types, read `data sources `_. Overview -------- -Datasources defined in ``conf/sources.json`` control which datasources can send to and be analyzed by StreamAlert. +Data sources defined in each cluster file in the ``conf/clusters`` directory under the ``data_sources`` top level key control which data sources can send to and be analyzed by StreamAlert. -Each datasource (``kinesis``, ``s3``, or ``sns``) contains a mapping of specific resource names (kinesis stream names, s3 bucket IDs) along with a list of logs coming from that source. +Each data source (``kinesis``, ``s3``, or ``sns``) contains a mapping of specific resource names (kinesis stream names, s3 bucket IDs) along with a list of logs coming from that source. Log schemas are defined in one or more files in the ``conf/schemas`` directory. @@ -21,39 +23,31 @@ Example: .. code-block:: json { - "kinesis": { - "abc_corporate_streamalert": { - "logs": [ + "data_sources": { + "kinesis": { + "abc_corporate_streamalert": [ "box", "pan" - ] - }, - "abc_production_stream_streamalert": { - "logs": [ + ], + "abc_production_stream_streamalert": [ "inspec", "osquery" ] - } - }, - "s3": { - "abc.webserver.logs": { - "logs": [ - "nginx" - ] }, - "abc.hids.logs": { - "logs": [ + "s3": { + "abc.webserver.logs": [ + "nginx" + ], + "abc.hids.logs": [ "carbonblack" ] - } - }, - "sns": { - "abc_sns_topic": { - "logs": [ + }, + "sns": { + "abc_sns_topic": [ "logstash" ] } } } -Once datasources are defined, associated ``logs`` must have defined `schemas `_ +Once data sources are defined, associated ``logs`` must have defined `schemas `_ diff --git a/docs/source/firehose.rst b/docs/source/firehose.rst index 97e8d742d..c0e96eb5e 100644 --- a/docs/source/firehose.rst +++ b/docs/source/firehose.rst @@ -15,22 +15,20 @@ Configuration When enabling the Kinesis Firehose module, a dedicated Delivery Stream is created per each log type. -For example, if the ``sources.json`` defines the following: +For example, if the data_sources for a cluster named prod defined in ``conf/clusters/prod.json`` contains the following: .. code-block:: json { - "kinesis": { - "example_prod_streamalert": { - "logs": [ + "data_sources": { + "kinesis": { + "example_prod_streamalert": [ "cloudwatch", "osquery" ] - } - }, - "s3": { - "example.prod.streamalert.cloudtrail": { - "logs": [ + }, + "s3": { + "example.prod.streamalert.cloudtrail": [ "cloudtrail" ] } diff --git a/docs/source/getting-started.rst b/docs/source/getting-started.rst index 6819d08b2..f2a1d7667 100644 --- a/docs/source/getting-started.rst +++ b/docs/source/getting-started.rst @@ -168,14 +168,14 @@ Open ``conf/clusters/prod.json`` and change the ``stream_alert`` module to look } 5. Tell StreamAlert which `log schemas `_ will be sent to this input. -Open ``conf/sources.json`` and change the ``sns`` section to look like this: +Open ``conf/clusters/prod.json`` and change the ``data_sources`` section to look like this: .. code-block:: json { - "sns": { - "streamalert-test-data": { - "logs": [ + "data_sources": { + "sns": { + "streamalert-test-data": [ "cloudwatch" ] } diff --git a/docs/source/rules.rst b/docs/source/rules.rst index 828bee868..ba3c1bacb 100644 --- a/docs/source/rules.rst +++ b/docs/source/rules.rst @@ -182,7 +182,8 @@ logs ``logs`` define the log schema(s) supported by the rule. -Log `sources `_ are defined in ``conf/sources.json`` and their `schemas `_ are defined in one or more files in the ``conf/schemas`` directory. 
+Log `sources `_ are defined under the ``data_sources`` field for a cluster defined in ``conf/clusters/.json`` +and their `schemas `_ are defined in one or more files in the ``conf/schemas`` directory. .. note:: Either ``logs`` or ``datatypes`` must be specified for each rule diff --git a/docs/source/testing.rst b/docs/source/testing.rst index fed467831..dc62272cb 100644 --- a/docs/source/testing.rst +++ b/docs/source/testing.rst @@ -117,7 +117,7 @@ Key Type Required Description This should be one of: ``kinesis``, ``s3``, ``sns``, or ``stream_alert_app``. ``source`` ``string`` Yes The name of the Kinesis Stream or S3 bucket, SNS topic or StreamAlert App function where the data originated from. This value should match a source - provided in ``conf/sources.json`` + provided in the ``data_sources`` field defined within a cluster in ``conf/clusters/.json`` ``trigger_rules`` ``list`` No A list of zero or more rule names that this test record should trigger. An empty list implies this record should not trigger any alerts ``validate_schema_only`` ``boolean`` No Whether or not the test record should go through the rule processing engine. diff --git a/streamalert/classifier/classifier.py b/streamalert/classifier/classifier.py index 4cd7df6f2..322bbea1b 100644 --- a/streamalert/classifier/classifier.py +++ b/streamalert/classifier/classifier.py @@ -14,6 +14,7 @@ limitations under the License. """ from collections import OrderedDict +import os import logging from streamalert.classifier.clients import FirehoseClient, SQSClient @@ -50,7 +51,7 @@ def __init__(self): # Setup the normalization logic Normalizer.load_from_config(self.config) - + self._cluster = os.environ['CLUSTER'] self._payloads = [] self._failed_record_count = 0 self._processed_size = 0 @@ -86,7 +87,8 @@ def _load_logs_for_resource(self, service, resource): bool: True if the resource's log sources loaded properly """ # Get all logs for the configured service/entity (s3, kinesis, or sns) - resources = self._config['sources'].get(service) + + resources = self._config['clusters'][self._cluster]['data_sources'].get(service) if not resources: LOGGER.error('Service [%s] not declared in sources configuration', service) return False @@ -103,7 +105,7 @@ def _load_logs_for_resource(self, service, resource): return OrderedDict( (source, self.config['logs'][source]) for source in self.config['logs'].keys() - if source.split(':')[0] in source_config['logs'] + if source.split(':')[0] in source_config ) @classmethod diff --git a/streamalert/shared/config.py b/streamalert/shared/config.py index 8e338557f..46d75d23d 100644 --- a/streamalert/shared/config.py +++ b/streamalert/shared/config.py @@ -21,6 +21,8 @@ LOGGER = get_logger(__name__) +SUPPORTED_SOURCES = {'kinesis', 's3', 'sns', 'stream_alert_app'} + class TopLevelConfigKeys: """Define the available top level keys in the loaded config""" @@ -31,7 +33,6 @@ class TopLevelConfigKeys: NORMALIZED_TYPES = 'normalized_types' OUTPUTS = 'outputs' SCHEMAS = 'schemas' - SOURCES = 'sources' THREAT_INTEL = 'threat_intel' LOOKUP_TABLES = 'lookup_tables' @@ -128,8 +129,7 @@ def load_config(conf_dir='conf/', exclude=None, include=None, validate=True): 'global': , 'lambda': , 'logs': , - 'outputs': , - 'sources': + 'outputs': } """ default_files = {file for file in os.listdir(conf_dir) if file.endswith('.json')} @@ -228,7 +228,7 @@ def _validate_config(config): Checks for `logs.json`: - each log has a schema and parser declared - Checks for `sources.json` + Checks for `cluster.json` data_sources: - the sources 
contains either kinesis or s3 keys
       - each sources has a list of logs declared
@@ -242,32 +242,19 @@
     if TopLevelConfigKeys.LOGS in config:
         for log, attrs in config[TopLevelConfigKeys.LOGS].items():
             if 'schema' not in attrs:
-                raise ConfigError('The \'schema\' is missing for {}'.format(log))
+                raise ConfigError("The 'schema' is missing for {}".format(log))
 
             if 'parser' not in attrs:
-                raise ConfigError('The \'parser\' is missing for {}'.format(log))
+                raise ConfigError("The 'parser' is missing for {}".format(log))
 
     # Check if the defined sources are supported and report any invalid entries
-    if TopLevelConfigKeys.SOURCES in config:
-        supported_sources = {'kinesis', 's3', 'sns', 'stream_alert_app'}
-        if not set(config[TopLevelConfigKeys.SOURCES]).issubset(supported_sources):
-            missing_sources = supported_sources - set(config[TopLevelConfigKeys.SOURCES])
-            raise ConfigError(
-                'The \'sources.json\' file contains invalid source entries: {}. '
-                'The following sources are supported: {}'.format(
-                    ', '.join('\'{}\''.format(source) for source in missing_sources),
-                    ', '.join('\'{}\''.format(source) for source in supported_sources)
-                )
-            )
-
-        # Iterate over each defined source and make sure the required subkeys exist
-        for attrs in list(config[TopLevelConfigKeys.SOURCES].values()):
-            for entity, entity_attrs in attrs.items():
-                if TopLevelConfigKeys.LOGS not in entity_attrs:
-                    raise ConfigError('Missing \'logs\' key for entity: {}'.format(entity))
-
-                if not entity_attrs[TopLevelConfigKeys.LOGS]:
-                    raise ConfigError('List of \'logs\' is empty for entity: {}'.format(entity))
+    if TopLevelConfigKeys.CLUSTERS in config:
+        # Used to track duplicate sources in separate cluster config files
+        existing_sources = set()
+        for cluster_name, cluster_attrs in config[TopLevelConfigKeys.CLUSTERS].items():
+            if 'data_sources' not in cluster_attrs:
+                raise ConfigError("'data_sources' missing for cluster {}".format(cluster_name))
+            _validate_sources(cluster_name, cluster_attrs['data_sources'], existing_sources)
 
     if TopLevelConfigKeys.THREAT_INTEL in config:
         if TopLevelConfigKeys.NORMALIZED_TYPES not in config:
@@ -284,8 +271,41 @@
                    for log_keys in list(config[TopLevelConfigKeys.NORMALIZED_TYPES].values())):
                 raise ConfigError(
-                    'IOC key \'{}\' within IOC type \'{}\' must be defined for at least '
-                    'one log type in normalized types'.format(normalized_key, ioc_type)
+                    "IOC key '{}' within IOC type '{}' must be defined for at least "
+                    "one log type in normalized types".format(normalized_key, ioc_type)
                 )
 
-    # FIXME (derek.wang) write a configuration validator for lookuptables (new one)
+def _validate_sources(cluster_name, data_sources, existing_sources):
+    """Validates the sources for a cluster
+    Args:
+        cluster_name (str): The name of the cluster we are validating sources for
+        data_sources (dict): The sources to validate
+        existing_sources (set): Already defined sources
+    Raises:
+        ConfigError: If the validation fails
+    """
+    # Make sure the defined sources are supported and report any invalid entries
+    if not set(data_sources).issubset(SUPPORTED_SOURCES):
+        invalid_sources = set(data_sources) - SUPPORTED_SOURCES
+        raise ConfigError(
+            'The data sources for cluster {} contain invalid source entries: {}. 
' + 'The following sources are supported: {}'.format( + cluster_name, + ', '.join("'{}'".format(source) for source in invalid_sources), + ', '.join("'{}'".format(source) for source in SUPPORTED_SOURCES) + ) + ) + for attrs in data_sources.values(): + for source, logs in attrs.items(): + + if not logs: + raise ConfigError("List of logs is empty for source: {}".format(source)) + + if source in existing_sources: + raise ConfigError( + "Duplicate data_source in cluster configuration {} " + "for cluster {}".format(source, cluster_name) + ) + existing_sources.add(source) + +# FIXME (derek.wang) write a configuration validator for lookuptables (new one) diff --git a/streamalert/shared/utils.py b/streamalert/shared/utils.py index d79923fef..67ca7dd47 100644 --- a/streamalert/shared/utils.py +++ b/streamalert/shared/utils.py @@ -10,7 +10,6 @@ LOGGER = get_logger(__name__) - def valid_ip(ip_address): """Verify that a ip_address string is valid diff --git a/streamalert_cli/config.py b/streamalert_cli/config.py index 89f60bc54..558612043 100644 --- a/streamalert_cli/config.py +++ b/streamalert_cli/config.py @@ -446,9 +446,11 @@ def add_app(self, func_name, app_info): # Add this service to the sources for this app integration # The `stream_alert_app` is purposely singular here - app_sources = self.config['sources'].get('stream_alert_app', {}) - app_sources[func_name] = {'logs': [app.service()]} - self.config['sources']['stream_alert_app'] = app_sources + app_sources = self.config['clusters'][cluster_name]['data_sources'].get( + 'stream_alert_app', {} + ) + app_sources[func_name] = [app.service()] + self.config['clusters'][cluster_name]['data_sources']['stream_alert_app'] = app_sources LOGGER.info('Successfully added \'%s\' app integration to \'conf/clusters/%s.json\' ' 'for service \'%s\'.', app_info['app_name'], app_info['cluster'], diff --git a/streamalert_cli/test/handler.py b/streamalert_cli/test/handler.py index b48543271..9b12ca043 100644 --- a/streamalert_cli/test/handler.py +++ b/streamalert_cli/test/handler.py @@ -229,7 +229,7 @@ def __init__(self, options, config): self._failed = 0 prefix = self._config['global']['account']['prefix'] env = { - 'CLUSTER': 'local-test', + 'CLUSTER': 'prod', 'STREAMALERT_PREFIX': prefix, 'AWS_ACCOUNT_ID': self._config['global']['account']['aws_account_id'], 'ALERTS_TABLE': '{}_streamalert_alerts'.format(prefix), @@ -349,6 +349,7 @@ def run(self): print('\nRunning tests for files found in: {}'.format(self._files_dir)) + for event_file in self._get_test_files(): test_event = TestEventFile(event_file.replace(self._files_dir, '')) # Iterate over the individual test events in the file @@ -359,6 +360,14 @@ def run(self): if not self._contains_filtered_rules(original_event): continue + resource = original_event['source'] + + for cluster_name, cluster_value in self._config['clusters'].items(): + for service in cluster_value['data_sources'].values(): + if resource in service: + os.environ['CLUSTER'] = cluster_name + break + classifier_result = self._run_classification(event) test_result = TestResult( diff --git a/tests/unit/conf/clusters/advanced.json b/tests/unit/conf/clusters/advanced.json index e8d8f4599..f05ebe68e 100644 --- a/tests/unit/conf/clusters/advanced.json +++ b/tests/unit/conf/clusters/advanced.json @@ -1,4 +1,5 @@ { + "data_sources": {}, "id": "advanced", "modules": { "cloudtrail": { diff --git a/tests/unit/conf/clusters/test.json b/tests/unit/conf/clusters/test.json index 5c341d292..9a138d50a 100644 --- a/tests/unit/conf/clusters/test.json +++ 
b/tests/unit/conf/clusters/test.json @@ -1,4 +1,34 @@ { + "data_sources": { + "kinesis": { + "test_cloudtrail_bucket": [ + "test_cloudtrail" + ], + "test_kinesis_stream": [ + "test_log_type_json", + "test_log_type_json_2", + "test_log_type_json_nested", + "test_log_type_json_nested_with_data", + "test_log_type_csv", + "test_log_type_csv_nested", + "test_log_type_kv_auditd", + "cloudwatch" + ], + "test_stream_2": [ + "test_multiple_schemas", + "test_log_type_json_2", + "test_log_type_json_nested_osquery", + "test_log_type_syslog" + ], + "test_stream_threat_intel": [ + "test_log_threat_intel_custom" + ], + "unit_test_default_stream": [ + "unit_test_simple_log", + "test_log_type_json_nested" + ] + } + }, "id": "test", "modules": { "cloudwatch_monitoring": { diff --git a/tests/unit/conf/clusters/trusted.json b/tests/unit/conf/clusters/trusted.json index ee455de77..4f0d100bc 100644 --- a/tests/unit/conf/clusters/trusted.json +++ b/tests/unit/conf/clusters/trusted.json @@ -1,4 +1,5 @@ { + "data_sources": {}, "id": "trusted", "modules": { "cloudwatch_monitoring": { diff --git a/tests/unit/conf/sources.json b/tests/unit/conf/sources.json deleted file mode 100644 index 18d316962..000000000 --- a/tests/unit/conf/sources.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "kinesis": { - "test_cloudtrail_bucket": { - "logs": [ - "test_cloudtrail" - ] - }, - "test_kinesis_stream": { - "logs": [ - "test_log_type_json", - "test_log_type_json_2", - "test_log_type_json_nested", - "test_log_type_json_nested_with_data", - "test_log_type_csv", - "test_log_type_csv_nested", - "test_log_type_kv_auditd", - "cloudwatch" - ] - }, - "test_stream_2": { - "logs": [ - "test_multiple_schemas", - "test_log_type_json_2", - "test_log_type_json_nested_osquery", - "test_log_type_syslog" - ] - }, - "test_stream_threat_intel": { - "logs": [ - "test_log_threat_intel_custom" - ] - }, - "unit_test_default_stream": { - "logs": [ - "unit_test_simple_log", - "test_log_type_json_nested" - ] - } - } -} \ No newline at end of file diff --git a/tests/unit/helpers/config.py b/tests/unit/helpers/config.py index e6c543c4a..d93244113 100644 --- a/tests/unit/helpers/config.py +++ b/tests/unit/helpers/config.py @@ -104,16 +104,6 @@ def basic_streamalert_config(): 'parser': 'csv' } }, - 'sources': { - 'kinesis': { - 'stream_1': { - 'logs': [ - 'json_log', - 'csv_log' - ] - } - } - }, 'lambda': { 'alert_merger_config': { 'memory': 128, @@ -171,6 +161,14 @@ def basic_streamalert_config(): 'clusters': { 'prod': { 'id': 'prod', + 'data_sources': { + 'kinesis': { + 'stream_1': [ + 'json_log', + 'csv_log' + ] + } + }, 'modules': { 'cloudwatch_monitoring': { 'enabled': True diff --git a/tests/unit/streamalert/classifier/test_classifier.py b/tests/unit/streamalert/classifier/test_classifier.py index e5cfb4610..d8f96c535 100644 --- a/tests/unit/streamalert/classifier/test_classifier.py +++ b/tests/unit/streamalert/classifier/test_classifier.py @@ -14,6 +14,7 @@ limitations under the License. 
""" from collections import OrderedDict +import os from mock import Mock, patch from nose.tools import assert_equal @@ -34,6 +35,7 @@ def setup(self): with patch.object(classifier_module, 'Normalizer'), \ patch.object(classifier_module, 'FirehoseClient'), \ patch.object(classifier_module, 'SQSClient'), \ + patch.dict(os.environ, {'CLUSTER': 'prod'}), \ patch('streamalert.classifier.classifier.config.load_config', Mock(return_value=self._mock_conf())): self._classifier = Classifier() @@ -47,7 +49,7 @@ def teardown(self): def _mock_conf(cls): return { 'logs': cls._mock_logs(), - 'sources': cls._mock_sources(), + 'clusters': {'prod': {'data_sources': cls._mock_sources()}}, 'global': cls._mock_global() } @@ -55,11 +57,9 @@ def _mock_conf(cls): def _mock_sources(cls): return { cls._service_name: { - cls._resource_name: { - 'logs': [ - 'log_type_01' - ] - } + cls._resource_name: [ + 'log_type_01' + ] } } diff --git a/tests/unit/streamalert/shared/test_config.py b/tests/unit/streamalert/shared/test_config.py index 893b65c10..ea6d9ffce 100644 --- a/tests/unit/streamalert/shared/test_config.py +++ b/tests/unit/streamalert/shared/test_config.py @@ -52,13 +52,12 @@ def setUp(self): config_data = basic_streamalert_config() # Add config files which should be loaded - self.fs.create_file('conf/clusters/prod.json', contents='{}') - self.fs.create_file('conf/clusters/dev.json', contents='{}') + self.fs.create_file('conf/clusters/prod.json', contents='{"data_sources": {}}') + self.fs.create_file('conf/clusters/dev.json', contents='{"data_sources": {}}') self.fs.create_file('conf/global.json', contents='{}') self.fs.create_file('conf/lambda.json', contents='{}') self.fs.create_file('conf/logs.json', contents='{}') self.fs.create_file('conf/outputs.json', contents='{}') - self.fs.create_file('conf/sources.json', contents='{}') self.fs.create_file( 'conf/threat_intel.json', contents=json.dumps(config_data['threat_intel']) @@ -72,13 +71,12 @@ def setUp(self): contents='{"csv_log2": {"schema": {"data": "string","uid": "integer"},"parser": "csv"}}' ) - # Create similar structure but with schemas folder instead of logs.json - self.fs.create_file('conf_schemas/clusters/prod.json', contents='{}') - self.fs.create_file('conf_schemas/clusters/dev.json', contents='{}') + # Create similar structure but with schemas folder instead of logs.json and 2 clusters. 
+ self.fs.create_file('conf_schemas/clusters/prod.json', contents='{"data_sources": {}}') + self.fs.create_file('conf_schemas/clusters/dev.json', contents='{"data_sources": {}}') self.fs.create_file('conf_schemas/global.json', contents='{}') self.fs.create_file('conf_schemas/lambda.json', contents='{}') self.fs.create_file('conf_schemas/outputs.json', contents='{}') - self.fs.create_file('conf_schemas/sources.json', contents='{}') self.fs.create_file( 'conf_schemas/schemas/csv.json', contents='{"csv_log": {"schema": {"data": "string","uid": "integer"},"parser": "csv"}}' @@ -108,7 +106,6 @@ def test_load_all(): 'lambda', 'logs', 'outputs', - 'sources', 'threat_intel', 'normalized_types' } @@ -122,7 +119,6 @@ def test_load_exclude(): 'clusters', 'lambda', 'outputs', - 'sources', 'threat_intel', 'normalized_types' } @@ -137,7 +133,6 @@ def test_load_exclude_clusters(): 'lambda', 'logs', 'outputs', - 'sources', 'threat_intel', 'normalized_types' } @@ -152,7 +147,6 @@ def test_load_exclude_schemas(): 'global', 'lambda', 'outputs', - 'sources', } assert_equal(set(config), expected_keys) @@ -214,7 +208,7 @@ def test_config_no_logs_key(self): config = basic_streamalert_config() # Remove everything from the sources entry - config['sources']['kinesis']['stream_1'] = {} + config['clusters']['prod']['data_sources']['kinesis']['stream_1'] = {} assert_raises(ConfigError, _validate_config, config) @@ -224,7 +218,7 @@ def test_config_empty_logs_list(self): config = basic_streamalert_config() # Set the logs key to an empty list - config['sources']['kinesis']['stream_1']['logs'] = [] + config['clusters']['prod']['data_sources']['kinesis']['stream_1'] = [] assert_raises(ConfigError, _validate_config, config) @@ -234,7 +228,7 @@ def test_config_invalid_datasources(self): config = basic_streamalert_config() # Set the sources value to contain an invalid data source ('sqs') - config['sources'] = {'sqs': {'queue_1': {}}} + config['clusters']['prod']['data_sources'] = {'sqs': {'queue_1': {}}} assert_raises(ConfigError, _validate_config, config) @@ -276,3 +270,9 @@ def test_config_ioc_types_no_normalized_types(self): del config['normalized_types'] assert_raises(ConfigError, _validate_config, config) + + def test_config_duplicate_sources(self): + """Shared - Config Validator - Duplicate Data Sources in Cluster Configs""" + config = basic_streamalert_config() + config['clusters']['dev'] = config['clusters']['prod'] + assert_raises(ConfigError, _validate_config, config)
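Migration note (illustrative, not part of this diff): existing deployments that still carry a legacy conf/sources.json need to fold its entries into the new per-cluster data_sources key shown above. The sketch below is one way to do that; the script name, file paths, and the choice to assign every legacy source to a single target cluster are assumptions, since sources.json carried no cluster information and multi-cluster setups must split the entries by hand.

"""migrate_sources.py (hypothetical helper): fold a legacy conf/sources.json
into one cluster file's 'data_sources' key using the layout introduced here."""
import json
import sys


def migrate(sources_path, cluster_path):
    """Copy every service/resource from the old sources.json into one cluster file."""
    with open(sources_path) as sources_file:
        sources = json.load(sources_file)

    with open(cluster_path) as cluster_file:
        cluster = json.load(cluster_file)

    # Old layout: {service: {resource: {"logs": [...]}}}
    # New layout: {service: {resource: [...]}} nested under the cluster's 'data_sources'
    cluster['data_sources'] = {
        service: {resource: attrs['logs'] for resource, attrs in resources.items()}
        for service, resources in sources.items()
    }

    with open(cluster_path, 'w') as cluster_file:
        json.dump(cluster, cluster_file, indent=2, sort_keys=True)
        cluster_file.write('\n')


if __name__ == '__main__':
    # e.g. python migrate_sources.py conf/sources.json conf/clusters/prod.json
    migrate(sys.argv[1], sys.argv[2])

After migrating, the validation added in this change (_validate_sources) will reject unsupported service keys, empty log lists, and the same source appearing in more than one cluster file.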