From bf6244b510d71222946a994a0aca590203730029 Mon Sep 17 00:00:00 2001
From: darkjokelady
Date: Thu, 22 Aug 2019 11:17:49 -0700
Subject: [PATCH 1/4] [apps] Correctly update aliyun timestamp (#978)

---
 stream_alert/apps/_apps/aliyun.py | 53 ++++++++++++-
 stream_alert/apps/app_base.py     |  2 +-
 .../test_apps/test_aliyun.py      | 78 ++++++++++++++++++-
 .../test_apps/test_app_base.py    |  2 +-
 4 files changed, 127 insertions(+), 8 deletions(-)

diff --git a/stream_alert/apps/_apps/aliyun.py b/stream_alert/apps/_apps/aliyun.py
index e604ac4d9..985bdd854 100644
--- a/stream_alert/apps/_apps/aliyun.py
+++ b/stream_alert/apps/_apps/aliyun.py
@@ -47,6 +47,7 @@ class AliyunApp(AppIntegration):
 
     https://www.alibabacloud.com/help/doc-detail/28849.htm
     """
+    # The maximum number of results to be returned. Valid values: 0 to 50.
     _MAX_RESULTS = 50
 
     def __init__(self, event, context):
@@ -76,16 +77,64 @@ def date_formatter(cls):
         return '%Y-%m-%dT%H:%M:%SZ'
 
     def _gather_logs(self):
-
+        """Fetch ActionTrail events and return a list of events
+
+        Example response from the do_action_with_exception method:
+
+        {
+            'EndTime': '2019-08-22T04:41:32Z',
+            'NextToken': '2',
+            'RequestId': '562D9C08-E766-4038-B49F-B0D2BE1980FE',
+            'StartTime': '2019-08-01T04:31:52Z',
+            'Events': [{
+                'eventId': '60.152_1566447558068_1247',
+                'eventVersion': '1',
+                'acsRegion': 'cn-hangzhou',
+                'additionalEventData': {
+                    'mfaChecked': 'true',
+                    'callbackUrl': 'https://home.console.aliyun.com/'
+                },
+                'eventType': 'ConsoleSignin',
+                'errorMessage': 'success',
+                'eventTime': '2019-08-22T04:19:18Z',
+                'eventName': 'ConsoleSignin',
+                'userIdentity': {
+                    'userName': 'dead_joke',
+                    'type': 'ram-user',
+                    'principalId': '222222222222222222',
+                    'accountId': '1111111111111111'
+                },
+                'eventSource': 'signin.aliyun.com',
+                'requestId': '60.152_1566447558068_1247',
+                'userAgent': 'some browser version',
+                'sourceIpAddress': '1.1.1.1',
+                'serviceName': 'AasSub'
+            }, {
+                'eventId': '029B39F0-5E23-4931-B4C9-BA72C7261ADF',
+                ...
+                'eventTime': '2019-08-21T22:26:09Z',
+                ...
+            }]
+        }
+        """
         try:
             response = self.client.do_action_with_exception(self.request)
             json_response = json.loads(response)
+
+            # Note: The ActionTrail API returns events in sorted order, with the
+            # latest events first. There is still a small chance that not all logs
+            # are gathered: if the Lambda function times out while more logs remain
+            # to be pulled, the remaining logs are lost, because the last_timestamp
+            # has already been updated to "EndTime" during the first invocation.
+            #
+            # To reduce the possibility of data loss, give the Lambda function
+            # (aliyun app) a longer timeout and schedule the app more frequently, e.g. every 10 mins
+            self._last_timestamp = json_response['EndTime']
             if 'NextToken' in json_response:
                 self._more_to_poll = True
                 self.request.set_NextToken(json_response['NextToken'])
             else:
                 self._more_to_poll = False
 
-            self._last_timestamp = json_response['EndTime']
             return json_response['Events']
diff --git a/stream_alert/apps/app_base.py b/stream_alert/apps/app_base.py
index 781489dda..6bc8298d5 100644
--- a/stream_alert/apps/app_base.py
+++ b/stream_alert/apps/app_base.py
@@ -200,7 +200,7 @@ def _initialize(self):
         """Method for performing any startup steps, like setting state to running"""
         # Perform another safety check to make sure this is not being invoked already
         if self._config.is_running:
-            LOGGER.error('[%s] App already running', self)
+            LOGGER.warning('[%s] App already running', self)
             return False
 
         # Check if this is an invocation spawned from a previous partial execution
diff --git a/tests/unit/stream_alert_apps/test_apps/test_aliyun.py b/tests/unit/stream_alert_apps/test_apps/test_aliyun.py
index 9f36fc915..b745ad24d 100644
--- a/tests/unit/stream_alert_apps/test_apps/test_aliyun.py
+++ b/tests/unit/stream_alert_apps/test_apps/test_aliyun.py
@@ -13,11 +13,12 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import json
 import os
 
 from mock import patch
 from moto import mock_ssm
-from nose.tools import assert_equal, assert_false, assert_items_equal
+from nose.tools import assert_equal, assert_false, assert_items_equal, assert_true
 
 from aliyunsdkcore.acs_exception.exceptions import ServerException
@@ -39,7 +40,7 @@ def setup(self):
         self._test_app_name = 'aliyun'
         put_mock_params(self._test_app_name)
         self._event = get_event(self._test_app_name)
-        self._context = get_mock_lambda_context(self._test_app_name)
+        self._context = get_mock_lambda_context(self._test_app_name, milliseconds=100000)
         self._app = AliyunApp(self._event, self._context)
 
     def test_sleep_seconds(self):
@@ -94,10 +95,79 @@ def test_gather_logs_entries(self, client_mock):
         """AliyunApp - Gather Logs with some entries"""
         client_mock.return_value = '{"NextToken":"20","RequestId":'\
                                    '"B1DE97F8-5450-4593-AB38-FB61B799E91D",' \
-                                   '"Events":["filler data","filler data"],' \
+                                   '"Events":[{"eventTime":"123"},{"eventTime":"123"}],' \
                                    '"EndTime":"2018-07-23T19:28:00Z",' \
                                    '"StartTime":"2018-06-23T19:28:30Z"}'
         logs = self._app._gather_logs()
         assert_equal(2, len(logs))
-        assert self._app._more_to_poll  # nosec
+        assert_true(self._app._more_to_poll)
         assert_equal(self._app.request.get_NextToken(), "20")
+
+    @patch('stream_alert.apps.app_base.AppIntegration._invoke_successive_app')
+    @patch('stream_alert.apps.batcher.Batcher._send_logs_to_lambda')
+    @patch('stream_alert.apps._apps.aliyun.AliyunApp._sleep_seconds')
+    @patch('aliyunsdkcore.client.AcsClient.do_action_with_exception')
+    def test_gather_logs_last_timestamp(self, client_mock, sleep_mock, batcher_mock, _):
+        """AliyunApp - Test last_timestamp"""
+        # Mock 3 consecutive responses
+        mock_resps = [
+            {
+                'NextToken': '50',
+                'RequestId': 'AAAAAAAA',
+                'Events': [
+                    {
+                        'eventTime': '2018-06-23T19:29:00Z'
+                    },
+                    {
+                        'eventTime': '2018-06-23T19:28:00Z'
+                    }
+                ],
+                'EndTime': '2018-07-23T19:28:00Z',
+                'StartTime': '2018-06-23T19:28:30Z'
+            },
+            {
+                'NextToken': '100',
+                'RequestId': 'BBBBBBBBB',
+                'Events': [
+                    {
+                        'eventTime': '2018-06-24T19:29:00Z'
+                    },
+                    {
+                        'eventTime': '2018-06-24T19:28:00Z'
+                    }
+                ],
+                'EndTime': '2018-07-23T19:28:00Z',
+                'StartTime': '2018-06-23T19:28:30Z'
+            },
+            {
+                'NextToken': '150',
+                'RequestId': 'CCCCCCCC',
+                'Events': [
+                    {
+                        'eventTime': '2018-06-25T19:29:00Z'
+                    },
+                    {
+                        'eventTime': '2018-06-25T19:28:00Z'
+                    }
+                ],
+                'EndTime': '2018-07-23T19:28:00Z',
+                'StartTime': '2018-06-23T19:28:30Z'
+            }
+        ]
+        client_mock.side_effect = [json.dumps(r, separators=(',', ':')) for r in mock_resps]
+
+        # Mock the remaining time. The _sleep_seconds() method is called twice for
+        # each call made to gather logs via the Aliyun API. Set the sleep seconds to
+        # a large number to mimic the corner case where there are still more logs to
+        # pull when the Lambda function timeout is reached. In this case, the
+        # _last_timestamp should still be updated correctly.
+        sleep_mock.side_effect = [0, 0, 0, 0, 1000000, 0]
+
+        # Mock 3 batcher calls to invoke the successive Lambda function, since there are more logs
+        batcher_mock.side_effect = [True, True, True]
+
+        self._app.gather()
+        assert_equal(self._app._poll_count, 3)
+        assert_true(self._app._more_to_poll)
+        assert_equal(self._app.request.get_NextToken(), "150")
+        assert_equal(self._app._last_timestamp, '2018-07-23T19:28:00Z')
diff --git a/tests/unit/stream_alert_apps/test_apps/test_app_base.py b/tests/unit/stream_alert_apps/test_apps/test_app_base.py
index 3bc4ed49c..3a2df9656 100644
--- a/tests/unit/stream_alert_apps/test_apps/test_app_base.py
+++ b/tests/unit/stream_alert_apps/test_apps/test_app_base.py
@@ -165,7 +165,7 @@ def test_initialize(self):
         """App Integration - Initialize, Valid"""
         assert_true(self._app._initialize())
 
-    @patch('logging.Logger.error')
+    @patch('logging.Logger.warning')
     def test_initialize_running(self, log_mock):
         """App Integration - Initialize, Already Running"""
         self._app._config.current_state = 'running'

From f336c7850235096052c36aa8836f577a506c02c3 Mon Sep 17 00:00:00 2001
From: ryandeivert
Date: Fri, 23 Aug 2019 10:08:16 -0700
Subject: [PATCH 2/4] tweaking .gitignore file slightly for venv (#980)

---
 .gitignore | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/.gitignore b/.gitignore
index b186941cd..4733659ac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,20 +11,14 @@ terraform/terraform.tfstate*
 terraform/*.zip
 terraform/*.tf.json
 
-# MacOS artifacts
-Thumbs.db
-.DS_Store
-*.swp
-terminal.glue
-
 # Coveralls repo token
 .coveralls.yml
 
 # nose coverage file
 .coverage
 
-# virtualenv files
-venv/
+# virtualenv files in root of repo
+/venv
 
-.vagrant/
-htmlcov/
+/.vagrant
+/htmlcov

From 02b4a427050de89fdbf0ab743c41b172d7082cff Mon Sep 17 00:00:00 2001
From: JUUL Security Engineering <54082747+JUULShawn@users.noreply.github.com>
Date: Fri, 30 Aug 2019 14:34:13 -0700
Subject: [PATCH 3/4] Hotfix/links and spelling (#967)

* Fix broken link in the documentation

* Minor spelling updates in comments, code, and docs
---
 docs/source/athena-arch.rst                           | 2 +-
 .../duo_bypass_code_create_non_auto_generated.py      | 2 +-
 stream_alert/alert_processor/outputs/carbonblack.py   | 2 +-
 stream_alert/apps/_apps/aliyun.py                     | 2 +-
 stream_alert/apps/app_base.py                         | 2 +-
 stream_alert/classifier/clients/firehose.py           | 6 +++---
 stream_alert/classifier/clients/sqs.py                | 2 +-
 stream_alert/shared/athena.py                         | 6 +++---
 stream_alert_cli/terraform/athena.py                  | 2 +-
 stream_alert_cli/terraform/cloudtrail.py              | 2 +-
 tests/integration/rules/box/box_admin_events.json     | 2 +-
 tests/integration/rules/gsuite/gsuite_admin.json      | 2 +-
 tests/unit/helpers/aws_mocks.py                       | 4 ++--
 13 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/docs/source/athena-arch.rst b/docs/source/athena-arch.rst
index 97f1525df..9146fa8f7 100644
--- a/docs/source/athena-arch.rst
+++ b/docs/source/athena-arch.rst
@@ -13,7 +13,7 @@ The Athena Partition Refresh function utilizes:
 * `AWS S3 Event Notifications `_
 * `AWS SQS `_
 * `AWS Lambda Invocations by Schedule `_
-* `AWS Athena Repair Table `_
+* `AWS Athena Repair Table `_
 
 Diagram
 ~~~~~~~
diff --git a/rules/community/duo_administrator/duo_bypass_code_create_non_auto_generated.py b/rules/community/duo_administrator/duo_bypass_code_create_non_auto_generated.py
index e266a17a3..980b04a1e 100644
--- a/rules/community/duo_administrator/duo_bypass_code_create_non_auto_generated.py
+++ b/rules/community/duo_administrator/duo_bypass_code_create_non_auto_generated.py
@@ -1,4 +1,4 @@
-"""Alert when a DUO bypass code is artisanly crafted and not auto-generated."""
+"""Alert when a DUO bypass code is artisanally crafted and not auto-generated."""
 from rules.helpers.base import safe_json_loads
 from stream_alert.shared.rule import rule
diff --git a/stream_alert/alert_processor/outputs/carbonblack.py b/stream_alert/alert_processor/outputs/carbonblack.py
index 2d2e59913..b25afe07b 100644
--- a/stream_alert/alert_processor/outputs/carbonblack.py
+++ b/stream_alert/alert_processor/outputs/carbonblack.py
@@ -87,7 +87,7 @@ def _dispatch(self, alert, descriptor):
             binary_hash = carbonblack_context.get('value')
             # The binary should already exist in CarbonBlack
             binary = client.select(Binary, binary_hash)
-            # Determine if the binary is currenty listed as banned
+            # Determine if the binary is currently listed as banned
             if binary.banned:
                 # Determine if the banned action is enabled, if true exit
                 if binary.banned.enabled:
diff --git a/stream_alert/apps/_apps/aliyun.py b/stream_alert/apps/_apps/aliyun.py
index 985bdd854..c4b86fe17 100644
--- a/stream_alert/apps/_apps/aliyun.py
+++ b/stream_alert/apps/_apps/aliyun.py
@@ -71,7 +71,7 @@ def service(cls):
     def date_formatter(cls):
         """Return a format string for a date, ie: 2014-05-26T12:00:00Z
 
-        This format is consisten with the format used by the Aliyun API:
+        This format is consistent with the format used by the Aliyun API:
         https://www.alibabacloud.com/help/doc-detail/28849.htm
         """
         return '%Y-%m-%dT%H:%M:%SZ'
diff --git a/stream_alert/apps/app_base.py b/stream_alert/apps/app_base.py
index 6bc8298d5..e387571e8 100644
--- a/stream_alert/apps/app_base.py
+++ b/stream_alert/apps/app_base.py
@@ -340,7 +340,7 @@ def _make_post_request(self, full_url, headers, data, is_json=True):
 
     @_report_time
     def _gather(self):
-        """Protected entry point to peform the gather that returns the time the process took
+        """Protected entry point to perform the gather that returns the time the process took
 
         Returns:
             float: time, in seconds, for which the function ran
diff --git a/stream_alert/classifier/clients/firehose.py b/stream_alert/classifier/clients/firehose.py
index cfc0bb1ad..03ea96d3e 100644
--- a/stream_alert/classifier/clients/firehose.py
+++ b/stream_alert/classifier/clients/firehose.py
@@ -193,7 +193,7 @@ def _finalize(cls, response, stream_name, size):
             size (int): The original size of the batch being sent
         """
         if not response:
-            return  # Could happen in the case of backoff failing enitrely
+            return  # Could happen in the case of backoff failing entirely
 
         # Check for failures that occurred in PutRecordBatch after several backoff attempts
         if response.get('FailedPutCount'):
@@ -281,13 +281,13 @@ def _firehose_request_helper(data):
 
     @classmethod
     def firehose_log_name(cls, log_name):
-        """Convert conventional log names into Firehose delievery stream names
+        """Convert conventional log names into Firehose delivery stream names
 
         Args:
             log_name: The name of the log from logs.json
 
         Returns
-            str: Converted name which corresponds to a Firehose Delievery Stream
+            str: Converted name which corresponds to a Firehose delivery stream
         """
         return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)
diff --git a/stream_alert/classifier/clients/sqs.py b/stream_alert/classifier/clients/sqs.py
index 464484d2b..e566e3506 100644
--- a/stream_alert/classifier/clients/sqs.py
+++ b/stream_alert/classifier/clients/sqs.py
@@ -104,7 +104,7 @@ def _finalize(self, response, count):
             response (string|bool): MessageId or False if this request failed
             count (int): The size of the batch being sent to be logged as successful or failed
         """
-        if not response:  # Could happen in the case of backoff failing enitrely
+        if not response:  # Could happen in the case of backoff failing entirely
             MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.SQS_FAILED_RECORDS, count)
             return
diff --git a/stream_alert/shared/athena.py b/stream_alert/shared/athena.py
index 6cc18ebea..1310a2bc1 100644
--- a/stream_alert/shared/athena.py
+++ b/stream_alert/shared/athena.py
@@ -99,13 +99,13 @@ def _execute_and_wait(self, query):
         """
         response = self._execute_query(query)
 
-        exeuction_id = response['QueryExecutionId']
+        execution_id = response['QueryExecutionId']
 
         # This will block until the execution is complete, or raise an
         # AthenaQueryExecutionError exception if an error occurs
-        self.check_query_status(exeuction_id)
+        self.check_query_status(execution_id)
 
-        return exeuction_id
+        return execution_id
 
     def _execute_query(self, query):
         """Execute an Athena query on the current database. This operation is non-blocking
diff --git a/stream_alert_cli/terraform/athena.py b/stream_alert_cli/terraform/athena.py
index a0b95358a..b101d09b7 100644
--- a/stream_alert_cli/terraform/athena.py
+++ b/stream_alert_cli/terraform/athena.py
@@ -90,7 +90,7 @@ def generate_athena(config):
     filter_pattern_idx, filter_value_idx = 0, 1
 
     # Add filters for the cluster and aggregate
-    # Use a list of strings that represnt the following comma separated values:
+    # Use a list of strings that represent the following comma separated values:
     #   <filter_name>,<filter_pattern>,<filter_value>
     filters = ['{},{},{}'.format('{}-{}'.format(metric_prefix, metric),
                                  settings[filter_pattern_idx],
diff --git a/stream_alert_cli/terraform/cloudtrail.py b/stream_alert_cli/terraform/cloudtrail.py
index d33ca040f..f738d27a7 100644
--- a/stream_alert_cli/terraform/cloudtrail.py
+++ b/stream_alert_cli/terraform/cloudtrail.py
@@ -45,7 +45,7 @@ def generate_cloudtrail(cluster_name, cluster_dict, config):
         set([config['global']['account']['aws_account_id']] + modules['cloudtrail'].get(
             'cross_account_ids', [])))
 
-    # Allow for backwards compatilibity
+    # Allow for backwards compatibility
     if enabled_legacy:
         del config['clusters'][cluster_name]['modules']['cloudtrail']['enabled']
         config['clusters'][cluster_name]['modules']['cloudtrail']['enable_logging'] = True
diff --git a/tests/integration/rules/box/box_admin_events.json b/tests/integration/rules/box/box_admin_events.json
index 68a9e58b9..2b2e66497 100644
--- a/tests/integration/rules/box/box_admin_events.json
+++ b/tests/integration/rules/box/box_admin_events.json
@@ -21,7 +21,7 @@
             },
             "type": "event"
         },
-        "description": "Box admin event log exmaple (validation only)",
+        "description": "Box admin event log example (validation only)",
         "log": "box:admin_events",
         "service": "stream_alert_app",
         "source": "prefix_cluster_box_admin_events_sm-app-name_app",
diff --git a/tests/integration/rules/gsuite/gsuite_admin.json b/tests/integration/rules/gsuite/gsuite_admin.json
index 98378e65e..10e7ee752 100644
--- a/tests/integration/rules/gsuite/gsuite_admin.json
+++ b/tests/integration/rules/gsuite/gsuite_admin.json
@@ -31,7 +31,7 @@
             "kind": "audit#activity",
             "ownerDomain": "example.com"
         },
-        "description": "G Suite Admin Report Log exmaple (validation only)",
+        "description": "G Suite Admin Report Log example (validation only)",
         "log": "gsuite:reports",
         "service": "stream_alert_app",
         "source": "prefix_cluster_gsuite_admin_sm-app-name_app",
diff --git a/tests/unit/helpers/aws_mocks.py b/tests/unit/helpers/aws_mocks.py
index 944a16a1b..766497f7a 100644
--- a/tests/unit/helpers/aws_mocks.py
+++ b/tests/unit/helpers/aws_mocks.py
@@ -104,7 +104,7 @@ def get_start_query_execution(**kwargs):
         }
 
     def start_query_execution(self, **kwargs):
-        """Start an Athena Query Exectuion."""
+        """Start an Athena Query Execution."""
         if self.raise_exception:
             raise ClientError({'Error': {'Code': 10}}, 'InvalidRequestException')
         new_query_execution = self.get_start_query_execution(**kwargs)
@@ -116,7 +116,7 @@ def start_query_execution(self, **kwargs):
         }
 
     def get_query_execution(self, **kwargs):
-        """Get the status of an Athena Query Exectuion."""
+        """Get the status of an Athena Query Execution."""
        query_execution = self.query_executions.get(kwargs['QueryExecutionId'])
        query_execution['QueryExecution']['Status']['State'] = self.result_state

From 7d198a3273781f66465420e90886a3ce53ec7559 Mon Sep 17 00:00:00 2001
From: Blake Motl
Date: Fri, 30 Aug 2019 14:59:02 -0700
Subject: [PATCH 4/4] Bumped slack app timeout (#983)

---
 stream_alert/apps/_apps/slack.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stream_alert/apps/_apps/slack.py b/stream_alert/apps/_apps/slack.py
index 5c06e2880..54be61da3 100644
--- a/stream_alert/apps/_apps/slack.py
+++ b/stream_alert/apps/_apps/slack.py
@@ -35,7 +35,7 @@ class SlackApp(AppIntegration):
     contain details about your workspace's integrated apps
     """
 
-    _DEFAULT_REQUEST_TIMEOUT = 10
+    _DEFAULT_REQUEST_TIMEOUT = 30
     _SLACK_API_BASE_URL = 'https://slack.com/api/'
     _SLACK_API_MAX_ENTRY_COUNT = 1000
     _SLACK_API_MAX_PAGE_COUNT = 100
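
Note on PATCH 1/4: the essence of the fix is the order of state updates in _gather_logs. The last_timestamp checkpoint is now written from the response's "EndTime" before the NextToken pagination check, not after it, so a timeout mid-pagination cannot leave a stale timestamp behind. A minimal standalone sketch of this checkpoint-first pattern, with a hypothetical gather_logs helper and state dict (not part of the patch itself):

    import json

    def gather_logs(client, request, state):
        """Sketch of checkpoint-first pagination (hypothetical helper, not patch code)"""
        response = json.loads(client.do_action_with_exception(request))

        # Update the checkpoint first: even if later pages are never fetched
        # because the function times out, the next invocation resumes from
        # this response's EndTime instead of a stale timestamp
        state['last_timestamp'] = response['EndTime']

        if 'NextToken' in response:
            # More pages remain; remember where to resume within this invocation
            state['more_to_poll'] = True
            request.set_NextToken(response['NextToken'])
        else:
            state['more_to_poll'] = False

        return response['Events']

The trade-off, as the in-code note in the patch explains, is that events still unfetched at timeout are skipped on the next run; a longer Lambda timeout and a more frequent app schedule shrink that window.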