diff --git a/conf/lambda.json b/conf/lambda.json index 844802aea..b6bf7dc9a 100644 --- a/conf/lambda.json +++ b/conf/lambda.json @@ -50,6 +50,29 @@ "subnet_ids": [] } }, + "artifact_extractor_config": { + "concurrency_limit": 10, + "enabled": false, + "firehose_buffer_size": 128, + "firehose_buffer_interval": 900, + "log_level": "info", + "memory": 128, + "metric_alarms": { + "errors": { + "enabled": true, + "evaluation_periods": 1, + "period_secs": 300, + "threshold": 0 + }, + "throttles": { + "enabled": true, + "evaluation_periods": 1, + "period_secs": 300, + "threshold": 0 + } + }, + "timeout": 300 + }, "athena_partitioner_config": { "concurrency_limit": 10, "memory": 128, diff --git a/conf/schemas/carbonblack.json b/conf/schemas/carbonblack.json index c9d4e88de..4225f2457 100644 --- a/conf/schemas/carbonblack.json +++ b/conf/schemas/carbonblack.json @@ -1043,7 +1043,29 @@ "uid", "username", "sha256" - ] + ], + "normalization": { + "command": [ + { + "path": ["command_line"], + "function": "Command line" + } + ], + "path": [ + { + "path": ["path"], + "function": "Process path" + }, + { + "path": ["parent_path"], + "function": "Process parent path" + }, + { + "path": ["process_path"], + "function": "Process parent path" + } + ] + } } }, "carbonblack:ingress.event.regmod": { diff --git a/conf/schemas/cloudwatch.json b/conf/schemas/cloudwatch.json index 536581639..c08ae8c7f 100644 --- a/conf/schemas/cloudwatch.json +++ b/conf/schemas/cloudwatch.json @@ -88,7 +88,55 @@ "time": "string", "version": "string" }, - "parser": "json" + "parser": "json", + "configuration": { + "normalization": { + "event_name": ["detail", "eventName"], + "account": [ + { + "path": [ + "account" + ], + "function": "Destination account ID" + }, + { + "path": [ + "detail", + "userIdentity", + "principalId" + ], + "function": "Source account ID" + } + ], + "ip_address": [ + { + "path": [ + "detail", + "sourceIPAddress" + ], + "function": "Source IP addresses" + } + ], + "user_agent": [ + "detail", + "userAgent" + ], + "user_identity": [ + { + "path": ["detail", "userIdentity", "type"], + "function": "User identity type" + }, + { + "path": ["detail", "userIdentity", "arn"], + "function": "User identity arn" + }, + { + "path": ["detail", "userIdentity", "userName"], + "function": "User identity username" + } + ] + } + } }, "cloudwatch:flow_logs": { "schema": { diff --git a/conf/schemas/osquery.json b/conf/schemas/osquery.json index 5b4f8041e..637f426d4 100644 --- a/conf/schemas/osquery.json +++ b/conf/schemas/osquery.json @@ -48,7 +48,21 @@ "log_type", "logNumericsAsNumbers", "numerics" - ] + ], + "normalization": { + "command": [ + { + "path": ["columns", "command"], + "function": "Command line from shell history" + } + ], + "file_path": [ + { + "path": ["columns", "history_file"], + "function": "Shell history file path" + } + ] + } } }, "osquery:snapshot": { diff --git a/docs/images/artifacts.png b/docs/images/artifacts.png new file mode 100644 index 000000000..946d3d47b Binary files /dev/null and b/docs/images/artifacts.png differ diff --git a/docs/images/cloudwatch_events.png b/docs/images/cloudwatch_events.png new file mode 100644 index 000000000..62bfd21db Binary files /dev/null and b/docs/images/cloudwatch_events.png differ diff --git a/docs/images/normalization-arch.png b/docs/images/normalization-arch.png new file mode 100644 index 000000000..fbc7329e0 Binary files /dev/null and b/docs/images/normalization-arch.png differ diff --git a/docs/source/index.rst b/docs/source/index.rst index 45059978c..bd725c83a 
100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -82,6 +82,7 @@ Table of Contents rule-promotion historical-search scheduled-queries + normalization conf-schemas-examples troubleshooting faq diff --git a/docs/source/normalization.rst new file mode 100644 index 000000000..306aba856 --- /dev/null +++ b/docs/source/normalization.rst @@ -0,0 +1,350 @@ +############# +Normalization +############# + +StreamAlert has an unannounced Data Normalization feature. In its current implementation, it extracts recognized field names from classified records and saves them to a top-level key on the same record. + +This is useful for rules, as they can be written to compare data fields against IoCs, such as IP addresses, instead of writing one rule for each incoming data type. However, there are a couple of limitations we have identified while using Normalization internally. + +************************** +Normalization 2.0 (Reboot) +************************** + +In Normalization 2.0, we introduce a new Lambda function ``Artifact Extractor`` that leverages the `Amazon Kinesis Data Firehose Data Transformation `_ feature to extract interesting artifacts from records processed by the classifiers. The artifacts will be stored in the same S3 bucket that the StreamAlert `Historical Search `_ feature uses, and the artifacts will be searchable via Athena as well. + + +Artifacts Inventory +=================== + +An artifact is any field or subset of data within a record that bears meaning beyond the record itself, and is of interest in computer security. For example, a “carbonblack_version” would not be an artifact, as it is meaningless outside of the context of Carbon Black data. However, an ``ip_address`` would be an artifact. + +The ``Artifact Extractor`` Lambda function will build an artifacts inventory based on the S3 and Athena services. It enables users to search for all artifacts across the whole infrastructure from a single Athena table. + +Architecture +============ + +.. figure:: ../images/normalization-arch.png + :alt: Normalization V2 Architecture + :align: center + :target: _images/normalization-arch.png + + (click to enlarge) + +Configuration +============= +In Normalization v1, the normalized types are based on log source (e.g. ``osquery``, ``cloudwatch``, etc.) and defined in the ``conf/normalized_types.json`` file. + +In Normalization v2, the normalized types are based on log type (e.g. ``osquery:differential``, ``cloudwatch:cloudtrail``, ``cloudwatch:events``, etc.) and defined in ``conf/schemas/*.json``. Please note, ``conf/normalized_types.json`` will be deprecated. + +All normalized types are arbitrary, but only lower case alphabetic characters and underscores should be used for names in order to be compatible with Athena. + +Supported normalization configuration syntax: + + .. code-block:: + + "cloudwatch:events": { + "schema": { + "field1": "string", + "field2": "string", + "field3": "string" + }, + "parser": "json", + "configuration": { + "normalization": { + "normalized_key_name1": [ + { + "path": ["path", "to", "original", "key"], + "function": "The purpose of normalized_key_name1", + "condition": { + "path": ["path", "to", "other", "key"], + "is|is_not|in|not_in|contains|not_contains": "string or a list" + } + } + ] + } + } + } + +* ``normalized_key_name1``: An arbitrary string naming the normalized key, e.g. ``ip_address``, ``hostname``, ``command``, etc. +* ``path``: A list containing the JSON path to the original key which will be normalized.
+* ``function``: Describes the purpose of the normalizer. +* ``condition``: An optional block that is evaluated first. If the condition is not met, then this normalizer is skipped. + + * ``path``: A list containing the JSON path to the condition key. + * ``is|is_not|in|not_in|contains|not_contains``: Exactly one of these fields must be provided. This is the value that the conditional field is compared against. E.g. + + .. code-block:: + + "condition": { + "path": ["account"], + "is": "123456" + } + + "condition": { + "path": ["detail", "userIdentity", "userName"], + "in": ["root", "test_username"] + } + + .. note:: + + Use an all-lowercase string or list of strings in the conditional field. The value from the record will be converted to all lowercase before the comparison.
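+For illustration, when a record matches a configured normalizer, the classifier adds a new top-level ``streamalert_normalization`` key to the record; this is the structure that rules and the Artifact Extractor read. With the ``ip_address`` example from the next section, a ``cloudwatch:events`` record would end up looking roughly like this (hypothetical values):
+
+  .. code-block::
+
+    {
+      "account": "123456789012",
+      "detail": {
+        "sourceIPAddress": "1.1.1.2"
+      },
+      "streamalert_normalization": {
+        "ip_address": [
+          {
+            "values": ["1.1.1.2"],
+            "function": "Source IP addresses"
+          }
+        ]
+      }
+    }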
+Below are some example configurations for normalization v2. + +* Normalize all IP addresses (``ip_address``) and user identities (``user_identity``) for ``cloudwatch:events`` logs + + ``conf/schemas/cloudwatch.json`` + + .. code-block:: + + "cloudwatch:events": { + "schema": { + "account": "string", + "detail": {}, + "detail-type": "string", + "id": "string", + "region": "string", + "resources": [], + "source": "string", + "time": "string", + "version": "string" + }, + "parser": "json", + "configuration": { + "normalization": { + "ip_address": [ + { + "path": [ + "detail", + "sourceIPAddress" + ], + "function": "Source IP addresses" + } + ], + "user_identity": [ + { + "path": ["detail", "userIdentity", "type"], + "function": "User identity type" + }, + { + "path": ["detail", "userIdentity", "arn"], + "function": "User identity arn" + }, + { + "path": ["detail", "userIdentity", "userName"], + "function": "User identity username" + } + ] + } + } + } + +* Normalize all commands (``command``) and file paths (``file_path``) for ``osquery:differential`` logs + + ``conf/schemas/osquery.json`` + + .. code-block:: + + "osquery:differential": { + "schema": { + "action": "string", + "calendarTime": "string", + "columns": {}, + "counter": "integer", + "decorations": {}, + "epoch": "integer", + "hostIdentifier": "string", + "log_type": "string", + "name": "string", + "unixTime": "integer", + "logNumericsAsNumbers": "string", + "numerics": "string" + }, + "parser": "json", + "configuration": { + "optional_top_level_keys": [ + "counter", + "decorations", + "epoch", + "log_type", + "logNumericsAsNumbers", + "numerics" + ], + "normalization": { + "command": [ + { + "path": ["columns", "command"], + "function": "Command line from shell history" + } + ], + "file_path": [ + { + "path": ["columns", "history_file"], + "function": "Shell history file path" + } + ] + } + } + } + +* Normalize usernames (``user_identity``) for ``cloudwatch:events`` logs when a certain condition is met. In the following example, usernames will only be normalized for AWS accounts ``11111111`` and ``22222222``. + + ``conf/schemas/cloudwatch.json`` + + .. code-block:: + + "cloudwatch:events": { + "schema": { + "account": "string", + "detail": {}, + "detail-type": "string", + "id": "string", + "region": "string", + "resources": [], + "source": "string", + "time": "string", + "version": "string" + }, + "parser": "json", + "configuration": { + "normalization": { + "user_identity": [ + { + "path": ["detail", "userIdentity", "userName"], + "function": "User identity username", + "condition": { + "path": ["account"], + "in": ["11111111", "22222222"] + } + } + ] + } + } + } + +Deployment +========== + +* Artifact Extractor will only work if Firehose and Historical Search are enabled in ``conf/global.json`` + + .. code-block:: + + "infrastructure": { + ... + "firehose": { + "use_prefix": true, + "buffer_interval": 60, + "buffer_size": 128, + "enabled": true, + "enabled_logs": { + "cloudwatch": {}, + "osquery": {} + } + } + ... + } + +* Enable the Artifact Extractor feature in ``conf/lambda.json`` + + .. code-block:: + + "artifact_extractor_config": { + "concurrency_limit": 10, + "enabled": true, + ... + }, + +* Use the StreamAlert CLI to deploy the Artifact Extractor Lambda function and the new resources + + The deployment will add the following resources: + + * A new Lambda function + * A new Glue catalog table ``artifacts`` for Historical Search via Athena + * A new Firehose to deliver artifacts to the S3 bucket + * Updates to existing Firehose delivery streams so they can invoke the Artifact Extractor Lambda where it is enabled + * New permissions, metrics and alarms. + + .. code-block:: bash + + python manage.py deploy --function artifact_extractor + +* Add the remaining permissions to allow the Firehose delivery streams which have normalization configured to invoke the Artifact Extractor Lambda. + + We can simply run a ``build`` to apply all the changes. + + .. code-block:: bash + + python manage.py build + + Or we can apply the changes in a targeted way if we know which Firehose delivery streams have normalization configured. + + .. code-block:: bash + + python manage.py build --target kinesis_firehose_cloudwatch_events kinesis_firehose_osquery_differential kinesis_firehose_setup + +* If the normalization configuration has changed in ``conf/schemas/*.json``, make sure to deploy the classifier Lambda function as well + + .. code-block:: bash + + python manage.py deploy --function classifier + +Custom Metrics +============== + +The Artifact Extractor comes with three custom metrics. + +#. ``ArtifactExtractor-ExtractedArtifacts``: Logs the number of artifacts extracted from the records +#. ``ArtifactExtractor-FirehoseFailedRecords``: Logs the number of records (artifacts) that failed to be sent to Firehose +#. ``ArtifactExtractor-FirehoseRecordsSent``: Logs the number of records (artifacts) sent to Firehose + +By default, the custom metrics are disabled. Enable custom metrics and follow up with a ``build`` to create the new ``aws_cloudwatch_log_metric_filter`` resources. + + .. code-block:: + + # conf/lambda.json + "artifact_extractor_config": { + "concurrency_limit": 10, + "enabled": true, + "enable_custom_metrics": true, + ... + } + + .. code-block:: + + python manage.py build --target "metric_filters_ArtifactExtractor_*" + + +Artifacts +========= + +Artifacts will be searchable within the Athena ``artifacts`` table, while the original logs are still searchable within their dedicated tables.
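+As an illustration (hypothetical values), each row in the ``artifacts`` table uses the simplified schema produced by the Artifact Extractor — ``function``, ``streamalert_record_id``, ``source_type``, ``type`` and ``value`` — so an IP address extracted from a ``cloudwatch:events`` record would be stored roughly as:
+
+  .. code-block::
+
+    {
+      "function": "Source IP addresses",
+      "streamalert_record_id": "c9f0bcca-7c4c-4d0f-9b53-6c1f4a01a0d1",
+      "source_type": "cloudwatch_events",
+      "type": "ip_address",
+      "value": "1.1.1.2"
+    }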
+Search ``cloudwatch:events`` logs: + +.. figure:: ../images/cloudwatch_events.png + :alt: Testing Results from cloudwatch_events Table + :align: center + :target: _images/cloudwatch_events.png + + (click to enlarge) + +All artifacts, including artifacts extracted from ``cloudwatch:events``, will live in the ``artifacts`` table. + +.. figure:: ../images/artifacts.png + :alt: Artifacts from artifacts Table + :align: center + :target: _images/artifacts.png + + (click to enlarge) + +************** +Considerations +************** + +The Normalization Reboot will bring good value in terms of how easy it will be to search for artifacts across the entire infrastructure of the organization. It will also make it possible to write more efficient scheduled queries to have correlated alerting in place. But it is worth mentioning that there are some tradeoffs: additional resources are required and additional data delay is introduced. + +#. Increase in Data Footprint: Each individual original record has the chance to add many artifacts. In practice, this will likely not be a huge issue as each artifact is very small and only contains a few fields. + +#. Additional Delay: Firehose data transformation can add up to 900 seconds of additional delay before the data is available for historical search. The 900 seconds is a configurable setting on the Firehose from which the artifacts are extracted; reduce the Firehose ``buffer_interval`` value if you want to reduce the delay. + +#. High memory usage: The Artifact Extractor Lambda function may need at least 3x max(buffer size of the Firehoses from which the artifacts are extracted), because it performs a lot of data copying. This may be improved by writing more efficient code in the Artifact Extractor Lambda function. diff --git a/publishers/community/generic.py b/publishers/community/generic.py index 4d52eccc6..b388374ed 100644 --- a/publishers/community/generic.py +++ b/publishers/community/generic.py @@ -107,7 +107,7 @@ def remove_fields(alert, publication): @Register def remove_streamalert_normalization(_, publication): - """This publisher removes the super heavyweight 'streamalert:normalization' fields""" + """This publisher removes the super heavyweight 'streamalert_normalization' fields""" return _delete_dictionary_fields(publication, Normalizer.NORMALIZATION_KEY) diff --git a/rules/community/mitre_attack/defense_evasion/multi/obfuscated_files_or_information/right_to_left_character.py b/rules/community/mitre_attack/defense_evasion/multi/obfuscated_files_or_information/right_to_left_character.py index b373d908b..e25d07183 100644 --- a/rules/community/mitre_attack/defense_evasion/multi/obfuscated_files_or_information/right_to_left_character.py +++ b/rules/community/mitre_attack/defense_evasion/multi/obfuscated_files_or_information/right_to_left_character.py @@ -1,9 +1,9 @@ -"""Detection of the right to left override unicode character U+202E in filename or process name.""" +"""Detection of the right to left override unicode character U+202E in file_name or process name.""" from streamalert.shared.rule import rule from streamalert.shared.normalize import Normalizer -@rule(datatypes=['command', 'filePath', 'processPath', 'fileName']) +@rule(datatypes=['command', 'path', 'file_name']) def right_to_left_character(rec): """ author: @javutin @@ -27,17 +27,12 @@ def right_to_left_character(rec): if isinstance(command, str) and rlo in command: return True - file_paths = Normalizer.get_values_for_normalized_type(rec, 'filePath') - for file_path in file_paths: - if isinstance(file_path, str) and rlo in file_path: + paths = 
Normalizer.get_values_for_normalized_type(rec, 'path') + for path in paths: + if isinstance(path, str) and rlo in path: return True - process_paths = Normalizer.get_values_for_normalized_type(rec, 'processPath') - for process_path in process_paths: - if isinstance(process_path, str) and rlo in process_path: - return True - - file_names = Normalizer.get_values_for_normalized_type(rec, 'fileName') + file_names = Normalizer.get_values_for_normalized_type(rec, 'file_name') for file_name in file_names: if isinstance(file_name, str) and rlo in file_name: return True diff --git a/streamalert/artifact_extractor/__init__.py b/streamalert/artifact_extractor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/streamalert/artifact_extractor/artifact_extractor.py b/streamalert/artifact_extractor/artifact_extractor.py new file mode 100644 index 000000000..01415ed0d --- /dev/null +++ b/streamalert/artifact_extractor/artifact_extractor.py @@ -0,0 +1,284 @@ +""" +Copyright 2017-present Airbnb, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import base64 +import json +import re +from os import environ as env +import uuid + +from streamalert.shared.firehose import FirehoseClient +from streamalert.shared import ARTIFACT_EXTRACTOR_NAME, config +from streamalert.shared.metrics import MetricLogger +from streamalert.shared.normalize import Normalizer +from streamalert.shared.logger import get_logger + + +LOGGER = get_logger(__name__) + +RECORD_ID_KEY = 'streamalert_record_id' + +class Artifact: + """Encapsulation of a single Artifact that is extracted from an input record.""" + + def __init__(self, function, record_id, source_type, normalized_type, value): + """Create a new Artifact based on normalized information + + Args: + function (str): Describes how this field is used in the record, or what it means. + record_id (str): Currently it is reserved for future support. It will come from the + record processed by classifier. This field is very useful for cross reference back + to the original record in the future. It will be "None" if no "record_id" + information when searching artifacts in Athena. + source_type (str): The original source of the artifact(s) extracted from a record. + e.g. osquery_differential, cloudwatch_cloudtrail + normalized_type (str): Normalized types in a record processed by classifier. + value (str): This is the true value of the type. E.g, a record of type “ip_address” + could have a value of “50.50.50.50” + """ + # Enforce all fields are strings in a Artifact to prevent type corruption in Parquet format + self._function = str(function) + self._record_id = str(record_id) + self._source_type = str(source_type) + self._type = str(normalized_type) + self._value = str(value) + + @property + def record(self): + """Generate an artifact + + Returns: + dict: A dictionary contains artifact information. 
+ """ + return { + 'function': self._function, + RECORD_ID_KEY: self._record_id, + 'source_type': self._source_type, + 'type': self._type, + 'value': self._value, + } + + +class FirehoseRecord: + """Encapsulation of single Firehose record and/or normalized artifacts""" + + def __init__(self, firehose_record, source_type): + """Create a new Firehose record contains original data and may extract multiple artifacts if + original data was normalized in the classifier. + The transformed data (insert a record_id only) which will be returned back to source + firehose for historical search. And the artifacts, if any, will be sent to a dedicated + firehose with simplified schema and land in streamalert data bucket for historical search + as well. + + Args: + firehose_record (dict): the record passed to lambda from source firehose. It has + following format, + + { + 'recordId': '12345678901230000000', + 'data': 'eyJyeXhpYXMiOiJZZXMiLCJtZXNzYWdlIjoiaGVsbG8gd29ybGQhIiwiZXZlbnRfZG==', + 'approximateArrivalTimestamp': 1583275634682 + } + + source_type (str): The original source of the artifact(s) extracted from a record. + e.g. osquery_differential, cloudwatch_cloudtrail + """ + self._firehose_record_id = firehose_record['recordId'] + self._firehose_data = firehose_record['data'] + self._decoded_record = json.loads(base64.b64decode(self._firehose_data)) + self._source_type = source_type + + @property + def artifacts(self): + """Extract all artifacts from a record + + Returns: + list: A list of Artifacts from a normalized record. + """ + artifacts = [] + + if not self._decoded_record.get(Normalizer.NORMALIZATION_KEY): + # Return an empty list if the record doesn't have normalization information. + return artifacts + + if not self._source_type: + # Return immediately if can not identify source_type. a.k.a do not extract artifacts. + return artifacts + + # + # normalized information in the record will be similar to + # { + # 'record': { + # 'region': 'us-east-1', + # 'detail': { + # 'awsRegion': 'us-west-2' + # } + # }, + # 'streamalert_normalization': { + # 'region': [ + # { + # 'values': ['region_name'], + # 'function': 'AWS region' + # }, + # { + # 'values': ['region_name'], + # 'function': 'AWS region' + # } + # ] + # } + # } + # + record_id = self._decoded_record.get(RECORD_ID_KEY) or str(uuid.uuid4()) + for key, values in self._decoded_record[Normalizer.NORMALIZATION_KEY].items(): + for value in values: + for val in value.get('values', []): + artifacts.append(Artifact( + function=value.get('function'), + record_id=record_id, + source_type=self._source_type, + normalized_type=key, + value=val + )) + + # Add a new key "streamalert_record_id" to "streamalert_normalization" field. This new key + # will be helpful tracing back to the original record when searching in "artifacts" table. + self._decoded_record[Normalizer.NORMALIZATION_KEY][RECORD_ID_KEY] = record_id + + return artifacts + + @property + def transformed_record(self): + """Create a transformed record with required fields. The transformed record will be sent + back to source Firehose and land in the S3 bucket for historical search. All transformed + records from Lambda must contain the following parameters, or Kinesis Data Firehose rejects + them and treats that as a data transformation failure. + https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + + recordId: The record ID is passed from source Firehose to Lambda during the invocation. The + transformed record must contain the same record ID. 
Any mismatch between the ID of the + original record and the ID of the transformed record is treated as a data transformation + failure. + + result: The status of the data transformation of the record. The possible values are: Ok, + Dropped, and ProcessingFailed. The purpose of ArtifactExtractor lambda is to extract + artifacts and it should not change the data. So the result will alway be 'Ok'. + + data: The transformed data payload, base64-encoded. The transformed data payload includes a + new key "streamalert_record_id" and it's the only difference from original data payload. + + Returns: + dict: A dictionary with required fields 'result', 'data' and 'recordId'. + """ + return { + 'recordId': self._firehose_record_id, + 'result': 'Ok', + 'data': base64.b64encode(self._json_serializer()).decode('utf-8') + } + + def _json_serializer(self): + """Serialize a transformed record to a JSON formatted string + + Returns: + str: a JSON formatted string with a newline appened. + """ + return (json.dumps(self._decoded_record, separators=(',', ':')) + '\n').encode('utf-8') + +class ArtifactExtractor: + """ArtifactExtractor class will extract normalized artifacts from batch of records from source + Firehose and return the original records back to Firehose where thoese records will be delivered + to S3 bucket for historical search. The artifacts will be sent a Firehose dedicated to artifacts + delivery to the same S3 bucket. + + The main purpose of this class is to build the artifacts inventory without interrupting current + historical search pipeline. So it will return the original records. + + To be noted here, most likely the ArtifactExtractor lambda function needs at least + 3 times of max(buffer size of firehoses where the artifacts extracted from). Because it has many + data copy actions. + """ + + STREAM_ARN_REGEX = re.compile(r".*streamalert_(?P.*)") + + _config = None + _firehose_client = None + + def __init__(self, region, src_firehose_arn): + self._region = region + self._src_firehose_arn = src_firehose_arn + self._dst_firehose_arn = env.get('DESTINATION_FIREHOSE_STREAM_NAME') + self._artifacts = list() + self._source_type = self._get_source_type() + + ArtifactExtractor._config = ArtifactExtractor._config or config.load_config(validate=True) + + ArtifactExtractor._firehose_client = ( + ArtifactExtractor._firehose_client or FirehoseClient.get_client( + prefix=self.config['global']['account']['prefix'], + artifact_extractor_config=self.config['lambda'].get('artifact_extractor_config', {}) + ) + ) + + @property + def config(self): + return ArtifactExtractor._config + + @property + def firehose(self): + return ArtifactExtractor._firehose_client + + def run(self, records): + LOGGER.debug('Extracting artifacts from %d %s logs', len(records), self._source_type) + + transformed_records = [] + for record in records: + # Extract artifacts, if any, and generate a transformed record with required parameters. 
+ firehose_record = FirehoseRecord(record, self._source_type) + + for artifact in firehose_record.artifacts: + self._artifacts.append(artifact.record) + + transformed_records.append(firehose_record.transformed_record) + + LOGGER.debug('Extracted %d artifact(s)', len(self._artifacts)) + + MetricLogger.log_metric( + ARTIFACT_EXTRACTOR_NAME, + MetricLogger.EXTRACTED_ARTIFACTS, + len(self._artifacts) + ) + + self.firehose.send_artifacts(self._artifacts, self._dst_firehose_arn) + + return { + 'records': transformed_records + } + + def _get_source_type(self): + """Extract source type from source firehose arn which follows naming convention + *_streamalert_. The is the source type. + + Please note the log_name may be hashed out if the firehose stream name is too long, but it + is rare. + + Returns: + str: The original source of the artifact(s) extracted from a record, + e.g. osquery_differential, cloudwatch_cloudtrail + """ + match = self.STREAM_ARN_REGEX.search(self._src_firehose_arn) + if not match: + LOGGER.warning( + 'No valid source type found from firehose arn %s', self._src_firehose_arn + ) + # return early without result if source type is invalid + return + + return match.groups('source_type')[0] diff --git a/streamalert/artifact_extractor/main.py b/streamalert/artifact_extractor/main.py new file mode 100644 index 000000000..4f5fce05e --- /dev/null +++ b/streamalert/artifact_extractor/main.py @@ -0,0 +1,66 @@ +""" +Copyright 2017-present Airbnb, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from streamalert.artifact_extractor.artifact_extractor import ArtifactExtractor +from streamalert.shared.logger import get_logger + + +LOGGER = get_logger(__name__) + + +def handler(event, _): + """Main Lambda handler function for Artifact Extractor + + Args: + event (dict): This lambda function receives event like the following: + { + 'records': [ + { + 'recordId': '12345678901230000000', + 'data': 'eyJyeXhpYXMiOiJZZXMiLCJtZXNzYWdlIjoiaGVsbG8gd29ybGQhIiwiZXZlbnRfZG==', + 'approximateArrivalTimestamp': 1583275634682 + } + ], + 'region': 'us-east-1', + 'deliveryStreamArn': 'arn:aws:firehose:us-east-1:123456788901:deliverystream/aaaaa', + 'invocationId': '12345678-1234-5678-9000-124560291657' + } + + Returns: + dict: Return transformed records (although we don't transform the data) back is necessary + and firehose will deliver those records to S3 for historical search. + + The lambda handler is intended to return an event like this: + { + 'records': [ + { + 'result': 'Ok', + 'recordId': '12345678901230000000', + 'data': '{"blah":"blah"}' + } + ] + } + """ + try: + return ArtifactExtractor( + event['region'], + event['deliveryStreamArn'] + ).run(event.get('records', [])) + except Exception: + # FIXME: (Optional) Add retry for Timeout exceptions. If the Lambda function invocation + # fails because of a network timeout or the lambda invocation limit, Kinesis Data Firehose + # retries the invocation three times by default. If the invocation does not succeed, Kinesis + # Data Firehose then skips that batch of records. 
The skipped records are treated as + # unsuccessfully processed records. + # https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + LOGGER.exception('Invocation event: %s', event) + raise diff --git a/streamalert/classifier/classifier.py b/streamalert/classifier/classifier.py index a694259bc..f5ac9860d 100644 --- a/streamalert/classifier/classifier.py +++ b/streamalert/classifier/classifier.py @@ -17,7 +17,8 @@ import os import logging -from streamalert.classifier.clients import FirehoseClient, SQSClient +from streamalert.classifier.clients import SQSClient +from streamalert.shared.firehose import FirehoseClient from streamalert.classifier.parsers import get_parser from streamalert.classifier.payload.payload_base import StreamPayload from streamalert.shared import config, CLASSIFIER_FUNCTION_NAME as FUNCTION_NAME @@ -187,7 +188,13 @@ def _classify_payload(self, payload): self._log_bad_records(record, len(record.invalid_records)) for parsed_rec in record.parsed_records: - Normalizer.normalize(parsed_rec, record.log_type) + # + # In Normalization v1, the normalized types are defined based on log source + # (e.g. osquery, cloudwatch etc) and this will be deprecated. + # In Normalization v2, the normalized types are defined based on log type + # (e.g. osquery:differential, cloudwatch:cloudtrail, cloudwatch:events etc) + # + Normalizer.normalize(parsed_rec, record.log_schema_type) self._payloads.append(record) diff --git a/streamalert/classifier/clients/__init__.py b/streamalert/classifier/clients/__init__.py index 8f1e3347d..fe53c0ed2 100644 --- a/streamalert/classifier/clients/__init__.py +++ b/streamalert/classifier/clients/__init__.py @@ -1,3 +1,2 @@ """Imports from submodules to make higher up imports easier""" -from .firehose import FirehoseClient from .sqs import SQSClient diff --git a/streamalert/shared/__init__.py b/streamalert/shared/__init__.py index 00fdda2f2..14114911b 100644 --- a/streamalert/shared/__init__.py +++ b/streamalert/shared/__init__.py @@ -1,6 +1,7 @@ """Define some shared resources.""" ALERT_MERGER_NAME = 'alert_merger' ALERT_PROCESSOR_NAME = 'alert_processor' +ARTIFACT_EXTRACTOR_NAME = 'artifact_extractor' ATHENA_PARTITIONER_NAME = 'athena_partitioner' CLASSIFIER_FUNCTION_NAME = 'classifier' RULES_ENGINE_FUNCTION_NAME = 'rules_engine' diff --git a/streamalert/shared/config.py b/streamalert/shared/config.py index 8cc632264..f190702c3 100644 --- a/streamalert/shared/config.py +++ b/streamalert/shared/config.py @@ -443,3 +443,68 @@ def _validate_sources(cluster_name, data_sources, existing_sources): existing_sources.add(source) # FIXME (derek.wang) write a configuration validator for lookuptables (new one) + +def _artifact_extractor_enabled_helper(config, log_name): + """Validate if Artifactor Extractor enabled. + There are two cases need validate if Artifact Extractor enabled. + 1. For deploy Artifact Extractor Lambda function. + 2. To enable firehoses to invoke Artifact Extractor Lambda function. + + For case 1, Artifact Extractor Lambda function will be created and deployed if both + "artifact_extractor_config" in conf/lambda.json and "firehose" in conf/global.json both enabled. + + For case 2, in addition to above two conditions, a firehose will be setup to + "processing_configure" invoke Artifact Extractor lambda function if "normalization" is + configured in its schema configuration. 
+ For example, the mapped firehose for following log type will setup: + { + "some_log_type": { + "schema": { + "key1": "string", + "key2": "string" + }, + "parser": "json", + "configuration": { + "normalization": { + "command": [ + "cmdline", + "command" + ] + } + } + } + } + + Args: + config (dict): The loaded config from the 'conf/' directory + log_name (string): expect to be original log names, e.g. 'aliyun', 'osquery:differential' + + Returns: + bool: For case 1, return True if both "artifact_extractor_config" in conf/lambda.json and + "firehose" in conf/global.json both enabled. + For case 2, return True in addition to have "normalization" configured in the log schema + configuration. + """ + if not config['lambda'].get('artifact_extractor_config', {}).get('enabled', False): + return False + + # Artifact extractor lambda is based on StreamAlert data Firehoses. Consider Artifact Extractor + # is enabled once when firehose is enabled + if not config['global']['infrastructure'].get('firehose', {}).get('enabled', False): + return False + + # if log_name is empty, it means caller only want to know if artifact extractor lambda + # function enabled or not, so return early. + if not log_name: + return True + + log_config = config.get('logs', {}).get(log_name, {}) + return 'normalization' in log_config.get('configuration', {}) + +def artifact_extractor_enabled_for_log(config, log_name): + """Validate if Artifact Extractor enabled for a log""" + return _artifact_extractor_enabled_helper(config, log_name=log_name) + +def artifact_extractor_enabled(config): + """Validate if Artifact Extractor enabled""" + return _artifact_extractor_enabled_helper(config, log_name=None) diff --git a/streamalert/classifier/clients/firehose.py b/streamalert/shared/firehose.py similarity index 82% rename from streamalert/classifier/clients/firehose.py rename to streamalert/shared/firehose.py index f6c3c2443..20b66d0bd 100644 --- a/streamalert/classifier/clients/firehose.py +++ b/streamalert/shared/firehose.py @@ -23,7 +23,7 @@ from botocore.exceptions import ClientError, HTTPClientError from botocore.exceptions import ConnectionError as BotocoreConnectionError -from streamalert.shared import CLASSIFIER_FUNCTION_NAME as FUNCTION_NAME +from streamalert.shared import ARTIFACT_EXTRACTOR_NAME, CLASSIFIER_FUNCTION_NAME import streamalert.shared.helpers.boto as boto_helpers from streamalert.shared.logger import get_logger from streamalert.shared.metrics import MetricLogger @@ -69,7 +69,7 @@ class FirehoseClient: FIREHOSE_NAME_MIN_HASH_LEN = 8 def __init__(self, prefix, firehose_config=None, log_sources=None): - self._prefix = prefix if firehose_config.get('use_prefix', True) else '' + self._prefix = prefix if firehose_config and firehose_config.get('use_prefix', True) else '' self._client = boto3.client('firehose', config=boto_helpers.default_config()) self.load_enabled_log_sources(firehose_config, log_sources, force_load=True) @@ -89,7 +89,7 @@ def _records_to_json_list(cls, records): ] @classmethod - def _record_batches(cls, records): + def _record_batches(cls, records, function_name): """Segment the records into batches that conform to Firehose restrictions This will log any single record that is too large to send, and skip it. 
@@ -116,7 +116,7 @@ def _record_batches(cls, records): if line_len > cls.MAX_RECORD_SIZE: LOGGER.error('Record too large (%d) to send to Firehose:\n%s', line_len, record) - cls._log_failed(1) + cls._log_failed(1, function_name) continue # Add the record to the batch @@ -206,7 +206,7 @@ def _categorize_records(self, payloads): return categorized_records @classmethod - def _finalize(cls, response, stream_name, size): + def _finalize(cls, response, stream_name, size, function_name): """Perform any final operations for this response, such as metric logging, etc Args: @@ -223,7 +223,7 @@ def _finalize(cls, response, stream_name, size): failed for failed in response['RequestResponses'] if failed.get('ErrorCode') ] - cls._log_failed(response['FailedPutCount']) + cls._log_failed(response['FailedPutCount'], function_name) # Only print the first 100 failed records to Cloudwatch logs LOGGER.error( @@ -233,7 +233,7 @@ def _finalize(cls, response, stream_name, size): ) return - MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.FIREHOSE_RECORDS_SENT, size) + MetricLogger.log_metric(function_name, MetricLogger.FIREHOSE_RECORDS_SENT, size) LOGGER.info( 'Successfully sent %d message(s) to firehose %s with RequestId \'%s\'', size, @@ -242,15 +242,15 @@ def _finalize(cls, response, stream_name, size): ) @classmethod - def _log_failed(cls, count): + def _log_failed(cls, count, function_name): """Helper to log the failed Firehose records metric Args: count (int): Number of failed records """ - MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.FIREHOSE_FAILED_RECORDS, count) + MetricLogger.log_metric(function_name, MetricLogger.FIREHOSE_FAILED_RECORDS, count) - def _send_batch(self, stream_name, record_batch): + def _send_batch(self, stream_name, record_batch, function_name): """Send record batches to Firehose Args: @@ -299,7 +299,7 @@ def _firehose_request_helper(data): LOGGER.exception('Firehose request failed') # Use the current length of the records_data in case some records were # successful but others were not - self._log_failed(len(records_data)) + self._log_failed(len(records_data), function_name) @classmethod def generate_firehose_name(cls, prefix, log_stream_name): @@ -334,6 +334,30 @@ def generate_firehose_name(cls, prefix, log_stream_name): base_name, hashlib.md5(stream_name.encode()).hexdigest() # nosec )[:cls.AWS_FIREHOSE_NAME_MAX_LEN] + @classmethod + def artifacts_firehose_stream_name(cls, config): + """Return Artifacts Firehose Stream Name + + Args: + config (dict): The loaded config from the 'conf/' directory + + Returns: + str: Artifacts Firehose Stream Name + """ + # support custom firehose stream name of Artifacts. User should make sure the length of + # the custom firehose name is no longer than 64 chars, otherwise the firehose will be + # failed to create. StreamAlert is not responsible for checking for custom firehose name + # since it should not change custom settings. 
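+ # For example (hypothetical value), setting "firehose_stream_name": "my_org_artifacts"
+ # under "artifact_extractor_config" in conf/lambda.json makes this method return
+ # "my_org_artifacts" as-is; otherwise the name generated by generate_firehose_name()
+ # below is used.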
+ stream_name = config.get('lambda', {}).get( + 'artifact_extractor_config', {} + ).get('firehose_stream_name') + + return stream_name or cls.generate_firehose_name( + prefix=config['global']['account']['prefix'], + log_stream_name='artifacts' + ) + + @classmethod def enabled_log_source(cls, log_source_name): """Check that the incoming record is an enabled log source for Firehose @@ -414,6 +438,22 @@ def load_from_config(cls, prefix, firehose_config, log_sources): return return cls(prefix=prefix, firehose_config=firehose_config, log_sources=log_sources) + @classmethod + def get_client(cls, prefix, artifact_extractor_config): + """Get a Firehose client for sending artifacts + + Args: + prefix (str): Account prefix from global.json + artifact_extractor_config (dict): Loaded Artifact Extractor config from lambda.json + + Returns: + FirehoseClient or None: If disabled, this returns None, otherwise it returns an + instanec of FirehoseClient + """ + if not artifact_extractor_config.get('enabled'): + return + return cls(prefix=prefix) + def send(self, payloads): """Send all classified records to a respective Firehose Delivery Stream @@ -430,7 +470,33 @@ def send(self, payloads): formatted_stream_name = self.generate_firehose_name(self._prefix, log_type) # Process each record batch in the categorized payload set - for record_batch in self._record_batches(records): + for record_batch in self._record_batches(records, CLASSIFIER_FUNCTION_NAME): batch_size = len(record_batch) - response = self._send_batch(formatted_stream_name, record_batch) - self._finalize(response, formatted_stream_name, batch_size) + response = self._send_batch( + formatted_stream_name, + record_batch, + CLASSIFIER_FUNCTION_NAME + ) + + self._finalize( + response, + formatted_stream_name, + batch_size, + CLASSIFIER_FUNCTION_NAME + ) + + def send_artifacts(self, artifacts, stream_name): + """Send artifacts to artifacts Firehose delievery stream + Args: + artifacts (list(dict)): A list of artifacts extracted from normalized records. + stream_name (str): Stream name of destination Firehose. 
+ """ + for artifact_batch in self._record_batches(artifacts, ARTIFACT_EXTRACTOR_NAME): + batch_size = len(artifact_batch) + response = self._send_batch(stream_name, artifact_batch, ARTIFACT_EXTRACTOR_NAME) + self._finalize( + response, + stream_name, + batch_size, + ARTIFACT_EXTRACTOR_NAME + ) diff --git a/streamalert/shared/metrics.py b/streamalert/shared/metrics.py index 65ea17b02..d6f9c6b56 100644 --- a/streamalert/shared/metrics.py +++ b/streamalert/shared/metrics.py @@ -18,6 +18,7 @@ from streamalert.shared import ( ALERT_MERGER_NAME, ALERT_PROCESSOR_NAME, + ARTIFACT_EXTRACTOR_NAME, ATHENA_PARTITIONER_NAME, CLASSIFIER_FUNCTION_NAME, RULES_ENGINE_FUNCTION_NAME @@ -33,6 +34,7 @@ # below when metrics are supported there FUNC_PREFIXES = { ALERT_MERGER_NAME: 'AlertMerger', + ARTIFACT_EXTRACTOR_NAME: 'ArtifactExtractor', CLASSIFIER_FUNCTION_NAME: 'Classifier', RULES_ENGINE_FUNCTION_NAME: 'RulesEngine' } @@ -75,6 +77,9 @@ class MetricLogger: # Alert Merger metric names ALERT_ATTEMPTS = 'AlertAttempts' + # Artifact Extractor metric names + EXTRACTED_ARTIFACTS = 'ExtractedArtifacts' + _default_filter = '{{ $.metric_name = "{}" }}' _default_value_lookup = '$.metric_value' @@ -89,6 +94,14 @@ class MetricLogger: ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup) }, ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics + ARTIFACT_EXTRACTOR_NAME: { + EXTRACTED_ARTIFACTS: (_default_filter.format(EXTRACTED_ARTIFACTS), + _default_value_lookup), + FIREHOSE_FAILED_RECORDS: (_default_filter.format(FIREHOSE_FAILED_RECORDS), + _default_value_lookup), + FIREHOSE_RECORDS_SENT: (_default_filter.format(FIREHOSE_RECORDS_SENT), + _default_value_lookup) + }, ATHENA_PARTITIONER_NAME: {}, # Placeholder for future athena processor metrics CLASSIFIER_FUNCTION_NAME: { FAILED_PARSES: (_default_filter.format(FAILED_PARSES), diff --git a/streamalert/shared/normalize.py b/streamalert/shared/normalize.py index c43544e31..7357e819c 100644 --- a/streamalert/shared/normalize.py +++ b/streamalert/shared/normalize.py @@ -13,22 +13,168 @@ See the License for the specific language governing permissions and limitations under the License. """ +from collections import defaultdict import logging +import itertools from streamalert.shared.config import TopLevelConfigKeys +from streamalert.shared.exceptions import ConfigError from streamalert.shared.logger import get_logger LOGGER = get_logger(__name__) LOGGER_DEBUG_ENABLED = LOGGER.isEnabledFor(logging.DEBUG) +CONST_FUNCTION = 'function' +CONST_PATH = 'path' +CONST_CONDITION = 'condition' +CONST_VALUES = 'values' + +class NormalizedType: + """The class encapsulates normalization information for each normalized type""" + + VALID_KEYS = {CONST_PATH, CONST_FUNCTION, CONST_CONDITION} + CONST_STR = 'str' + CONST_DICT = 'dict' + + def __init__(self, log_type, normalized_type, params): + """Init NormalizatedType + Args: + log_type (str): log type name, e.g. osquery:differential + normalized_type (str): Normalized type name defined in conf/, e.g. 'sourceAddress', + 'destination_ip' may be normalized to 'ip_address'. + params (list): a list of str or dict contains normalization configuration read from + conf/schemas/*.json. The params can be a list of str or a list of dict to specify + the path to the keys which will be normalized. + e.g. 
+ ['path', 'to', 'the', 'key'] + or + [ + { + 'path': ['detail', 'sourceIPAddress'], + 'function': 'source ip address' + }, + { + 'path': ['path', 'to', 'the', 'key'], + 'function': 'destination ip address' + } + ] + """ + self._log_type = log_type + self._log_source = log_type.split(':')[0] + self._normalized_type = normalized_type + self._parsed_params = self._parse_params(params) + + def __eq__(self, other): + """Compare two NormalizedType instances and it is very helpful in unit test when use + assert_equal + """ + if not (self._log_type == other.log_type + and self._log_source == other.log_source + and self._normalized_type == other.normalized_type): + return False + + if len(self._parsed_params) != len(other.parsed_params): + return False + + for idx in range(len(self._parsed_params)): + if self._parsed_params[idx][CONST_PATH] == other.parsed_params[idx][CONST_PATH]: + continue + + return False + + return True + + @property + def log_type(self): + """Return the log type name, e.g. 'osquery:differential'""" + return self._log_type + + @property + def log_source(self): + """Return the log source name, e.g. 'osquery'""" + return self._log_source + + @property + def normalized_type(self): + """Return the normalized type, e.g. 'ip_address'""" + return self._normalized_type + + @property + def parsed_params(self): + """Return the normalization configuration which is a list of dict, e.g. + [ + { + 'path': ['path', 'to', 'the', 'key'], + 'function': None + } + ] + + or + [ + { + 'path': ['detail', 'sourceIPAddress'], + 'function': 'source ip address' + }, + { + 'path': ['path', 'to', 'the', 'destination', 'ip'], + 'function': 'destination ip address' + } + ] + """ + return self._parsed_params + + def _parse_params(self, params): + """Extract path and function information from params argument + + Args: + params (list): a list of str or dict contains normalization configuration. + """ + param_type = self._parse_param_type(params) + + if param_type == self.CONST_STR: + # Format params to include 'function' field which is set to None. + return [ + { + CONST_PATH: params, + CONST_FUNCTION: None + } + ] + + return params + + def _parse_param_type(self, params): + """Parse all param type in params + + Args: + params (list): a list of str or dict contains normalization configuration. + """ + if not isinstance(params, list): + raise ConfigError( + 'Unsupported params {} for normalization. Convert params to a list'.format(params) + ) + + if all(isinstance(param, str) for param in params): + return self.CONST_STR + + if all(isinstance(param, dict) and set(param.keys()).issubset(self.VALID_KEYS) + for param in params + ): + return self.CONST_DICT + + # FIXME: should we raise exception here? Or may just return False and log a warming message + raise ConfigError( + ('Unsupported type(s) used in {} or missing keys. 
Valid types are str or dict and ' + 'valid keys are {}').format(params, self.VALID_KEYS) + ) + class Normalizer: """Normalizer class to handle log key normalization in payloads""" - NORMALIZATION_KEY = 'streamalert:normalization' + NORMALIZATION_KEY = 'streamalert_normalization' - # Store the normalized CEF types mapping to original keys from the records + # Store the normalized types mapping to original keys from the records _types_config = dict() @classmethod @@ -43,64 +189,118 @@ def match_types(cls, record, normalized_types): dict: A dict of normalized keys with a list of values Example: - record={ - 'region': 'us-east-1', - 'detail': { - 'awsRegion': 'us-west-2' - } - } - normalized_types={ - 'region': ['region', 'awsRegion'] - } - - return={ - 'region': ['us-east-1', 'us-west-2'] + return + { + 'region': [ + { + 'values': ['us-east-1'] + 'function': 'AWS region' + }, + { + 'values': ['us-west-2'] + 'function': 'AWS region' + } + ] } """ - result = {} - for key, keys_to_normalize in normalized_types.items(): - values = set() - for value in cls._extract_values(record, set(keys_to_normalize)): - # Skip emtpy values - if value is None or value == '': - continue + results = {} + for type_name, type_info in normalized_types.items(): + result = list(cls._extract_values(record, type_info)) - values.add(value) + if result: + results[type_name] = result - if not values: - continue - - result[key] = sorted(values, key=str) + return results - return result + @classmethod + def _find_value(cls, record, path): + """Retrieve value from a record based on a json path""" + found_value = False + value = record + for key in path: + value = value.get(key) + if not value: + found_value = False + break + found_value = True + + if not found_value: + return False, None + + return True, value @classmethod - def _extract_values(cls, record, keys_to_normalize): + def _extract_values(cls, record, paths_to_normalize): """Recursively extract lists of path parts from a dictionary Args: record (dict): Parsed payload of log - keys_to_normalize (set): Normalized keys for which to extract paths + paths_to_normalize (set): Normalized keys for which to extract paths path (list=None): Parts of current path for which keys are being extracted Yields: - list: Parts of path in dictionary that contain normalized keys + dict: A dict contians the values of normalized types. For example, + { + 'values': ['1.1.1.2'] + 'function': 'Source ip address' + } """ - for key, value in record.items(): - if isinstance(value, dict): # If this is a dict, look for nested - for nested_value in cls._extract_values(value, keys_to_normalize): - yield nested_value + for param in paths_to_normalize.parsed_params: + if param.get(CONST_CONDITION) and not cls._match_condition(record, param['condition']): + # If optional 'condition' block is configured, it will only extract values if + # condition is matched. continue - if key not in keys_to_normalize: - continue + found_value, value = cls._find_value(record, param.get(CONST_PATH)) - if isinstance(value, list): # If this is a list of values, return all of them - for item in value: - yield item - continue + if found_value: + yield { + CONST_FUNCTION: param.get(CONST_FUNCTION) or None, + # if value not a list, it will be cast to a str even it is a dict or other + # types + CONST_VALUES: value if isinstance(value, list) else [str(value)] + } + + @classmethod + def _match_condition(cls, record, condition): + """Apply condition to a record before normalization kicked in. 
+ + Returns: + bool: Return True if the value of the condition path matches to the condition, otherwise + return False. It is False if the path doesn't exist. + """ + if not condition.get('path'): + return False + + found_value, value = cls._find_value(record, condition['path']) + if not found_value: + return False + + # cast value to a str in all lowercases + value = str(value).lower() + + # Only support extract one condition. The result is not quaranteed if multiple conditions + # configured. + # FIXME: log a warning if more than one condition configured. + if condition.get('is'): + return value == condition['is'] + + if condition.get('is_not'): + return value != condition['is_not'] + + if condition.get('in'): + return value in condition['in'] + + if condition.get('not_in'): + return value not in condition['not_in'] + + if condition.get('contains'): + return condition['contains'] in value - yield value + if condition.get('not_contains'): + return condition['not_contains'] not in value + + return False @classmethod def normalize(cls, record, log_type): @@ -129,7 +329,10 @@ def get_values_for_normalized_type(cls, record, datatype): Returns: set: The values for the normalized type specified """ - return set(record.get(cls.NORMALIZATION_KEY, {}).get(datatype, set())) + normalization_results = record.get(cls.NORMALIZATION_KEY, {}).get(datatype) + if not normalization_results: + return + return set(itertools.chain(*[result.get(CONST_VALUES) for result in normalization_results])) @classmethod def load_from_config(cls, config): @@ -144,9 +347,63 @@ def load_from_config(cls, config): if cls._types_config: return cls # config is already populated - if TopLevelConfigKeys.NORMALIZED_TYPES not in config: - return cls # nothing to do - - cls._types_config = config[TopLevelConfigKeys.NORMALIZED_TYPES] + cls._types_config = cls._parse_normalization(config) return cls # there are no instance methods, so just return the class + + @classmethod + def _parse_normalization(cls, config): + """Load and parse normalization config from conf/schemas/*.json. Normalization will be + configured along with log schema and a path will be provided to find the original key. + + For example: conf/schemas/cloudwatch.json looks like + 'cloudwatch:events': { + 'schema': { + 'account': 'string', + 'source': 'string', + 'other_key': 'string' + }, + 'configuration': { + 'normalization': { + 'region': ['path', 'to', 'original', 'key'], + 'ip_address': [ + { + 'path': ['detail', 'sourceIPAddress'], + 'function': 'source ip address' + }, + { + 'path': ['path', 'to', 'original', 'key'], + 'function': 'destination ip address' + } + ] + } + } + } + + Args: + config (dict): Config read from 'conf/' directory + + Returns: + dict: return a dict contains normalization information per log type basis. + { + 'cloudwatch:events': { + 'region': NormalizedType(), + 'ip_address': NormalizedType() + } + } + """ + normalized_config = defaultdict(dict) + for log_type, val in config.get(TopLevelConfigKeys.LOGS, {}).items(): + result = defaultdict(dict) + + log_type_normalization = val.get('configuration', {}).get('normalization', {}) + + for normalized_type, params in log_type_normalization.items(): + # add normalization info if it is defined in log type configuration field + result[normalized_type] = NormalizedType(log_type, normalized_type, params) + + if result: + normalized_config[log_type] = result + + # return None is normalized_config is an empty defaultdict. 
+ return normalized_config or None diff --git a/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/iam.tf b/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/iam.tf new file mode 100644 index 000000000..b27b53a89 --- /dev/null +++ b/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/iam.tf @@ -0,0 +1,107 @@ +// Allow the Artifact Extractor to write to the Firehose delivering Artifacts to S3 +resource "aws_iam_role_policy" "put_artifacts_firehose" { + name = "PutRecordsToArtifactsFirehose" + role = var.function_role_id + policy = data.aws_iam_policy_document.put_artifacts_firehose_policy.json +} + +data "aws_iam_policy_document" "put_artifacts_firehose_policy" { + statement { + effect = "Allow" + + actions = [ + "firehose:PutRecordBatch", + "firehose:DescribeDeliveryStream", + ] + + resources = [ + aws_kinesis_firehose_delivery_stream.streamalert_artifacts.arn + ] + } +} + +// IAM Role: Artifacts Firehose Delivery Stream permissions +resource "aws_iam_role" "streamalert_kinesis_firehose" { + name = "${var.prefix}_firehose_artifacts_delivery" + path = "/streamalert/" + assume_role_policy = data.aws_iam_policy_document.firehose_assume_role_policy.json + + tags = { + Name = "StreamAlert" + } +} + +// IAM Policy: Service AssumeRole +data "aws_iam_policy_document" "firehose_assume_role_policy" { + statement { + effect = "Allow" + actions = ["sts:AssumeRole"] + + principals { + type = "Service" + identifiers = ["firehose.amazonaws.com"] + } + } +} + +// IAM Policy: Write data to S3 +resource "aws_iam_role_policy" "streamalert_firehose_s3" { + name = "WriteArtifactsToS3" + role = aws_iam_role.streamalert_kinesis_firehose.id + policy = data.aws_iam_policy_document.firehose_s3.json +} + +// IAM Policy Document: Write data to S3 +data "aws_iam_policy_document" "firehose_s3" { + statement { + effect = "Allow" + + # Ref: http://amzn.to/2u5t0hS + actions = [ + "s3:AbortMultipartUpload", + "s3:GetBucketLocation", + "s3:GetObject", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + "s3:PutObject", + ] + + resources = [ + "arn:aws:s3:::${var.s3_bucket_name}", + "arn:aws:s3:::${var.s3_bucket_name}/*", + ] + } + + statement { + effect = "Allow" + + actions = [ + "kms:Encrypt", + "kms:Decrypt", + "kms:GenerateDataKey*", + ] + + resources = [var.kms_key_arn] + } +} + +// IAM Policy: Interact with the Glue Catalog +resource "aws_iam_role_policy" "streamalert_firehose_glue" { + name = "FirehoseReadGlueCatalog" + role = "${aws_iam_role.streamalert_kinesis_firehose.id}" + + policy = "${data.aws_iam_policy_document.firehose_glue_catalog.json}" +} + +// IAM Policy Document: Interact with the Glue Catalog +data "aws_iam_policy_document" "firehose_glue_catalog" { + statement { + effect = "Allow" + + actions = [ + "glue:GetTableVersions" + ] + + resources = ["*"] + } +} diff --git a/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/main.tf b/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/main.tf new file mode 100644 index 000000000..45e5e8076 --- /dev/null +++ b/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/main.tf @@ -0,0 +1,89 @@ +// AWS Firehose Stream dedicated to deliver Artifacts +// This firehose will only convert and save Artifacts in Parquet format in the S3 bucket to take the +// performance gain from Parquet format. 
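+// Artifacts land under "s3://<s3_bucket_name>/parquet/<glue_catalog_table_name>/dt=YYYY-MM-DD-HH/"
+// (see local.s3_path_prefix and the extended_s3_configuration prefix below), matching the
+// "dt" partition key of the Glue catalog table defined further down.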
+locals { + s3_path_prefix = "parquet/${var.glue_catalog_table_name}" +} + +locals { + data_location = "s3://${var.s3_bucket_name}/${local.s3_path_prefix}" +} + +resource "aws_kinesis_firehose_delivery_stream" "streamalert_artifacts" { + name = var.stream_name + destination = "extended_s3" + + // AWS Firehose Stream for Artifacts will only support Parquet format + extended_s3_configuration { + role_arn = aws_iam_role.streamalert_kinesis_firehose.arn + bucket_arn = "arn:aws:s3:::${var.s3_bucket_name}" + prefix = "${local.s3_path_prefix}/dt=!{timestamp:yyyy-MM-dd-HH}/" + error_output_prefix = "${local.s3_path_prefix}/!{firehose:error-output-type}/" + buffer_size = var.buffer_size + buffer_interval = var.buffer_interval + + # The S3 destination's compression format must be set to UNCOMPRESSED + # when data format conversion is enabled. + compression_format = "UNCOMPRESSED" + kms_key_arn = var.kms_key_arn + + data_format_conversion_configuration { + input_format_configuration { + deserializer { + # # more resilient with log schemas that have nested JSON comparing to hive_json_ser_de + open_x_json_ser_de {} + } + } + output_format_configuration { + serializer { + parquet_ser_de {} + } + } + schema_configuration { + database_name = aws_glue_catalog_table.artifacts.database_name + role_arn = aws_iam_role.streamalert_kinesis_firehose.arn + table_name = aws_glue_catalog_table.artifacts.name + } + } + } + + tags = { + Name = "StreamAlert" + } +} + +// Artifacts Athena table +resource "aws_glue_catalog_table" "artifacts" { + name = var.glue_catalog_table_name + database_name = var.glue_catalog_db_name + + table_type = "EXTERNAL_TABLE" + + partition_keys { + name = "dt" + type = "string" + } + + storage_descriptor { + location = local.data_location + input_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat" + output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" + + ser_de_info { + name = "parque_ser_de" + serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" + parameters = { + ser_de_params_key = "serialization.format" + ser_de_params_value = "1" + } + } + + dynamic "columns" { + for_each = var.schema + content { + name = columns.value[0] + type = columns.value[1] + } + } + } +} diff --git a/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/variables.tf b/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/variables.tf new file mode 100644 index 000000000..6457afe54 --- /dev/null +++ b/streamalert_cli/_infrastructure/modules/tf_artifact_extractor/variables.tf @@ -0,0 +1,56 @@ +variable "account_id" { + type = string +} + +variable "region" { + type = string +} + +variable "prefix" { + type = string +} + +variable "function_role_id" { + description = "Artifact Extractor function IAM Role ID, exported from the tf_lambda module" +} + +variable "function_alias_arn" { + type = string + description = "Fully qualified function arn of alias of Artifact extractor lambda" +} + +variable "glue_catalog_db_name" { + type = string + description = "Athena Database name" +} + +variable "glue_catalog_table_name" { + type = string + description = "Athena table name for Artifacts" +} + +variable "s3_bucket_name" { + type = string + description = "StreamAlert data bucket name" +} + +variable "stream_name" { + type = string + description = "Fully qualified name to use for delivery stream" +} + +variable "buffer_size" { + default = 5 +} + +variable "buffer_interval" { + default = 300 +} + +variable "kms_key_arn" { + type 
= string +} + +variable "schema" { + type = list(tuple([string, string])) +} diff --git a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/main.tf b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/main.tf index b1f2b53ca..4e4c3ce4b 100644 --- a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/main.tf +++ b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/main.tf @@ -78,6 +78,26 @@ resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" { table_name = var.glue_catalog_table_name } } + + + processing_configuration { + # only enabled when function_alias_arn (Artifact Extractor Lambda function) is not empty + enabled = var.function_alias_arn == "" ? false : true + + # processor block will only present if function_alias_arn is not empty + dynamic "processors" { + for_each = var.function_alias_arn == "" ? [] : [var.function_alias_arn] + + content { + type = "Lambda" + + parameters { + parameter_name = "LambdaArn" + parameter_value = var.function_alias_arn + } + } + } + } } } diff --git a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/variables.tf b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/variables.tf index 12e7729b3..0a9df7db8 100644 --- a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/variables.tf +++ b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_delivery_stream/variables.tf @@ -66,3 +66,9 @@ variable "glue_catalog_table_name" { variable "schema" { type = list(tuple([string, string])) } + +variable "function_alias_arn" { + type = string + default = "" + description = "Fully qualified function arn of alias of Artifact extractor lambda" +} diff --git a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/iam.tf b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/iam.tf index 01c220b46..64b2b7b90 100644 --- a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/iam.tf +++ b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/iam.tf @@ -105,3 +105,30 @@ data "aws_iam_policy_document" "firehose_glue_catalog" { resources = ["*"] } } + +// IAM Policy: Invoke lambda function +resource "aws_iam_role_policy" "streamalert_firehose_lambda" { + count = var.artifact_extractor_enabled ? 1 : 0 + name = "streamalert_firehose_invoke_lambda" + role = "${aws_iam_role.streamalert_kinesis_firehose.id}" + + policy = "${data.aws_iam_policy_document.firehose_lambda[0].json}" +} + +// IAM Policy Document: Allow firehose to invoke artifact extractor lambda function +data "aws_iam_policy_document" "firehose_lambda" { + count = var.artifact_extractor_enabled ? 
1 : 0 + + statement { + effect = "Allow" + + actions = [ + "lambda:InvokeFunction", + "lambda:GetFunctionConfiguration" + ] + + resources = [ + "${var.function_alias_arn}*" + ] + } +} diff --git a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/variables.tf b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/variables.tf index 4bf6ff0fb..e77cd07e8 100644 --- a/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/variables.tf +++ b/streamalert_cli/_infrastructure/modules/tf_kinesis_firehose_setup/variables.tf @@ -26,3 +26,15 @@ variable "s3_logging_bucket" { variable "kms_key_id" { type = string } + +variable "artifact_extractor_enabled" { + type = bool + default = false + description = "Is Artifact Extractor Lambda function enabled" +} + +variable "function_alias_arn" { + type = string + default = "" + description = "Fully qualified function arn of alias of Artifact extractor lambda" +} diff --git a/streamalert_cli/athena/handler.py b/streamalert_cli/athena/handler.py index 691798e36..f91083057 100644 --- a/streamalert_cli/athena/handler.py +++ b/streamalert_cli/athena/handler.py @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ -from streamalert.classifier.clients import FirehoseClient +from streamalert.shared.firehose import FirehoseClient from streamalert.shared.utils import get_database_name, get_data_file_format from streamalert.shared.alert import Alert from streamalert.shared.athena import AthenaClient diff --git a/streamalert_cli/athena/helpers.py b/streamalert_cli/athena/helpers.py index 5c87937a6..9865a2863 100644 --- a/streamalert_cli/athena/helpers.py +++ b/streamalert_cli/athena/helpers.py @@ -15,7 +15,8 @@ """ import re -from streamalert.classifier.clients import FirehoseClient +from streamalert.artifact_extractor.artifact_extractor import Artifact +from streamalert.shared.firehose import FirehoseClient from streamalert.shared.logger import get_logger from streamalert.shared.alert import Alert from streamalert_cli.helpers import record_to_schema @@ -247,3 +248,21 @@ def generate_data_table_schema(config, table, schema_override=None): ) return format_schema_tf(athena_schema) + +def generate_artifacts_table_schema(): + """Generate the schema for artifacts table in terraform by using a test artifact instance + + Returns: + athena_schema (dict): Equivalent Athena schema used for generating create table statement + """ + artifact = artifact = Artifact( + normalized_type='test_normalized_type', + value='test_value', + source_type='test_source_type', + record_id='test_record_id', + function=None + ) + schema = record_to_schema(artifact.record) + athena_schema = logs_schema_to_athena_schema(schema, False) + + return format_schema_tf(athena_schema) diff --git a/streamalert_cli/manage_lambda/deploy.py b/streamalert_cli/manage_lambda/deploy.py index 5472efa8b..c74808145 100644 --- a/streamalert_cli/manage_lambda/deploy.py +++ b/streamalert_cli/manage_lambda/deploy.py @@ -183,6 +183,13 @@ def _lambda_terraform_targets(config, functions, clusters): }, 'enabled': True # required function }, + 'artifact_extractor': { + 'targets': { + 'module.artifact_extractor', + 'module.artifact_extractor_lambda' + }, + 'enabled': config['lambda'].get('artifact_extractor_config', {}).get('enabled', False) + }, 'athena': { 'targets': { 'module.athena_partitioner_iam', diff --git a/streamalert_cli/terraform/artifact_extractor.py b/streamalert_cli/terraform/artifact_extractor.py new file mode 
100644 index 000000000..aa48ab350 --- /dev/null +++ b/streamalert_cli/terraform/artifact_extractor.py @@ -0,0 +1,70 @@ + +""" +Copyright 2017-present Airbnb, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from streamalert.shared import ARTIFACT_EXTRACTOR_NAME +from streamalert.shared.config import artifact_extractor_enabled, firehose_data_bucket +from streamalert.shared.firehose import FirehoseClient +from streamalert.shared.utils import get_database_name +from streamalert_cli.athena.helpers import generate_artifacts_table_schema +from streamalert_cli.terraform.common import infinitedict +from streamalert_cli.terraform.lambda_module import generate_lambda + +# FIXME: Should we provide custom artifacs table name? +DEFAULT_ARTIFACTS_TABLE_NAME = 'artifacts' + +def generate_artifact_extractor(config): + """Generate Terraform for the Artifact Extractor Lambda function + Args: + config (dict): The loaded config from the 'conf/' directory + Returns: + dict: Artifact Extractor Terraform definition to be marshaled to JSON + """ + result = infinitedict() + + if not artifact_extractor_enabled(config): + return + + ae_config = config['lambda']['artifact_extractor_config'] + stream_name = FirehoseClient.artifacts_firehose_stream_name(config) + + # Set variables for the artifact extractor module + result['module']['artifact_extractor'] = { + 'source': './modules/tf_artifact_extractor', + 'account_id': config['global']['account']['aws_account_id'], + 'prefix': config['global']['account']['prefix'], + 'region': config['global']['account']['region'], + 'function_role_id': '${module.artifact_extractor_lambda.role_id}', + 'function_alias_arn': '${module.artifact_extractor_lambda.function_alias_arn}', + 'glue_catalog_db_name': get_database_name(config), + 'glue_catalog_table_name': ae_config.get('table_name', DEFAULT_ARTIFACTS_TABLE_NAME), + 's3_bucket_name': firehose_data_bucket(config), + 'stream_name': stream_name, + 'buffer_size': ae_config.get('firehose_buffer_size', 128), + 'buffer_interval': ae_config.get('firehose_buffer_interval', 900), + 'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}', + 'schema': generate_artifacts_table_schema() + } + + # Set variables for the Lambda module + result['module']['artifact_extractor_lambda'] = generate_lambda( + '{}_streamalert_{}'.format(config['global']['account']['prefix'], ARTIFACT_EXTRACTOR_NAME), + 'streamalert.artifact_extractor.main.handler', + ae_config, + config, + # Only pass Firehose stream name. Firehose client will translate it to full ARN + environment={ + 'DESTINATION_FIREHOSE_STREAM_NAME': stream_name + } + ) + + return result diff --git a/streamalert_cli/terraform/firehose.py b/streamalert_cli/terraform/firehose.py index 95eac27bb..d210e52a2 100644 --- a/streamalert_cli/terraform/firehose.py +++ b/streamalert_cli/terraform/firehose.py @@ -13,8 +13,12 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -from streamalert.classifier.clients import FirehoseClient -from streamalert.shared.config import firehose_data_bucket +from streamalert.shared.firehose import FirehoseClient +from streamalert.shared.config import ( + artifact_extractor_enabled, + artifact_extractor_enabled_for_log, + firehose_data_bucket +) from streamalert.shared.utils import get_database_name, get_data_file_format from streamalert_cli.athena.helpers import generate_data_table_schema from streamalert_cli.terraform.common import monitoring_topic_arn @@ -49,6 +53,16 @@ def generate_firehose(logging_bucket, main_dict, config): 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}' } + # Only add allow firehose to invoke Artifact Extractor Lambda if Lambda if enabled in + # conf/lambda.json + if artifact_extractor_enabled(config): + main_dict['module']['kinesis_firehose_setup']['artifact_extractor_enabled'] = True + + # FIXME: change variable name to function arn + main_dict['module']['kinesis_firehose_setup']['function_alias_arn'] = ( + '${module.artifact_extractor_lambda.function_arn}' + ) + enabled_logs = FirehoseClient.load_enabled_log_sources( firehose_conf, config['logs'], @@ -109,4 +123,15 @@ def generate_firehose(logging_bucket, main_dict, config): else: module_dict['alarm_actions'] = [monitoring_topic_arn(config)] + # Only enable "processing_configuration" and pass Artifact Extractor Lambda function arn to + # a firehose if + # 1) lambda function is enabled in conf/lambda.json + # 2) "normalization" field is configured in the log schema settings in conf/schemas/*.json + # or conf/logs.json + if artifact_extractor_enabled_for_log(config, log_type_name): + # FIXME: change variable name to function_arn + module_dict['function_alias_arn'] = ( + '${module.artifact_extractor_lambda.function_alias_arn}' + ) + main_dict['module']['kinesis_firehose_{}'.format(log_stream_name)] = module_dict diff --git a/streamalert_cli/terraform/generate.py b/streamalert_cli/terraform/generate.py index 2a7900aac..ab2c21449 100644 --- a/streamalert_cli/terraform/generate.py +++ b/streamalert_cli/terraform/generate.py @@ -30,6 +30,7 @@ s3_access_logging_bucket, terraform_state_bucket, ) +from streamalert_cli.terraform.artifact_extractor import generate_artifact_extractor from streamalert_cli.terraform.alert_merger import generate_alert_merger from streamalert_cli.terraform.alert_processor import generate_alert_processor from streamalert_cli.terraform.apps import generate_apps @@ -63,7 +64,7 @@ RESTRICTED_CLUSTER_NAMES = ('main', 'athena') TERRAFORM_VERSION = '~> 0.12.9' -TERRAFORM_PROVIDER_VERSION = '~> 2.28.1' +TERRAFORM_PROVIDER_VERSION = '~> 2.48.0' LOGGER = get_logger(__name__) @@ -420,6 +421,16 @@ def terraform_generate_handler(config, init=False, check_tf=True, check_creds=Tr os.path.join(TERRAFORM_FILES_PATH, 'main.tf.json') ) + # Setup Artifact Extractor if it is enabled. + # artifact_extractor module is referenced in main.tf.json, so we need to generate it is tf file + # right after generating main.tf.json file for "manage.py destroy" command. 
+ generate_global_lambda_settings( + config, + conf_name='artifact_extractor_config', + generate_func=generate_artifact_extractor, + tf_tmp_file_name='artifact_extractor' + ) + # Return early during the init process, clusters are not needed yet if init: return True diff --git a/streamalert_cli/utils.py b/streamalert_cli/utils.py index c2dd0e45d..a9540af74 100644 --- a/streamalert_cli/utils.py +++ b/streamalert_cli/utils.py @@ -46,6 +46,7 @@ def function_map(): 'alert': 'alert_processor', 'alert_merger': 'alert_merger', 'apps': None, # needs special handling + 'artifact_extractor': 'artifact_extractor', 'athena': 'athena_partitioner', 'classifier': None, # needs special handling 'rule': 'rules_engine', diff --git a/tests/unit/streamalert/artifact_extractor/__init__.py b/tests/unit/streamalert/artifact_extractor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/streamalert/artifact_extractor/helpers.py b/tests/unit/streamalert/artifact_extractor/helpers.py new file mode 100644 index 000000000..84d6d66be --- /dev/null +++ b/tests/unit/streamalert/artifact_extractor/helpers.py @@ -0,0 +1,116 @@ +""" +Copyright 2017-present Airbnb, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import json +import base64 + +from streamalert.shared.normalize import Normalizer + +MOCK_RECORD_ID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' + +def native_firehose_records(normalized=False, count=2): + """Generate sample firehose records for unit tests""" + json_data = [ + {'key_{}'.format(cnt): 'value_{}'.format(cnt)} for cnt in range(count) + ] + + if normalized: + for data in json_data: + data[Normalizer.NORMALIZATION_KEY] = { + 'normalized_type1': [ + { + 'values': ['value1'], + 'function': None + } + ], + 'normalized_type2': [ + { + 'values': ['value2', 'value3'], + 'function': None + } + ] + } + + return [ + { + 'recordId': 'record_id_{}'.format(cnt), + 'data': base64.b64encode( + (json.dumps(json_data[cnt], separators=(',', ':')) + '\n').encode('utf-8') + ).decode('utf-8'), + 'approximateArrivalTimestamp': 1583275630000+int(cnt) + } for cnt in range(count) + ] + +def transformed_firehose_records(normalized=False, count=2): + """Generate sample transformed firehose records for unit tests""" + json_data = [ + {'key_{}'.format(cnt): 'value_{}'.format(cnt)} for cnt in range(count) + ] + + if normalized: + for data in json_data: + data[Normalizer.NORMALIZATION_KEY] = { + 'normalized_type1': [ + { + 'values': ['value1'], + 'function': None + } + ], + 'normalized_type2': [ + { + 'values': ['value2', 'value3'], + 'function': None + } + ], + 'streamalert_record_id': MOCK_RECORD_ID + } + + return { + 'records': [ + { + 'result': 'Ok', + 'data': base64.b64encode( + (json.dumps(json_data[cnt], separators=(',', ':')) + '\n').encode('utf-8') + ).decode('utf-8'), + 'recordId': 'record_id_{}'.format(cnt) + } for cnt in range(count) + ] + } + +def generate_artifacts(): + """Generate sample artifacts for unit tests""" + + # These values are tight to the result of native_firehose_records() method + 
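+    # Two records come back from native_firehose_records(normalized=True), each carrying one
+    # 'normalized_type1' value and two 'normalized_type2' values, so six artifacts are expected
+    # in record order.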
normalized_values = [ + ('normalized_type1', 'value1'), + ('normalized_type2', 'value2'), + ('normalized_type2', 'value3'), + ('normalized_type1', 'value1'), + ('normalized_type2', 'value2'), + ('normalized_type2', 'value3') + ] + artifacts = [ + { + 'function': 'None', + 'streamalert_record_id': MOCK_RECORD_ID, + 'source_type': 'unit_test', + 'type': type, + 'value': value + } for type, value in normalized_values + ] + + return [ + json.dumps(artifact, separators=(',', ':')) + '\n' for artifact in artifacts + ] diff --git a/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py b/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py new file mode 100644 index 000000000..2c7080a86 --- /dev/null +++ b/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py @@ -0,0 +1,107 @@ +""" +Copyright 2017-present Airbnb, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import os + +from mock import call, patch +from nose.tools import assert_equal + +from streamalert.artifact_extractor.artifact_extractor import ( + Artifact, + ArtifactExtractor +) +from streamalert.shared.firehose import FirehoseClient +from tests.unit.streamalert.artifact_extractor.helpers import ( + native_firehose_records, + transformed_firehose_records, + generate_artifacts, + MOCK_RECORD_ID, +) + + +class TestArtifact: + """Test Artifact class""" + + def test_record(self): # pylint: disable=no-self-use + """Artifact - Test record property in the Artifact class""" + artifact = Artifact( + normalized_type='test_normalized_type', + value='test_value', + source_type='test_source_type', + record_id='test_record_id', + function=None + ) + expected_result = { + 'function': 'None', + 'streamalert_record_id': 'test_record_id', + 'source_type': 'test_source_type', + 'type': 'test_normalized_type', + 'value': 'test_value' + } + + assert_equal(artifact.record, expected_result) + + +class TestArtifactExtractor: + """Test ArtifactExtractor class """ + # pylint: disable=attribute-defined-outside-init,protected-access,no-self-use + + @patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_dst_fh_arn'}) + def setup(self): + """Setup before each method""" + with patch('boto3.client'): + ArtifactExtractor._firehose_client = FirehoseClient(prefix='unit-test') + + self._artifact_extractor = ArtifactExtractor( + 'us-east-1', 'prefix_streamalert_unit_test' + ) + + def teardown(self): + """Teardown after each method""" + ArtifactExtractor._firehose_client = None + + @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') + def test_run_zero_artifact(self, logger_mock): + """ArtifactExtractor - Test run method extract zero artifact""" + result = self._artifact_extractor.run(native_firehose_records()) + logger_mock.assert_has_calls([ + call.debug('Extracting artifacts from %d %s logs', 2, 'unit_test'), + call.debug('Extracted %d artifact(s)', 0) + ]) + + expected_result = transformed_firehose_records() + assert_equal(result, expected_result) + + @patch('uuid.uuid4') + @patch.object(FirehoseClient, 
'_send_batch') + @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') + def test_run(self, logger_mock, send_batch_mock, uuid_mock): + """ArtifactExtractor - Test run method extract artifacts""" + uuid_mock.return_value = MOCK_RECORD_ID + result = self._artifact_extractor.run(native_firehose_records(normalized=True)) + + logger_mock.assert_has_calls([ + call.debug('Extracting artifacts from %d %s logs', 2, 'unit_test'), + call.debug('Extracted %d artifact(s)', 6) + ]) + + send_batch_mock.assert_called_with( + 'unit_test_dst_fh_arn', + generate_artifacts(), + 'artifact_extractor' + ) + + expected_result = transformed_firehose_records(normalized=True) + assert_equal(result, expected_result) diff --git a/tests/unit/streamalert/artifact_extractor/test_main.py b/tests/unit/streamalert/artifact_extractor/test_main.py new file mode 100644 index 000000000..29ddaed4a --- /dev/null +++ b/tests/unit/streamalert/artifact_extractor/test_main.py @@ -0,0 +1,123 @@ +""" +Copyright 2017-present Airbnb, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import os + +from mock import call, patch +from nose.tools import assert_equal + +# from streamalert.artifact_extractor.artifact_extractor import ArtifactExtractor +from streamalert.artifact_extractor.main import ArtifactExtractor, handler +from streamalert.shared.firehose import FirehoseClient + +from tests.unit.streamalert.artifact_extractor.helpers import ( + native_firehose_records, + transformed_firehose_records, + generate_artifacts, + MOCK_RECORD_ID, +) + + +class TestArtifactExtractorHandler: + """Test Artifact Extractor lambda function handler""" + # pylint: disable=attribute-defined-outside-init,protected-access,no-self-use + + @patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_dst_fh_arn'}) + def setup(self): + """Setup before each method""" + with patch('boto3.client'): + ArtifactExtractor._firehose_client = FirehoseClient(prefix='unit-test') + + self._artifact_extractor = ArtifactExtractor( + 'us-east-1', 'prefix_streamalert_unit_test' + ) + + def teardown(self): + """Teardown after each method""" + ArtifactExtractor._firehose_client = None + + @patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_dst_fh_arn'}) + @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') + def test_handler_zero_artifact(self, logger_mock): + """ArtifactExtractor - Test handler extracts zero artifact""" + event = { + 'records': native_firehose_records(), + 'region': 'us-east-1', + 'deliveryStreamArn': ( + 'arn:aws:firehose:us-east-1:123456788901:prefix_streamalert_unit_test' + ), + 'invocationId': '12345678-1234-5678-9000-124560291657' + } + result = handler(event, 'bala') + + logger_mock.assert_has_calls([ + call.debug('Extracting artifacts from %d %s logs', 2, 'unit_test'), + call.debug('Extracted %d artifact(s)', 0) + ]) + + expected_result = transformed_firehose_records() + assert_equal(result, expected_result) + + @patch('uuid.uuid4') + @patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 
'unit_test_dst_fh_arn'}) + @patch.object(FirehoseClient, '_send_batch') + @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') + def test_handler(self, logger_mock, send_batch_mock, uuid_mock): + """ArtifactExtractor - Test handler""" + uuid_mock.return_value = MOCK_RECORD_ID + event = { + 'records': native_firehose_records(normalized=True), + 'region': 'us-east-1', + 'deliveryStreamArn': ( + 'arn:aws:firehose:us-east-1:123456788901:prefix_streamalert_unit_test' + ), + 'invocationId': '12345678-1234-5678-9000-124560291657' + } + result = handler(event, 'bala') + + logger_mock.assert_has_calls([ + call.debug('Extracting artifacts from %d %s logs', 2, 'unit_test'), + call.debug('Extracted %d artifact(s)', 6) + ]) + + send_batch_mock.assert_called_with( + 'unit_test_dst_fh_arn', + generate_artifacts(), + 'artifact_extractor' + ) + + expected_result = transformed_firehose_records(normalized=True) + assert_equal(result, expected_result) + + @patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_dst_fh_arn'}) + @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') + def test_handler_invalid_source_type(self, logger_mock): + """ArtifactExtractor - Test handler with invalid source type from firehose arn""" + event = { + 'records': native_firehose_records(), + 'region': 'us-east-1', + 'deliveryStreamArn': ( + 'arn:aws:firehose:us-east-1:123456788901:firehose-deliverystream' + ), + 'invocationId': '12345678-1234-5678-9000-124560291657' + } + handler(event, 'bala') + + logger_mock.assert_has_calls([ + call.warning( + 'No valid source type found from firehose arn %s', + 'arn:aws:firehose:us-east-1:123456788901:firehose-deliverystream' + ) + ]) diff --git a/tests/unit/streamalert/classifier/test_classifier.py b/tests/unit/streamalert/classifier/test_classifier.py index 0d0e1ae10..5c2df491a 100644 --- a/tests/unit/streamalert/classifier/test_classifier.py +++ b/tests/unit/streamalert/classifier/test_classifier.py @@ -246,7 +246,7 @@ def test_classify_payload(self, process_mock): ]) ) normalizer_mock.normalize.assert_called_with( - payload_record.parsed_records[-1], 'foo' + payload_record.parsed_records[-1], 'foo:bar' ) assert_equal(self._classifier._payloads, [payload_record]) log_mock.assert_called_with(payload_record, 1) diff --git a/tests/unit/streamalert/rules_engine/test_threat_intel.py b/tests/unit/streamalert/rules_engine/test_threat_intel.py index 3318dbcb9..91f4c44ee 100644 --- a/tests/unit/streamalert/rules_engine/test_threat_intel.py +++ b/tests/unit/streamalert/rules_engine/test_threat_intel.py @@ -85,7 +85,7 @@ def _sample_payload(self): 'recipientAccountId': '12345' }, 'source': '1.1.1.2', - 'streamalert:normalization': { + 'streamalert_normalization': { 'sourceAddress': {'1.1.1.2'}, 'userName': {'alice'} } @@ -118,7 +118,7 @@ def test_threat_detection(self): 'recipientAccountId': '12345' }, 'source': '1.1.1.2', - 'streamalert:normalization': { + 'streamalert_normalization': { 'sourceAddress': {'1.1.1.2'}, 'userName': {'alice'} }, @@ -149,7 +149,7 @@ def test_threat_detection_no_iocs(self): 'recipientAccountId': '12345' }, 'source': '1.1.1.2', - 'streamalert:normalization': { + 'streamalert_normalization': { 'sourceAddress': {'1.1.1.2'}, 'userName': {'alice'} } diff --git a/tests/unit/streamalert/shared/test_config.py b/tests/unit/streamalert/shared/test_config.py index 7df73de53..e0d3cf36e 100644 --- a/tests/unit/streamalert/shared/test_config.py +++ b/tests/unit/streamalert/shared/test_config.py @@ -19,11 +19,15 @@ from nose.tools import ( 
assert_equal, assert_count_equal, + assert_false, assert_raises, + assert_true, ) from pyfakefs import fake_filesystem_unittest from streamalert.shared.config import ( + artifact_extractor_enabled, + artifact_extractor_enabled_for_log, _validate_config, load_config, parse_lambda_arn, @@ -292,3 +296,53 @@ def test_config_duplicate_sources(self): config = basic_streamalert_config() config['clusters']['dev'] = config['clusters']['prod'] assert_raises(ConfigError, _validate_config, config) + + +class TestConfigArtifactExtractor(): + """Shared - Test Artifact Extractor configuration with mocked config files""" + + def __init__(self): + self.default_conf_data = {} + + def setup(self): + self.default_conf_data = { + 'global': { + 'infrastructure': { + 'firehose': { + 'enabled': False, + 'enabled_logs': {} + } + } + }, + 'lambda': { + 'artifact_extractor_config': { + 'enabled': False + } + }, + 'logs': { + 'test_log:type_1': { + 'schema': {}, + 'configuration': { + 'normalization': {} + } + }, + 'test_log:type_2': { + 'schema': {}, + } + } + } + + def test_artifact_extractor_disabled_by_default(self): + """Shared - artifact extractor is disabled with default config""" + assert_false(artifact_extractor_enabled(self.default_conf_data)) + + + def test_artifact_extractor(self): + """Shared - test artifact_extractor_enabled helper""" + self.default_conf_data['lambda']['artifact_extractor_config']['enabled'] = True + assert_false(artifact_extractor_enabled(self.default_conf_data)) + + self.default_conf_data['global']['infrastructure']['firehose']['enabled'] = True + assert_true(artifact_extractor_enabled(self.default_conf_data)) + assert_true(artifact_extractor_enabled_for_log(self.default_conf_data, 'test_log:type_1')) + assert_false(artifact_extractor_enabled_for_log(self.default_conf_data, 'test_log:type_2')) diff --git a/tests/unit/streamalert/classifier/clients/test_firehose.py b/tests/unit/streamalert/shared/test_firehose.py similarity index 90% rename from tests/unit/streamalert/classifier/clients/test_firehose.py rename to tests/unit/streamalert/shared/test_firehose.py index 2b551f799..4816ad041 100644 --- a/tests/unit/streamalert/classifier/clients/test_firehose.py +++ b/tests/unit/streamalert/shared/test_firehose.py @@ -17,7 +17,7 @@ from mock import Mock, patch from nose.tools import assert_equal -from streamalert.classifier.clients.firehose import FirehoseClient +from streamalert.shared.firehose import FirehoseClient class TestFirehoseClient: @@ -97,7 +97,7 @@ def test_record_batches(self): ] ] - result = list(FirehoseClient._record_batches(records)) + result = list(FirehoseClient._record_batches(records, 'test_function_name')) assert_equal(result, expected_result) @patch.object(FirehoseClient, '_log_failed') @@ -107,15 +107,15 @@ def test_record_batches_rec_too_large(self, failure_mock): {'key': 'test' * 1000 * 1000} ] - result = list(FirehoseClient._record_batches(records)) + result = list(FirehoseClient._record_batches(records, 'test_function_name')) assert_equal(result, []) - failure_mock.assert_called_with(1) + failure_mock.assert_called_with(1, 'test_function_name') def test_record_batches_max_batch_count(self): """FirehoseClient - Record Batches, Max Batch Count""" records = self._sample_raw_records(count=501) - result = list(FirehoseClient._record_batches(records)) + result = list(FirehoseClient._record_batches(records, 'test_function_name')) assert_equal(len(result), 2) assert_equal(len(result[0]), 500) assert_equal(len(result[1]), 1) @@ -126,7 +126,7 @@ def 
test_record_batches_max_batch_size(self): {'key_{}'.format(i): 'test' * 100000} for i in range(10) ] - result = list(FirehoseClient._record_batches(records)) + result = list(FirehoseClient._record_batches(records, 'test_function_name')) assert_equal(len(result), 2) assert_equal(len(result[0]), 9) assert_equal(len(result[1]), 1) @@ -229,8 +229,8 @@ def test_finalize_failures(self, failure_mock): ] } - FirehoseClient._finalize(response, 'stream_name', 3) - failure_mock.assert_called_with(1) + FirehoseClient._finalize(response, 'stream_name', 3, 'test_function_name') + failure_mock.assert_called_with(1, 'test_function_name') @patch('logging.Logger.info') def test_finalize_success(self, log_mock): @@ -244,7 +244,7 @@ def test_finalize_success(self, log_mock): } } - FirehoseClient._finalize(response, stream_name, count) + FirehoseClient._finalize(response, stream_name, count, 'test_function_name') log_mock.assert_called_with( 'Successfully sent %d message(s) to firehose %s with RequestId \'%s\'', count, @@ -280,7 +280,7 @@ def test_send_batch(self): } ] - self._client._send_batch(stream_name, records) + self._client._send_batch(stream_name, records, 'test_function_name') boto_mock.put_record_batch.assert_called_with( DeliveryStreamName=stream_name, @@ -296,7 +296,7 @@ def test_send_batch_error(self, log_mock): error = ClientError({'Error': {'Code': 10}}, 'InvalidRequestException') boto_mock.put_record_batch.side_effect = error - self._client._send_batch(stream_name, ['data']) + self._client._send_batch(stream_name, ['data'], 'test_function_name') log_mock.assert_called_with('Firehose request failed') @@ -412,7 +412,7 @@ def test_send(self, send_batch_mock): ] self._client.send(self._sample_payloads) send_batch_mock.assert_called_with( - 'unit_test_streamalert_log_type_01_sub_type_01', expected_batch + 'unit_test_streamalert_log_type_01_sub_type_01', expected_batch, 'classifier' ) @patch.object(FirehoseClient, '_send_batch') @@ -434,7 +434,7 @@ def test_send_no_prefixing(self, send_batch_mock): client.send(self._sample_payloads) send_batch_mock.assert_called_with( - 'streamalert_log_type_01_sub_type_01', expected_batch + 'streamalert_log_type_01_sub_type_01', expected_batch, 'classifier' ) @property @@ -476,7 +476,9 @@ def test_send_long_log_name(self, send_batch_mock): client.send(self._sample_payloads_long_log_name) send_batch_mock.assert_called_with( - 'streamalert_very_very_very_long_log_stream_name_abcdefg_7c88167b', expected_batch + 'streamalert_very_very_very_long_log_stream_name_abcdefg_7c88167b', + expected_batch, + 'classifier' ) def test_generate_firehose_name(self): @@ -522,3 +524,30 @@ def test_generate_firehose_name_prefix(self): ] assert_equal(expected_results, results) + + def test_artifacts_firehose_stream_name(self): + """FirehoseClient - Test generate artifacts firehose stream name""" + config_data = { + 'global': { + 'account': { + 'prefix': 'unittest' + } + }, + 'lambda': { + 'artifact_extractor_config': {} + } + } + + assert_equal( + self._client.artifacts_firehose_stream_name(config_data), + 'unittest_streamalert_artifacts' + ) + + config_data['lambda']['artifact_extractor_config']['firehose_stream_name'] = ( + 'test_artifacts_fh_name' + ) + + assert_equal( + self._client.artifacts_firehose_stream_name(config_data), + 'test_artifacts_fh_name' + ) diff --git a/tests/unit/streamalert/shared/test_normalizer.py b/tests/unit/streamalert/shared/test_normalizer.py index ba78c1875..1513c80ff 100644 --- a/tests/unit/streamalert/shared/test_normalizer.py +++ 
b/tests/unit/streamalert/shared/test_normalizer.py @@ -14,9 +14,10 @@ limitations under the License. """ from mock import patch -from nose.tools import assert_equal +from nose.tools import assert_equal, assert_false, assert_raises, assert_true -from streamalert.shared.normalize import Normalizer +from streamalert.shared.exceptions import ConfigError +from streamalert.shared.normalize import Normalizer, NormalizedType class TestNormalizer: @@ -42,17 +43,95 @@ def _test_record(cls): 'sourceIPAddress': '1.1.1.3' } + @classmethod + def _normalized_type_ip(cls): + return NormalizedType( + 'test_log_type', + 'ip_address', + [ + { + 'path': ['sourceIPAddress'], + 'function': 'source ip address' + }, + { + 'path': ['detail', 'source'], + 'function': 'source ip address' + } + ] + ) + + @classmethod + def _normalized_type_region(cls): + return NormalizedType( + 'test_log_type', + 'region', + [ + { + 'path': ['region'], + 'function': 'AWS region' + }, + { + 'path': ['detail', 'awsRegion'], + 'function': 'AWS region' + } + ] + ) + + @classmethod + def _normalized_type_account(cls): + return NormalizedType('test_log_type', 'account', ['account']) + + @classmethod + def _normalized_type_user_identity(cls): + return NormalizedType( + 'test_log_type', + 'user_identity', + [ + { + 'path': ['detail', 'userIdentity', 'userName'], + 'function': 'User name' + }, + { + 'path': ['detail', 'userIdentity', 'invokedBy'], + 'function': 'Service name' + } + ] + ) + def test_match_types(self): """Normalizer - Match Types""" normalized_types = { - 'region': ['region', 'awsRegion'], - 'sourceAccount': ['account', 'accountId'], - 'ipv4': ['destination', 'source', 'sourceIPAddress'] + 'region': self._normalized_type_region(), + 'account': self._normalized_type_account(), + 'ipv4': self._normalized_type_ip() } expected_results = { - 'sourceAccount': [123456], - 'ipv4': ['1.1.1.2', '1.1.1.3'], - 'region': ['region_name'] + 'account': [ + { + 'values': ['123456'], + 'function': None + } + ], + 'ipv4': [ + { + 'values': ['1.1.1.3'], + 'function': 'source ip address' + }, + { + 'values': ['1.1.1.2'], + 'function': 'source ip address' + } + ], + 'region': [ + { + 'values': ['region_name'], + 'function': 'AWS region' + }, + { + 'values': ['region_name'], + 'function': 'AWS region' + } + ] } results = Normalizer.match_types(self._test_record(), normalized_types) @@ -61,51 +140,60 @@ def test_match_types(self): def test_match_types_multiple(self): """Normalizer - Match Types, Mutiple Sub-keys""" normalized_types = { - 'account': ['account'], - 'region': ['region', 'awsRegion'], - 'ipv4': ['destination', 'source', 'sourceIPAddress'], - 'userName': ['userName', 'owner', 'invokedBy'] + 'account': self._normalized_type_account(), + 'ipv4': self._normalized_type_ip(), + 'region': self._normalized_type_region(), + 'user_identity': self._normalized_type_user_identity() } expected_results = { - 'account': [123456], - 'ipv4': ['1.1.1.2', '1.1.1.3'], - 'region': ['region_name'], - 'userName': ['Alice', 'signin.amazonaws.com'] + 'account': [ + { + 'values': ['123456'], + 'function': None + } + ], + 'ipv4': [ + { + 'values': ['1.1.1.3'], + 'function': 'source ip address' + }, + { + 'values': ['1.1.1.2'], + 'function': 'source ip address' + } + ], + 'region': [ + { + 'values': ['region_name'], + 'function': 'AWS region' + }, + { + 'values': ['region_name'], + 'function': 'AWS region' + } + ], + 'user_identity': [ + { + 'values': ['Alice'], + 'function': 'User name' + }, + { + 'values': ['signin.amazonaws.com'], + 'function': 'Service name' + } 
+ ] } results = Normalizer.match_types(self._test_record(), normalized_types) assert_equal(results, expected_results) - def test_match_types_list(self): - """Normalizer - Match Types, List of Values""" - normalized_types = { - 'ipv4': ['sourceIPAddress'], - } - expected_results = { - 'ipv4': ['1.1.1.2', '1.1.1.3'] - } - - test_record = { - 'account': 123456, - 'sourceIPAddress': ['1.1.1.2', '1.1.1.3'] - } - - results = Normalizer.match_types(test_record, normalized_types) - assert_equal(results, expected_results) - def test_normalize(self): """Normalizer - Normalize""" log_type = 'cloudtrail' Normalizer._types_config = { log_type: { - 'region': { - 'region', - 'awsRegion' - }, - 'sourceAccount': { - 'account', - 'accountId' - } + 'region': self._normalized_type_region(), + 'ipv4': self._normalized_type_ip() } } record = self._test_record() @@ -123,9 +211,27 @@ def test_normalize(self): } }, 'sourceIPAddress': '1.1.1.3', - 'streamalert:normalization': { - 'region': ['region_name'], - 'sourceAccount': [123456] + 'streamalert_normalization': { + 'region': [ + { + 'values': ['region_name'], + 'function': 'AWS region' + }, + { + 'values': ['region_name'], + 'function': 'AWS region' + } + ], + 'ipv4': [ + { + 'values': ['1.1.1.3'], + 'function': 'source ip address' + }, + { + 'values': ['1.1.1.2'], + 'function': 'source ip address' + } + ] } } @@ -136,14 +242,12 @@ def test_normalize_corner_case(self): log_type = 'cloudtrail' Normalizer._types_config = { log_type: { - 'normalized_key': { + 'normalized_key': NormalizedType( + log_type, 'normalized_key', - 'original_key' - }, - 'sourceAccount': { - 'account', - 'accountId' - } + ['original_key', 'original_key'] + ), + 'account': self._normalized_type_account() } } record = { @@ -159,8 +263,13 @@ def test_normalize_corner_case(self): 'original_key': { 'original_key': 'fizzbuzz', }, - 'streamalert:normalization': { - 'normalized_key': ['fizzbuzz'] + 'streamalert_normalization': { + 'normalized_key': [ + { + 'values': ['fizzbuzz'], + 'function': None + } + ] } } @@ -182,14 +291,24 @@ def test_key_does_not_exist(self): } normalized_types = { - 'region': ['region', 'awsRegion'], - 'sourceAccount': ['account', 'accountId'], + 'region': self._normalized_type_region(), + 'account': NormalizedType('test_log_type', 'account', ['accountId']), # There is no IP value in record, so normalization should not include this - 'ipv4': ['sourceIPAddress'] + 'ipv4': self._normalized_type_ip() } expected_results = { - 'sourceAccount': [123456], - 'region': ['region_name'] + 'account': [ + { + 'values': ['123456'], + 'function': None + } + ], + 'region': [ + { + 'values': ['region_name'], + 'function': 'AWS region' + } + ] } results = Normalizer.match_types(test_record, normalized_types) @@ -203,12 +322,17 @@ def test_empty_value(self): } normalized_types = { - 'region': ['region', 'awsRegion'], - 'sourceAccount': ['account', 'accountId'], - 'ipv4': ['sourceIPAddress'] + 'region': self._normalized_type_region(), + 'account': self._normalized_type_account(), + 'ipv4': self._normalized_type_ip() } expected_results = { - 'sourceAccount': [123456] + 'account': [ + { + 'values': ['123456'], + 'function': None + } + ] } results = Normalizer.match_types(test_record, normalized_types) @@ -219,8 +343,13 @@ def test_get_values_for_normalized_type(self): expected_result = {'1.1.1.3'} record = { 'sourceIPAddress': '1.1.1.3', - 'streamalert:normalization': { - 'ip_v4': expected_result, + 'streamalert_normalization': { + 'ip_v4': [ + { + 'values': expected_result, + 'function': None + 
} + ], } } @@ -230,38 +359,38 @@ def test_get_values_for_normalized_type_none(self): """Normalizer - Get Values for Normalized Type, None""" record = { 'sourceIPAddress': '1.1.1.3', - 'streamalert:normalization': {} + 'streamalert_normalization': {} } - assert_equal(Normalizer.get_values_for_normalized_type(record, 'ip_v4'), set()) + assert_equal(Normalizer.get_values_for_normalized_type(record, 'ip_v4'), None) + + def test_load_from_config_exist_types_config(self): + """Normalizer - Load normalized_types from conf when it was loaded previously""" + Normalizer._types_config = {'normalized_type1': {}} + assert_equal(Normalizer.load_from_config({'foo': 'bar'}), Normalizer) def test_load_from_config(self): """Normalizer - Load From Config""" config = { - 'normalized_types': { + 'logs': { 'cloudtrail': { - 'region': [ - 'region', - 'awsRegion' - ], - 'sourceAccount': [ - 'account', - 'accountId' - ] + 'schema': {}, + 'configuration': { + 'normalization': { + 'region': ['path', 'to', 'awsRegion'], + 'sourceAccount': ['path', 'to', 'accountId'] + } + } } } } normalizer = Normalizer.load_from_config(config) expected_config = { 'cloudtrail': { - 'region': [ - 'region', - 'awsRegion' - ], - 'sourceAccount': [ - 'account', - 'accountId' - ] + 'region': NormalizedType('cloudtrail', 'region', ['path', 'to', 'awsRegion']), + 'sourceAccount': NormalizedType( + 'cloudtrail', 'sourceAccount', ['path', 'to', 'accountId'] + ) } } assert_equal(normalizer, Normalizer) @@ -272,3 +401,311 @@ def test_load_from_config_empty(self): normalizer = Normalizer.load_from_config({}) assert_equal(normalizer, Normalizer) assert_equal(normalizer._types_config, None) + + def test_load_from_config_from_log_conf(self): + """Normalizer - Load normalization config from "logs" field in the config""" + config = { + 'logs': { + 'cloudwatch:events': { + 'schema': { + 'account': 'string', + 'source': 'string', + 'key': 'string' + }, + 'parser': 'json', + 'configuration': { + 'normalization': { + 'event_name': ['detail', 'eventName'], + 'region': [ + { + 'path': ['region'], + 'function': 'aws region information' + }, + { + 'path': ['detail', 'awsRegion'], + 'function': 'aws region information' + } + ], + 'ip_address': [ + { + 'path': ['detail', 'sourceIPAddress'], + 'function': 'source ip address' + } + ] + } + } + } + } + } + + expected_config = { + 'cloudwatch:events': { + 'event_name': NormalizedType( + 'cloudwatch:events', 'event_name', ['detail', 'eventName'] + ), + 'region': NormalizedType( + 'cloudwatch:events', + 'region', + [ + { + 'path': ['region'], + 'function': 'aws region information' + }, + { + 'path': ['detail', 'awsRegion'], + 'function': 'aws region information' + } + ] + ), + 'ip_address': NormalizedType( + 'cloudwatch:events', + 'ip_address', + [ + { + 'path': ['detail', 'sourceIPAddress'], + 'function': 'source ip address' + } + ] + ) + } + } + + normalizer = Normalizer.load_from_config(config) + assert_equal(normalizer, Normalizer) + assert_equal(normalizer._types_config, expected_config) + + def test_load_from_config_deprecate_normalized_types(self): + """Normalizer - Load normalization config and deprecate conf/normalized_types.json + """ + config = { + 'logs': { + 'cloudwatch:events': { + 'schema': { + 'account': 'string', + 'source': 'string', + 'key': 'string' + }, + 'parser': 'json', + 'configuration': { + 'normalization': { + 'ip_address': [ + { + 'path': ['path', 'to', 'sourceIPAddress'], + 'function': 'source ip address' + } + ] + } + } + }, + 'other_log_type': {} + }, + 'normalized_types': { + 
'cloudwatch': { + 'region': ['region', 'awsRegion'], + 'sourceAccount': ['account', 'accountId'] + } + } + } + expected_config = { + 'cloudwatch:events': { + 'ip_address': NormalizedType( + 'cloudwatch:events', + 'ip_address', + [ + { + 'path': ['path', 'to', 'sourceIPAddress'], + 'function': 'source ip address' + } + ] + ) + } + } + + normalizer = Normalizer.load_from_config(config) + assert_equal(normalizer, Normalizer) + assert_equal(normalizer._types_config, expected_config) + + def test_load_from_config_error(self): + """Normalizer - Load normalization config raises ConfigError + """ + config = { + 'logs': { + 'cloudwatch:events': { + 'schema': { + 'account': 'string', + 'source': 'string', + 'key': 'string' + }, + 'parser': 'json', + 'configuration': { + 'normalization': { + 'foo': 'bar' + } + } + } + } + } + assert_raises(ConfigError, Normalizer.load_from_config, config) + + config = { + 'logs': { + 'cloudwatch:events': { + 'schema': { + 'account': 'string', + 'source': 'string', + 'key': 'string' + }, + 'parser': 'json', + 'configuration': { + 'normalization': { + 'ip_address':{ + 'path': ['detail', 'sourceIPAddress'], + 'function': 'source ip address' + } + } + } + }, + 'other_log_type': {} + } + } + assert_raises(ConfigError, Normalizer.load_from_config, config) + + def test_normalize_condition(self): + """Normalizer - Test normalization when condition applied""" + log_type = 'cloudtrail' + + region = NormalizedType( + 'test_log_type', + 'region', + [ + { + 'path': ['region'], + 'function': 'AWS region' + }, + { + 'path': ['detail', 'awsRegion'], + 'function': 'AWS region', + 'condition': { + 'path': ['detail', 'userIdentity', 'userName'], + 'not_in': ['alice', 'bob'] + } + } + ] + ) + + ipv4 = NormalizedType( + 'test_log_type', + 'ip_address', + [ + { + 'path': ['sourceIPAddress'], + 'function': 'source ip address', + 'condition': { + 'path': ['account'], + 'is': '123456' + } + }, + { + 'path': ['detail', 'source'], + 'function': 'source ip address', + 'condition': { + 'path': ['account'], + 'is_not': '123456' + } + } + ] + ) + + Normalizer._types_config = { + log_type: { + 'region': region, + 'ipv4': ipv4 + } + } + record = self._test_record() + Normalizer.normalize(record, log_type) + + expected_record = { + 'account': 123456, + 'region': 'region_name', + 'detail': { + 'awsRegion': 'region_name', + 'source': '1.1.1.2', + 'userIdentity': { + "userName": "Alice", + "invokedBy": "signin.amazonaws.com" + } + }, + 'sourceIPAddress': '1.1.1.3', + 'streamalert_normalization': { + 'region': [ + { + 'values': ['region_name'], + 'function': 'AWS region' + } + ], + 'ipv4': [ + { + 'values': ['1.1.1.3'], + 'function': 'source ip address' + } + ] + } + } + assert_equal(record, expected_record) + + def test_match_condition(self): + """Normalizer - Test match condition with different conditions""" + record = self._test_record() + + condition = { + 'path': ['account'], + 'is': '123456' + } + assert_true(Normalizer._match_condition(record, condition)) + + condition = { + 'path': ['account'], + 'is_not': '123456' + } + assert_false(Normalizer._match_condition(record, condition)) + + condition = { + 'path': ['detail', 'awsRegion'], + 'contains': 'region' + } + assert_true(Normalizer._match_condition(record, condition)) + + condition = { + 'path': ['detail', 'awsRegion'], + 'contains': 'not_region' + } + assert_false(Normalizer._match_condition(record, condition)) + + condition = { + 'path': ['detail', 'userIdentity', 'userName'], + 'not_contains': 'alice' + } + 
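+        # 'Alice' is lowercased to 'alice' by _match_condition, so it does contain 'alice' and
+        # 'not_contains' evaluates to False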
assert_false(Normalizer._match_condition(record, condition)) + + condition = { + 'path': ['sourceIPAddress'], + 'in': ['1.1.1.2', '1.1.1.3'] + } + assert_true(Normalizer._match_condition(record, condition)) + + condition = { + 'path': ['sourceIPAddress'], + 'not_in': ['1.1.1.2', '1.1.1.3'] + } + assert_false(Normalizer._match_condition(record, condition)) + + # Only support extract one condition. The result is not quaranteed if multiple conditions + # configured. In this test case, it is because 'not_in' condition is checked before + # 'contains' + condition = { + 'path': ['detail', 'userIdentity', 'invokedBy'], + 'contains': 'amazonaws.com', + 'not_in': ['signin.amazonaws.com', 's3.amazonaws.com'] + } + assert_false(Normalizer._match_condition(record, condition)) diff --git a/tests/unit/streamalert_cli/athena/test_handler.py b/tests/unit/streamalert_cli/athena/test_handler.py index 3d09d72ed..0354f8778 100644 --- a/tests/unit/streamalert_cli/athena/test_handler.py +++ b/tests/unit/streamalert_cli/athena/test_handler.py @@ -17,7 +17,7 @@ from mock import Mock, patch from nose.tools import assert_equal, assert_true -from streamalert.classifier.clients import FirehoseClient +from streamalert.shared.firehose import FirehoseClient from streamalert_cli.athena import handler from streamalert_cli.config import CLIConfig diff --git a/tests/unit/streamalert_cli/athena/test_helpers.py b/tests/unit/streamalert_cli/athena/test_helpers.py index 61294b4f8..8d55e03a2 100644 --- a/tests/unit/streamalert_cli/athena/test_helpers.py +++ b/tests/unit/streamalert_cli/athena/test_helpers.py @@ -18,7 +18,7 @@ from streamalert_cli.athena import helpers from streamalert_cli.config import CLIConfig -from streamalert.classifier.clients import FirehoseClient +from streamalert.shared.firehose import FirehoseClient CONFIG = CLIConfig(config_path='tests/unit/conf') @@ -150,3 +150,17 @@ def test_generate_data_table_schema_2(): assert_true(helpers.generate_data_table_schema(config, 'cloudwatch:test_match_types')) FirehoseClient._ENABLED_LOGS.clear() + +def test_generate_artifact_table_schema(): + """CLI - Athena test generate_artifact_table_schema helper""" + result = helpers.generate_artifacts_table_schema() + + expected_result = [ + ('function', 'string'), + ('source_type', 'string'), + ('streamalert_record_id', 'string'), + ('type', 'string'), + ('value', 'string') + ] + + assert_equal(result, expected_result) diff --git a/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py b/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py new file mode 100644 index 000000000..253a09b26 --- /dev/null +++ b/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py @@ -0,0 +1,103 @@ +""" +Copyright 2017-present Airbnb, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import json + +from nose.tools import assert_equal, assert_is_none + +from streamalert_cli.config import CLIConfig +from streamalert_cli.terraform import artifact_extractor + +class TestTerraformArtifactExtractor: + """Test class for test generating Artifact Extractor terrform modules""" + + def __init__(self): + """Init config for the test cases""" + self.config = CLIConfig(config_path='tests/unit/conf') + + def test_generate_artifact_extractor(self): + """CLI - Terraform generate artifact extractor""" + result = artifact_extractor.generate_artifact_extractor(self.config) + assert_is_none(result) + + self.config['lambda']['artifact_extractor_config'] = { + 'enabled': True, + 'memory': 128, + 'timeout': 300 + } + + self.config['global']['infrastructure']['firehose']['enabled_logs'] = { + 'unit_test:type_1', + 'unit_test:type_2' + } + + self.config['logs']['unit_test:type_1'] = { + 'schema': {}, + 'configuration': { + 'normalization': { + 'normalized_type': ['original_key1', 'original_key2'] + } + } + } + self.config['logs']['unit_test:type_2'] = { + 'schema': {} + } + + result = artifact_extractor.generate_artifact_extractor(self.config) + expected_result = { + 'module': { + 'artifact_extractor': { + 'source': './modules/tf_artifact_extractor', + 'account_id': '12345678910', + 'prefix': 'unit-test', + 'region': 'us-west-1', + 'function_role_id': '${module.artifact_extractor_lambda.role_id}', + 'function_alias_arn': '${module.artifact_extractor_lambda.function_alias_arn}', + 'glue_catalog_db_name': 'unit-test_streamalert', + 'glue_catalog_table_name': 'artifacts', + 's3_bucket_name': 'unit-test-streamalert-data', + 'stream_name': 'unit_test_streamalert_artifacts', + 'buffer_size': 128, + 'buffer_interval': 900, + 'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}', + 'schema': [ + ['function', 'string'], + ['source_type', 'string'], + ['streamalert_record_id', 'string'], + ['type', 'string'], + ['value', 'string'] + ] + }, + 'artifact_extractor_lambda': { + 'source': './modules/tf_lambda', + 'function_name': 'unit-test_streamalert_artifact_extractor', + 'description': 'Unit-Test Streamalert Artifact Extractor', + 'handler': 'streamalert.artifact_extractor.main.handler', + 'memory_size_mb': 128, + 'timeout_sec': 300, + 'environment_variables': { + 'ENABLE_METRICS': '0', + 'LOGGER_LEVEL': 'info', + 'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_streamalert_artifacts' + }, + 'tags': {} + } + } + } + + # FIMME: not sure why assert_equal between result (defaultdict) and expected_result (dict) + # fails. + # assert_equal(result, expected_result) + assert_equal(json.dumps(result), json.dumps(expected_result)) diff --git a/tests/unit/streamalert_cli/terraform/test_generate.py b/tests/unit/streamalert_cli/terraform/test_generate.py index da6c07a1f..3e3589d2d 100644 --- a/tests/unit/streamalert_cli/terraform/test_generate.py +++ b/tests/unit/streamalert_cli/terraform/test_generate.py @@ -93,7 +93,7 @@ def test_generate_main(self): tf_main_expected = { 'provider': { 'aws': { - 'version': '~> 2.28.1', # Changes to this should require unit test update + 'version': '~> 2.48.0', # Changes to this should require unit test update 'region': 'us-west-1' } },