From 074d878d2f3f74d2575b98b58b03b450dc4b549c Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 31 Mar 2020 15:58:00 -0700 Subject: [PATCH 1/7] rename of athena function --- .../__init__.py | 0 .../main.py | 18 ++++---- streamalert/shared/__init__.py | 2 +- streamalert/shared/metrics.py | 6 +-- .../__init__.py | 0 .../test_main.py | 46 +++++++++---------- 6 files changed, 36 insertions(+), 36 deletions(-) rename streamalert/{athena_partition_refresh => athena_partitioner}/__init__.py (100%) rename streamalert/{athena_partition_refresh => athena_partitioner}/main.py (95%) rename tests/unit/streamalert/{athena_partition_refresh => athena_partitioner}/__init__.py (100%) rename tests/unit/streamalert/{athena_partition_refresh => athena_partitioner}/test_main.py (90%) diff --git a/streamalert/athena_partition_refresh/__init__.py b/streamalert/athena_partitioner/__init__.py similarity index 100% rename from streamalert/athena_partition_refresh/__init__.py rename to streamalert/athena_partitioner/__init__.py diff --git a/streamalert/athena_partition_refresh/main.py b/streamalert/athena_partitioner/main.py similarity index 95% rename from streamalert/athena_partition_refresh/main.py rename to streamalert/athena_partitioner/main.py index fb7667bb7..267c5a80f 100644 --- a/streamalert/athena_partition_refresh/main.py +++ b/streamalert/athena_partitioner/main.py @@ -31,11 +31,11 @@ LOGGER = get_logger(__name__) -class AthenaRefreshError(Exception): +class AthenaPartitionerError(Exception): """Generic Athena Partition Error for erroring the Lambda function""" -class AthenaRefresher: +class AthenaPartitioner: """Handle polling an SQS queue and running Athena queries for updating tables""" ALERTS_REGEX = re.compile(r'alerts/dt=(?P\d{4})' @@ -58,14 +58,14 @@ class AthenaRefresher: r'\-(?P\d{2})' r'\-(?P\d{2})\/.*') - ATHENA_S3_PREFIX = 'athena_partition_refresh' + ATHENA_S3_PREFIX = 'athena_partitioner' _ATHENA_CLIENT = None def __init__(self): config = load_config(include={'lambda.json', 'global.json'}) prefix = config['global']['account']['prefix'] - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] self._file_format = get_data_file_format(config) if self._file_format == 'parquet': @@ -78,7 +78,7 @@ def __init__(self): else: message = ( 'file format "{}" is not supported. Supported file format are ' - '"parquet", "json". Please update the setting in athena_partition_refresh_config ' + '"parquet", "json". 
Please update the setting in athena_partitioner_config ' 'in "conf/lambda.json"'.format(self._file_format) ) raise ConfigError(message) @@ -106,7 +106,7 @@ def _create_client(cls, db_name, results_bucket): # Check if the database exists when the client is created if not cls._ATHENA_CLIENT.check_database_exists(): - raise AthenaRefreshError('The \'{}\' database does not exist'.format(db_name)) + raise AthenaPartitionerError('The \'{}\' database does not exist'.format(db_name)) def _get_partitions_from_keys(self): """Get the partitions that need to be added for the Athena tables @@ -198,7 +198,7 @@ def _add_partitions(self): success = self._ATHENA_CLIENT.run_query(query=query) if not success: - raise AthenaRefreshError( + raise AthenaPartitionerError( 'The add hive partition query has failed:\n{}'.format(query) ) @@ -247,5 +247,5 @@ def run(self, event): def handler(event, _): - """Athena Partition Refresher Handler Function""" - AthenaRefresher().run(event) + """Athena Partitioner Handler Function""" + AthenaPartitioner().run(event) diff --git a/streamalert/shared/__init__.py b/streamalert/shared/__init__.py index 3832c9e6e..00fdda2f2 100644 --- a/streamalert/shared/__init__.py +++ b/streamalert/shared/__init__.py @@ -1,7 +1,7 @@ """Define some shared resources.""" ALERT_MERGER_NAME = 'alert_merger' ALERT_PROCESSOR_NAME = 'alert_processor' -ATHENA_PARTITION_REFRESH_NAME = 'athena_partition_refresh' +ATHENA_PARTITIONER_NAME = 'athena_partitioner' CLASSIFIER_FUNCTION_NAME = 'classifier' RULES_ENGINE_FUNCTION_NAME = 'rules_engine' RULE_PROMOTION_NAME = 'rule_promotion' diff --git a/streamalert/shared/metrics.py b/streamalert/shared/metrics.py index b16292aac..52a97cd36 100644 --- a/streamalert/shared/metrics.py +++ b/streamalert/shared/metrics.py @@ -18,7 +18,7 @@ from streamalert.shared import ( ALERT_MERGER_NAME, ALERT_PROCESSOR_NAME, - ATHENA_PARTITION_REFRESH_NAME, + ATHENA_PARTITIONER_NAME, CLASSIFIER_FUNCTION_NAME, RULES_ENGINE_FUNCTION_NAME ) @@ -29,7 +29,7 @@ CLUSTER = os.environ.get('CLUSTER', 'unknown_cluster') # The FUNC_PREFIXES dict acts as a simple map to a human-readable name -# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the +# Add ATHENA_PARTITIONER_NAME: 'AthenaPartitionRefresh', to the # below when metrics are supported there FUNC_PREFIXES = { ALERT_MERGER_NAME: 'AlertMerger', @@ -89,7 +89,7 @@ class MetricLogger: ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup) }, ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics - ATHENA_PARTITION_REFRESH_NAME: {}, # Placeholder for future athena processor metrics + ATHENA_PARTITIONER_NAME: {}, # Placeholder for future athena processor metrics CLASSIFIER_FUNCTION_NAME: { FAILED_PARSES: (_default_filter.format(FAILED_PARSES), _default_value_lookup), diff --git a/tests/unit/streamalert/athena_partition_refresh/__init__.py b/tests/unit/streamalert/athena_partitioner/__init__.py similarity index 100% rename from tests/unit/streamalert/athena_partition_refresh/__init__.py rename to tests/unit/streamalert/athena_partitioner/__init__.py diff --git a/tests/unit/streamalert/athena_partition_refresh/test_main.py b/tests/unit/streamalert/athena_partitioner/test_main.py similarity index 90% rename from tests/unit/streamalert/athena_partition_refresh/test_main.py rename to tests/unit/streamalert/athena_partitioner/test_main.py index c9caadd5f..c498298e3 100644 --- a/tests/unit/streamalert/athena_partition_refresh/test_main.py +++ 
b/tests/unit/streamalert/athena_partitioner/test_main.py @@ -20,7 +20,7 @@ from mock import Mock, patch, call from nose.tools import assert_equal, assert_true -from streamalert.athena_partition_refresh.main import AthenaRefresher +from streamalert.athena_partitioner.main import AthenaPartitioner from streamalert.shared.config import load_config from tests.unit.helpers.aws_mocks import MockAthenaClient @@ -29,20 +29,20 @@ # Without this time.sleep patch, backoff performs sleep # operations and drastically slows down testing @patch('time.sleep', Mock()) -class TestAthenaRefresher: - """Test class for AthenaRefresher when output data in Parquet format""" +class TestAthenaPartitioner: + """Test class for AthenaPartitioner when output data in Parquet format""" - @patch('streamalert.athena_partition_refresh.main.load_config', + @patch('streamalert.athena_partitioner.main.load_config', Mock(return_value=load_config('tests/unit/conf/'))) @patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1'}) @patch('streamalert.shared.athena.boto3') def setup(self, boto_patch): - """Setup the AthenaRefresher tests""" + """Setup the AthenaPartitioner tests""" boto_patch.client.return_value = MockAthenaClient() - self._refresher = AthenaRefresher() + self._refresher = AthenaPartitioner() def test_add_partitions(self): - """AthenaRefresher - Add Partitions""" + """AthenaPartitioner - Add Partitions""" self._refresher._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.parquet', @@ -69,13 +69,13 @@ def test_add_partitions(self): @patch('logging.Logger.warning') def test_add_partitions_none(self, log_mock): - """AthenaRefresher - Add Partitions, None to Add""" + """AthenaPartitioner - Add Partitions, None to Add""" result = self._refresher._add_partitions() log_mock.assert_called_with('No partitions to add') assert_equal(result, False) def test_get_partitions_from_keys_parquet(self): - """AthenaRefresher - Get Partitions From Keys in parquet format""" + """AthenaPartitioner - Get Partitions From Keys in parquet format""" expected_result = { 'alerts': { '(dt = \'2017-08-26-14\')': ('\'s3://unit-test-streamalerts/' @@ -129,7 +129,7 @@ def test_get_partitions_from_keys_parquet(self): @patch('logging.Logger.warning') def test_get_partitions_from_keys_error(self, log_mock): - """AthenaRefresher - Get Partitions From Keys, Bad Key""" + """AthenaPartitioner - Get Partitions From Keys, Bad Key""" bad_key = b'bad_match_string' self._refresher._s3_buckets_and_keys = { 'unit-test-streamalerts': { @@ -182,10 +182,10 @@ def _s3_record_placeholder_file(): def _create_test_message(count=2, placeholder=False): """Helper function for creating an sqs messsage body""" if placeholder: - body = json.dumps(TestAthenaRefresher._s3_record_placeholder_file()) + body = json.dumps(TestAthenaPartitioner._s3_record_placeholder_file()) else: count = min(count, 30) - body = json.dumps(TestAthenaRefresher._s3_record(count)) + body = json.dumps(TestAthenaPartitioner._s3_record(count)) return { 'Records': [ { @@ -199,9 +199,9 @@ def _create_test_message(count=2, placeholder=False): } @patch('logging.Logger.debug') - @patch('streamalert.athena_partition_refresh.main.AthenaRefresher._add_partitions') + @patch('streamalert.athena_partitioner.main.AthenaPartitioner._add_partitions') def test_run(self, add_mock, log_mock): - """AthenaRefresher - Run""" + """AthenaPartitioner - Run""" add_mock.return_value = True self._refresher.run(self._create_test_message(1)) 
log_mock.assert_called_with( @@ -212,7 +212,7 @@ def test_run(self, add_mock, log_mock): @patch('logging.Logger.info') def test_run_placeholder_file(self, log_mock): - """AthenaRefresher - Run, Placeholder File""" + """AthenaPartitioner - Run, Placeholder File""" self._refresher.run(self._create_test_message(1, True)) log_mock.assert_has_calls([ call( @@ -223,13 +223,13 @@ def test_run_placeholder_file(self, log_mock): @patch('logging.Logger.warning') def test_run_no_messages(self, log_mock): - """AthenaRefresher - Run, No Messages""" + """AthenaPartitioner - Run, No Messages""" self._refresher.run(self._create_test_message(0)) log_mock.assert_called_with('No partitions to add') @patch('logging.Logger.error') def test_run_invalid_bucket(self, log_mock): - """AthenaRefresher - Run, Bad Bucket Name""" + """AthenaPartitioner - Run, Bad Bucket Name""" event = self._create_test_message(0) bucket = 'bad.bucket.name' s3_record = self._s3_record(1) @@ -241,20 +241,20 @@ def test_run_invalid_bucket(self, log_mock): bucket) @patch('time.sleep', Mock()) -class TestAthenaRefresherJson: - """Test class for AthenaRefresher when output data in JSON format""" +class TestAthenaPartitionerJSON: + """Test class for AthenaPartitioner when output data in JSON format""" - @patch('streamalert.athena_partition_refresh.main.load_config', + @patch('streamalert.athena_partitioner.main.load_config', Mock(return_value=load_config('tests/unit/conf_athena/'))) @patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1'}) @patch('streamalert.shared.athena.boto3') def setup(self, boto_patch): - """Setup the AthenaRefresher tests""" + """Setup the AthenaPartitioner tests""" boto_patch.client.return_value = MockAthenaClient() - self._refresher = AthenaRefresher() + self._refresher = AthenaPartitioner() def test_get_partitions_from_keys_json(self): - """AthenaRefresher - Get Partitions From Keys in json format""" + """AthenaPartitioner - Get Partitions From Keys in json format""" expected_result = { 'alerts': { '(dt = \'2017-08-26-14\')': ('\'s3://unit-test-streamalerts/' From beaabb72704c5f5c0b4782e31ea27f4c00048a00 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 31 Mar 2020 17:03:52 -0700 Subject: [PATCH 2/7] updating terraform generation code to use tf_lambda module --- streamalert_cli/terraform/athena.py | 79 +++++++------------ .../manage_lambda/test_rollback.py | 2 +- .../streamalert_cli/terraform/test_athena.py | 45 ++++++----- 3 files changed, 57 insertions(+), 69 deletions(-) diff --git a/streamalert_cli/terraform/athena.py b/streamalert_cli/terraform/athena.py index 73f994e10..6abd68531 100644 --- a/streamalert_cli/terraform/athena.py +++ b/streamalert_cli/terraform/athena.py @@ -13,14 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -from streamalert.shared import metrics +from streamalert.shared import ATHENA_PARTITIONER_NAME from streamalert.shared.config import athena_partition_buckets -from streamalert_cli.manage_lambda.package import AthenaPackage +from streamalert_cli.manage_lambda.package import AthenaPartitionerPackage from streamalert_cli.terraform.common import ( infinitedict, - monitoring_topic_arn, s3_access_logging_bucket, ) +from streamalert_cli.terraform.lambda_module import generate_lambda def generate_athena(config): @@ -32,12 +32,12 @@ def generate_athena(config): Returns: dict: Athena dict to be marshalled to JSON """ - athena_dict = infinitedict() - athena_config = config['lambda']['athena_partition_refresh_config'] - - data_buckets = sorted(athena_partition_buckets(config)) + result = infinitedict() prefix = config['global']['account']['prefix'] + athena_config = config['lambda']['athena_partitioner_config'] + + data_buckets = sorted(athena_partition_buckets(config)) database = athena_config.get('database_name', '{}_streamalert'.format(prefix)) results_bucket_name = athena_config.get( @@ -51,52 +51,33 @@ def generate_athena(config): ).strip() logging_bucket, _ = s3_access_logging_bucket(config) - athena_dict['module']['streamalert_athena'] = { - 's3_logging_bucket': logging_bucket, + + # Set variables for the athena partitioner's IAM permissions + result['module']['athena_partitioner_iam'] = { 'source': './modules/tf_athena', + 'account_id': config['global']['account']['aws_account_id'], + 'prefix': prefix, + 's3_logging_bucket': logging_bucket, 'database_name': database, 'queue_name': queue_name, + 'athena_data_buckets': data_buckets, 'results_bucket': results_bucket_name, 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}', - 'lambda_handler': AthenaPackage.lambda_handler, - 'lambda_memory': athena_config.get('memory', '128'), - 'lambda_timeout': athena_config.get('timeout', '60'), - 'lambda_log_level': athena_config.get('log_level', 'info'), - 'athena_data_buckets': data_buckets, - 'concurrency_limit': athena_config.get('concurrency_limit', 10), - 'account_id': config['global']['account']['aws_account_id'], - 'prefix': prefix + 'function_role_id': '${module.athena_partitioner_lambda.role_id}', + 'function_name': '${module.athena_partitioner_lambda.function_name}', + 'function_alias_arn': '${module.athena_partitioner_lambda.function_alias_arn}', } - # Cloudwatch monitoring setup - athena_dict['module']['athena_monitoring'] = { - 'source': './modules/tf_monitoring', - 'sns_topic_arn': monitoring_topic_arn(config), - 'lambda_functions': ['{}_streamalert_athena_partition_refresh'.format(prefix)], - 'kinesis_alarms_enabled': False - } - - # Metrics setup - if not athena_config.get('enable_custom_metrics', False): - return athena_dict - - # Check to see if there are any metrics configured for the athena function - current_metrics = metrics.MetricLogger.get_available_metrics() - if metrics.ATHENA_PARTITION_REFRESH_NAME not in current_metrics: - return athena_dict - - metric_prefix = 'AthenaRefresh' - filter_pattern_idx, filter_value_idx = 0, 1 - - # Add filters for the cluster and aggregate - # Use a list of strings that represent the following comma separated values: - # ,, - filters = ['{},{},{}'.format('{}-{}'.format(metric_prefix, metric), - settings[filter_pattern_idx], - settings[filter_value_idx]) - for metric, settings in - current_metrics[metrics.ATHENA_PARTITION_REFRESH_NAME].items()] - - athena_dict['module']['streamalert_athena']['athena_metric_filters'] = filters - - return 
athena_dict + # Set variables for the Lambda module + result['module']['athena_partitioner_lambda'] = generate_lambda( + '{}_streamalert_{}'.format(prefix, ATHENA_PARTITIONER_NAME), + AthenaPartitionerPackage.package_name + '.zip', + AthenaPartitionerPackage.lambda_handler, + athena_config, + config, + tags={ + 'Subcomponent': 'AthenaPartitioner' + } + ) + + return result diff --git a/tests/unit/streamalert_cli/manage_lambda/test_rollback.py b/tests/unit/streamalert_cli/manage_lambda/test_rollback.py index 650ddb1d0..13087ef3e 100644 --- a/tests/unit/streamalert_cli/manage_lambda/test_rollback.py +++ b/tests/unit/streamalert_cli/manage_lambda/test_rollback.py @@ -76,7 +76,7 @@ def test_rollback_all(self, mock_helper): mock.call(mock.ANY, 'unit-test_streamalert_alert_merger'), mock.call(mock.ANY, 'unit-test_corp_box_admin_events_box_collector_app'), mock.call(mock.ANY, 'unit-test_corp_duo_admin_duo_admin_collector_app'), - mock.call(mock.ANY, 'unit-test_streamalert_athena_partition_refresh'), + mock.call(mock.ANY, 'unit-test_streamalert_athena_partitioner'), mock.call(mock.ANY, 'unit-test_corp_streamalert_classifier'), mock.call(mock.ANY, 'unit-test_prod_streamalert_classifier'), mock.call(mock.ANY, 'unit-test_streamalert_rules_engine'), diff --git a/tests/unit/streamalert_cli/terraform/test_athena.py b/tests/unit/streamalert_cli/terraform/test_athena.py index 2079b35fd..a2b701e35 100644 --- a/tests/unit/streamalert_cli/terraform/test_athena.py +++ b/tests/unit/streamalert_cli/terraform/test_athena.py @@ -20,10 +20,11 @@ CONFIG = CLIConfig(config_path='tests/unit/conf') + def test_generate_athena(): - """CLI - Terraform Generate Athena""" + """CLI - Terraform Generate Athena Partitioner""" - CONFIG['lambda']['athena_partition_refresh_config'] = { + CONFIG['lambda']['athena_partitioner_config'] = { 'timeout': '60', 'memory': '128', 'third_party_libraries': [] @@ -33,32 +34,38 @@ def test_generate_athena(): expected_athena_config = { 'module': { - 'streamalert_athena': { - 's3_logging_bucket': '{}-streamalert-s3-logging'.format(prefix), + 'athena_partitioner_iam': { 'source': './modules/tf_athena', + 's3_logging_bucket': '{}-streamalert-s3-logging'.format(prefix), + 'prefix': 'unit-test', + 'account_id': '12345678910', 'database_name': '{}_streamalert'.format(prefix), 'queue_name': '{}_streamalert_athena_s3_notifications'.format(prefix), 'results_bucket': '{}-streamalert-athena-results'.format(prefix), - 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}', - 'lambda_handler': 'streamalert.athena_partition_refresh.main.handler', - 'lambda_log_level': 'info', - 'lambda_memory': '128', - 'lambda_timeout': '60', 'athena_data_buckets': [ 'unit-test-streamalert-data', 'unit-test-streamalerts' ], - 'prefix': 'unit-test', - 'account_id': '12345678910', - 'concurrency_limit': 10 + 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}', + 'function_role_id': '${module.athena_partitioner_lambda.role_id}', + 'function_name': '${module.athena_partitioner_lambda.function_name}', + 'function_alias_arn': '${module.athena_partitioner_lambda.function_alias_arn}', }, - 'athena_monitoring': { - 'source': './modules/tf_monitoring', - 'sns_topic_arn': ( - 'arn:aws:sns:us-west-1:12345678910:unit-test_streamalert_monitoring' - ), - 'kinesis_alarms_enabled': False, - 'lambda_functions': ['unit-test_streamalert_athena_partition_refresh'] + 'athena_partitioner_lambda': { + 'description': 'Unit-Test Streamalert Athena Partitioner', + 'environment_variables': { + 'ENABLE_METRICS': '0', + 'LOGGER_LEVEL': 
'info' + }, + 'tags': { + 'Subcomponent': 'AthenaPartitioner' + }, + 'filename': 'athena_partitioner.zip', + 'function_name': 'unit-test_streamalert_athena_partitioner', + 'handler': 'streamalert.athena_partitioner.main.handler', + 'memory_size_mb': '128', + 'source': './modules/tf_lambda', + 'timeout_sec': '60', } } } From e85f050a5f822b147dd11b97563d94659b933a57 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 31 Mar 2020 17:07:28 -0700 Subject: [PATCH 3/7] updating tf_athena module to remove lambda code --- .../modules/tf_athena/README.md | 96 +----------- .../_infrastructure/modules/tf_athena/iam.tf | 127 ++++----------- .../_infrastructure/modules/tf_athena/kms.tf | 39 ++++- .../_infrastructure/modules/tf_athena/main.tf | 147 +++++------------- .../modules/tf_athena/outputs.tf | 12 -- .../modules/tf_athena/variables.tf | 39 +---- 6 files changed, 120 insertions(+), 340 deletions(-) diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/README.md b/streamalert_cli/_infrastructure/modules/tf_athena/README.md index be1ba1796..4490ffa44 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/README.md +++ b/streamalert_cli/_infrastructure/modules/tf_athena/README.md @@ -1,88 +1,8 @@ -# StreamAlert Athena Terraform Module -This Terraform module creates a Lambda function for refreshing Athena Partitions once new data is written to S3 - -## Components -* A Python3.7 Lambda Function to perform a table refresh -* IAM Role and Policy to allow for Athena execution -* S3 bucket notifications -* Lambda permissions - -## Example -``` -module "streamalert_athena" { - source = "../modules/tf_athena" - lambda_s3_bucket = "my-source-bucket" - lambda_s3_key = "source/athena_partition_refresh_code.zip" - athena_data_buckets = ["my-org-streamalerts"] -} -``` - -## Inputs - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-| Property | Description | Default | Required |
-|---|---|---|---|
-| lambda_handler | The Python function entry point | "main.handler" | False |
-| lambda_timeout | The max runtime in seconds for the Lambda function | 60 seconds | False |
-| lambda_memory | The memory allocation in MB for the Lambda function | 128MB | False |
-| lambda_s3_bucket | The name of the S3 bucket to store Lambda deployment packages | None | True |
-| lambda_s3_key | The object in S3 containing the Lambda source | None | True |
-| lambda_log_level | The current log level of the Lambda function | info | False |
-| current_version | The currently published version of the Lambda production alias | None | True |
-| athean_data_buckets | A list of buckets to monitor changes to for Hive partitioning | None | True |
-| prefix | The resource prefix, normally an organizational name or descriptor | None | True |
-| schedule_expression | The Cloudwatch-Lambda invocation schedule expression | rate(10 minutes) | False |
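The inputs removed above belonged to the old all-in-one module; after this refactor the Lambda function itself comes from the shared tf_lambda module, and tf_athena only supplies the IAM permissions and supporting resources. The sketch below is a rough, hypothetical illustration of how the two modules fit together, based on what generate_athena() emits in PATCH 2/7; the prefix, account ID, and bucket names are placeholder values, not part of this change.

```
// Hypothetical wiring based on the generate_athena() output in PATCH 2/7.
// "example" prefix, account ID, and bucket names are placeholders.
module "athena_partitioner_lambda" {
  source         = "./modules/tf_lambda"
  function_name  = "example_streamalert_athena_partitioner"
  description    = "Example StreamAlert Athena Partitioner"
  handler        = "streamalert.athena_partitioner.main.handler"
  filename       = "athena_partitioner.zip"
  memory_size_mb = "128"
  timeout_sec    = "60"
}

module "athena_partitioner_iam" {
  source              = "./modules/tf_athena"
  account_id          = "123456789012"
  prefix              = "example"
  s3_logging_bucket   = "example-streamalert-s3-logging"
  database_name       = "example_streamalert"
  queue_name          = "example_streamalert_athena_s3_notifications"
  results_bucket      = "example-streamalert-athena-results"
  athena_data_buckets = ["example-streamalerts"]
  kms_key_id          = aws_kms_key.server_side_encryption.key_id

  // Outputs exported from the tf_lambda module
  function_role_id    = module.athena_partitioner_lambda.role_id
  function_name       = module.athena_partitioner_lambda.function_name
  function_alias_arn  = module.athena_partitioner_lambda.function_alias_arn
}
```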
+# Athena Partitioner Permissions +This module adds IAM permissions and other specific resources needed in the Athena partitioner function: + * Athena Database for querying alerts and historical data + * S3 Bucket for storing the results of Athena queries + * SQS Queue for receiving event notifications from S3 buckets + * S3 Event Notifications for sending messages to SQS Queue when objects are created + * KMS Key and Alias for encrypting/decrypting messages on SQS Queue + * Permissions for sending data to SQS Queue and reading/writing data in S3 diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf b/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf index 76a3ede5d..2cb6a492d 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf @@ -1,56 +1,7 @@ -// IAM Role: Lambda Execution Role -resource "aws_iam_role" "athena_partition_role" { - name = "${var.prefix}_athena_partition_refresh" - path = "/streamalert/" - assume_role_policy = data.aws_iam_policy_document.lambda_assume_role_policy.json - - tags = { - Name = "StreamAlert" - AltName = "Athena" - } -} - -// IAM Policy Doc: Generic Lambda trust relationship policy -data "aws_iam_policy_document" "lambda_assume_role_policy" { - statement { - effect = "Allow" - actions = ["sts:AssumeRole"] - - principals { - type = "Service" - identifiers = ["lambda.amazonaws.com"] - } - } -} - -// IAM Role Policy: Allow the Lambda function to use Cloudwatch logging -resource "aws_iam_role_policy" "cloudwatch" { - name = "CloudWatchPutLogs" - role = aws_iam_role.athena_partition_role.id - policy = data.aws_iam_policy_document.cloudwatch.json -} - -// IAM Policy Doc: Cloudwatch creation and logging of events -data "aws_iam_policy_document" "cloudwatch" { - statement { - effect = "Allow" - - actions = [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ] - - resources = [ - "arn:aws:logs:*:*:*", - ] - } -} - -// IAM Role Policy: Allow the Lambda function to use Cloudwatch logging +// IAM Role Policy: Allow the function read and delete SQS messages resource "aws_iam_role_policy" "sqs" { name = "SQSReadDeleteMessages" - role = aws_iam_role.athena_partition_role.id + role = var.function_role_id policy = data.aws_iam_policy_document.sqs.json } @@ -92,21 +43,21 @@ data "aws_iam_policy_document" "sqs" { ] resources = [ - aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn, + aws_sqs_queue.data_bucket_notifications.arn, ] } } -// IAM Role Policy: Allow the Lambda function to execute Athena queries +// IAM Role Policy: Allow the Lambda function to execute Athena queries and perform Glue operations // Ref: http://amzn.to/2tSyxUV -resource "aws_iam_role_policy" "athena_query_permissions" { - name = "AthenaQuery" - role = aws_iam_role.athena_partition_role.id - policy = data.aws_iam_policy_document.athena_permissions.json +resource "aws_iam_role_policy" "athena_glue_permissions" { + name = "AthenaGlueAccess" + role = var.function_role_id + policy = data.aws_iam_policy_document.athena_glue_permissions.json } -// IAM Policy Doc: Athena and S3 permissions -data "aws_iam_policy_document" "athena_permissions" { +// IAM Policy Doc: Athena and Glue permissions +data "aws_iam_policy_document" "athena_glue_permissions" { statement { effect = "Allow" @@ -149,7 +100,18 @@ data "aws_iam_policy_document" "athena_permissions" { "*", ] } +} + +// IAM Role Policy: Allow the Lambda function to read data buckets +resource "aws_iam_role_policy" 
"athena_results_bucket" { + name = "S3ResultsBucket" + role = var.function_role_id + policy = data.aws_iam_policy_document.athena_results_bucket.json +} +// IAM Policy Doc: Allow Athena to read data from configured buckets +// This is necessary for table repairs +data "aws_iam_policy_document" "athena_results_bucket" { statement { effect = "Allow" @@ -171,15 +133,15 @@ data "aws_iam_policy_document" "athena_permissions" { } // IAM Role Policy: Allow the Lambda function to read data buckets -resource "aws_iam_role_policy" "athena_query_data_bucket_permissions" { - name = "AthenaGetData" - role = aws_iam_role.athena_partition_role.id - policy = data.aws_iam_policy_document.athena_data_bucket_read.json +resource "aws_iam_role_policy" "data_bucket" { + name = "S3DataBucket" + role = var.function_role_id + policy = data.aws_iam_policy_document.data_bucket.json } // IAM Policy Doc: Allow Athena to read data from configured buckets // This is necessary for table repairs -data "aws_iam_policy_document" "athena_data_bucket_read" { +data "aws_iam_policy_document" "data_bucket" { statement { effect = "Allow" @@ -202,7 +164,7 @@ data "aws_iam_policy_document" "athena_data_bucket_read" { } // IAM Policy Doc: Allow configured data buckets to send SQS messages -data "aws_iam_policy_document" "athena_data_bucket_sqs_sendmessage" { +data "aws_iam_policy_document" "data_bucket_sqs" { statement { effect = "Allow" @@ -216,7 +178,7 @@ data "aws_iam_policy_document" "athena_data_bucket_sqs_sendmessage" { } resources = [ - aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn, + aws_sqs_queue.data_bucket_notifications.arn, ] condition { @@ -227,36 +189,3 @@ data "aws_iam_policy_document" "athena_data_bucket_sqs_sendmessage" { } } } - -// Allow S3 to use the SSE key when publishing events to SQS -data "aws_iam_policy_document" "kms_sse_allow_s3" { - statement { - sid = "Enable IAM User Permissions" - effect = "Allow" - - principals { - type = "AWS" - identifiers = ["arn:aws:iam::${var.account_id}:root"] - } - - actions = ["kms:*"] - resources = ["*"] - } - - statement { - sid = "AllowS3ToUseKey" - effect = "Allow" - - principals { - type = "Service" - identifiers = ["s3.amazonaws.com"] - } - - actions = [ - "kms:Decrypt", - "kms:GenerateDataKey", - ] - - resources = ["*"] - } -} diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf b/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf index 92a1002f3..bfde44fe8 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf @@ -3,11 +3,11 @@ resource "aws_kms_key" "sse" { description = "Athena SQS server-side encryption" enable_key_rotation = true - policy = data.aws_iam_policy_document.kms_sse_allow_s3.json + policy = data.aws_iam_policy_document.kms_sse.json tags = { - Name = "StreamAlert" - AltName = "Athena" + Name = "StreamAlert" + Subcomponent = "AthenaPartitioner" } } @@ -15,3 +15,36 @@ resource "aws_kms_alias" "sse" { name = "alias/${var.prefix}_streamalert_sqs_sse" target_key_id = aws_kms_key.sse.key_id } + +// Allow S3 to use the SSE key when publishing events to SQS +data "aws_iam_policy_document" "kms_sse" { + statement { + sid = "Enable IAM User Permissions" + effect = "Allow" + + principals { + type = "AWS" + identifiers = ["arn:aws:iam::${var.account_id}:root"] + } + + actions = ["kms:*"] + resources = ["*"] + } + + statement { + sid = "AllowS3ToUseKey" + effect = "Allow" + + principals { + type = "Service" + identifiers = ["s3.amazonaws.com"] + } + + 
actions = [ + "kms:Decrypt", + "kms:GenerateDataKey", + ] + + resources = ["*"] + } +} diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/main.tf b/streamalert_cli/_infrastructure/modules/tf_athena/main.tf index e884e056a..c340214f8 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/main.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/main.tf @@ -1,58 +1,7 @@ -// Lambda Function: Athena Parition Refresh -resource "aws_lambda_function" "athena_partition_refresh" { - function_name = "${var.prefix}_streamalert_athena_partition_refresh" - description = "StreamAlert Athena Refresh" - runtime = "python3.7" - role = aws_iam_role.athena_partition_role.arn - handler = var.lambda_handler - memory_size = var.lambda_memory - timeout = var.lambda_timeout - - filename = var.filename - source_code_hash = filebase64sha256(var.filename) - publish = true - - // Maximum number of concurrent executions allowed - reserved_concurrent_executions = var.concurrency_limit - - environment { - variables = { - LOGGER_LEVEL = var.lambda_log_level - } - } - - tags = { - Name = "StreamAlert" - AltName = "Athena" - } -} - -// Policy for S3 bucket -data "aws_iam_policy_document" "athena_results_bucket" { - # Force SSL access only - statement { - sid = "ForceSSLOnlyAccess" - - effect = "Deny" - - principals { - type = "AWS" - identifiers = ["*"] - } - - actions = ["s3:*"] - - resources = [ - "arn:aws:s3:::${var.results_bucket}", - "arn:aws:s3:::${var.results_bucket}/*", - ] - - condition { - test = "Bool" - variable = "aws:SecureTransport" - values = ["false"] - } - } +// Athena Database: streamalert +resource "aws_athena_database" "streamalert" { + name = var.database_name + bucket = aws_s3_bucket.athena_results_bucket.bucket } // S3 Bucket: Athena Query Results and Metastore Bucket @@ -63,8 +12,8 @@ resource "aws_s3_bucket" "athena_results_bucket" { force_destroy = false tags = { - Name = "StreamAlert" - AltName = "Athena" + Name = "StreamAlert" + Subcomponent = "AthenaPartitioner" } versioning { @@ -86,22 +35,36 @@ resource "aws_s3_bucket" "athena_results_bucket" { } } -// Athena Database: streamalert -resource "aws_athena_database" "streamalert" { - name = var.database_name - bucket = aws_s3_bucket.athena_results_bucket.bucket -} +// Policy for S3 bucket +data "aws_iam_policy_document" "athena_results_bucket" { + # Force SSL access only + statement { + sid = "ForceSSLOnlyAccess" -// Lambda Alias: Athena Function Production Alias -resource "aws_lambda_alias" "athena_partition_refresh_production" { - name = "production" - description = "Production StreamAlert Athena Parition Refresh Alias" - function_name = aws_lambda_function.athena_partition_refresh.arn - function_version = aws_lambda_function.athena_partition_refresh.version + effect = "Deny" + + principals { + type = "AWS" + identifiers = ["*"] + } + + actions = ["s3:*"] + + resources = [ + "arn:aws:s3:::${var.results_bucket}", + "arn:aws:s3:::${var.results_bucket}/*", + ] + + condition { + test = "Bool" + variable = "aws:SecureTransport" + values = ["false"] + } + } } // SQS Queue: Athena Data Bucket Notificaitons -resource "aws_sqs_queue" "streamalert_athena_data_bucket_notifications" { +resource "aws_sqs_queue" "data_bucket_notifications" { name = var.queue_name # Enables SQS Long Polling: https://amzn.to/2wn10CR @@ -117,20 +80,20 @@ resource "aws_sqs_queue" "streamalert_athena_data_bucket_notifications" { kms_master_key_id = aws_kms_key.sse.arn tags = { - Name = "StreamAlert" - AltName = "Athena" + Name = "StreamAlert" + 
Subcomponent = "AthenaPartitioner" } } // SQS Queue Policy: Allow data buckets to send SQS messages -resource "aws_sqs_queue_policy" "streamalert_athena_data_bucket_notifications" { - queue_url = aws_sqs_queue.streamalert_athena_data_bucket_notifications.id - policy = data.aws_iam_policy_document.athena_data_bucket_sqs_sendmessage.json +resource "aws_sqs_queue_policy" "data_bucket_notifications" { + queue_url = aws_sqs_queue.data_bucket_notifications.id + policy = data.aws_iam_policy_document.data_bucket_sqs.json } -resource "aws_lambda_event_source_mapping" "streamalert_athena_sqs_event_source" { - event_source_arn = aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn - function_name = "${aws_lambda_function.athena_partition_refresh.arn}:${aws_lambda_alias.athena_partition_refresh_production.name}" +resource "aws_lambda_event_source_mapping" "athena_sqs" { + event_source_arn = aws_sqs_queue.data_bucket_notifications.arn + function_name = var.function_alias_arn } // S3 Bucekt Notificaiton: Configure S3 to notify Lambda @@ -139,35 +102,9 @@ resource "aws_s3_bucket_notification" "bucket_notification" { bucket = element(var.athena_data_buckets, count.index) queue { - queue_arn = aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn + queue_arn = aws_sqs_queue.data_bucket_notifications.arn events = ["s3:ObjectCreated:*"] } - depends_on = [aws_sqs_queue_policy.streamalert_athena_data_bucket_notifications] -} - -// Log Retention Policy -resource "aws_cloudwatch_log_group" "athena" { - name = "/aws/lambda/${aws_lambda_function.athena_partition_refresh.function_name}" - retention_in_days = 14 - - tags = { - Name = "StreamAlert" - AltName = "Athena" - } -} - -// CloudWatch metric filters for the athena partition refresh function -// The split list is made up of: , , -resource "aws_cloudwatch_log_metric_filter" "athena_partition_refresh_cw_metric_filters" { - count = length(var.athena_metric_filters) - name = element(split(",", var.athena_metric_filters[count.index]), 0) - pattern = element(split(",", var.athena_metric_filters[count.index]), 1) - log_group_name = aws_cloudwatch_log_group.athena.name - - metric_transformation { - name = element(split(",", var.athena_metric_filters[count.index]), 0) - namespace = var.namespace - value = element(split(",", var.athena_metric_filters[count.index]), 2) - } + depends_on = [aws_sqs_queue_policy.data_bucket_notifications] } diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf b/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf index 8fa1d5816..932ce00ce 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf @@ -1,15 +1,3 @@ -output "lambda_arn" { - value = aws_lambda_function.athena_partition_refresh.arn -} - -output "lambda_role_arn" { - value = aws_iam_role.athena_partition_role.arn -} - -output "lambda_role_id" { - value = aws_iam_role.athena_partition_role.id -} - output "results_bucket_arn" { value = aws_s3_bucket.athena_results_bucket.arn } diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf b/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf index 1fd1869b5..5f5e35d96 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf @@ -6,29 +6,16 @@ variable "prefix" { type = string } -variable "lambda_handler" { - type = string - default = "main.handler" +variable "function_role_id" { + description = 
"Athena Partitioner function IAM Role ID, exported from the tf_lambda module" } -variable "lambda_memory" { - type = string - default = "128" +variable "function_alias_arn" { + description = "Athena Partitioner function alias arn, exported from the tf_lambda module" } -variable "lambda_timeout" { - type = string - default = "60" -} - -variable "filename" { - type = string - default = "athena_partition_refresh.zip" -} - -variable "lambda_log_level" { - type = string - default = "info" +variable "function_name" { + description = "Athena Partitioner function name, exported from the tf_lambda module" } variable "athena_data_buckets" { @@ -54,17 +41,3 @@ variable "database_name" { variable "queue_name" { type = string } - -variable "athena_metric_filters" { - type = list(string) - default = [] -} - -variable "namespace" { - type = string - default = "StreamAlert" -} - -variable "concurrency_limit" { - default = 10 -} From 6d0e727dbff9275d0e584c0ce80976d4315c2a5a Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 31 Mar 2020 17:14:47 -0700 Subject: [PATCH 4/7] updates for packaging, rollback, and deploy --- streamalert_cli/manage_lambda/deploy.py | 2 +- streamalert_cli/manage_lambda/package.py | 12 ++++++------ streamalert_cli/manage_lambda/rollback.py | 2 +- streamalert_cli/terraform/generate.py | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/streamalert_cli/manage_lambda/deploy.py b/streamalert_cli/manage_lambda/deploy.py index b581f5301..6f83c864c 100644 --- a/streamalert_cli/manage_lambda/deploy.py +++ b/streamalert_cli/manage_lambda/deploy.py @@ -211,7 +211,7 @@ def _create(function_name, config, clusters=None): for info in config['clusters'].values()) ), 'athena': PackageMap( - streamalert_packages.AthenaPackage, + streamalert_packages.AthenaPartitionerPackage, {'module.streamalert_athena'}, True ), diff --git a/streamalert_cli/manage_lambda/package.py b/streamalert_cli/manage_lambda/package.py index d8f7d38ce..acc0a3f69 100644 --- a/streamalert_cli/manage_lambda/package.py +++ b/streamalert_cli/manage_lambda/package.py @@ -268,17 +268,17 @@ class AppPackage(LambdaPackage): } -class AthenaPackage(LambdaPackage): - """Create the Athena Partition Refresh Lambda function package""" - config_key = 'athena_partition_refresh_config' - lambda_handler = 'streamalert.athena_partition_refresh.main.handler' +class AthenaPartitionerPackage(LambdaPackage): + """Create the Athena Partitioner Lambda function package""" + config_key = 'athena_partitioner_config' + lambda_handler = 'streamalert.athena_partitioner.main.handler' package_files = { 'conf', 'streamalert/__init__.py', - 'streamalert/athena_partition_refresh', + 'streamalert/athena_partitioner', 'streamalert/shared' } - package_name = 'athena_partition_refresh' + package_name = 'athena_partitioner' package_libs = {'netaddr'} diff --git a/streamalert_cli/manage_lambda/rollback.py b/streamalert_cli/manage_lambda/rollback.py index e12b3bb51..058c80f1e 100644 --- a/streamalert_cli/manage_lambda/rollback.py +++ b/streamalert_cli/manage_lambda/rollback.py @@ -127,7 +127,7 @@ def handler(cls, options, config): if rollback_all or 'athena' in options.function: success = success and _rollback_production( client, - '{}_streamalert_athena_partition_refresh'.format(prefix) + '{}_streamalert_athena_partitioner'.format(prefix) ) if rollback_all or 'classifier' in options.function: diff --git a/streamalert_cli/terraform/generate.py b/streamalert_cli/terraform/generate.py index f6bf40e18..e24b2156f 100644 --- 
a/streamalert_cli/terraform/generate.py +++ b/streamalert_cli/terraform/generate.py @@ -460,7 +460,7 @@ def terraform_generate_handler(config, init=False, check_tf=True, check_creds=Tr # Setup Athena generate_global_lambda_settings( config, - config_name='athena_partition_refresh_config', + config_name='athena_partitioner_config', generate_func=generate_athena, tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'athena.tf.json'), message='Removing old Athena Terraform file' @@ -607,16 +607,16 @@ def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_f tf_tmp_file (str): filename of terraform file, generated by CLI. message (str): Message will be logged by LOGGER. """ - if config_name == 'athena_partition_refresh_config': + if config_name == 'athena_partitioner_config': # Raise ConfigError when user doesn't explicitly set `file_format` - # in `athena_partition_refresh_config` in conf/lambda.json when upgrade to v3.1.0. + # in `athena_partitioner_config` in conf/lambda.json when upgrade to v3.1.0. file_format = get_data_file_format(config) if not file_format or file_format not in ('parquet', 'json'): message = ( '[WARNING] ' 'It is required to explicitly set "file_format" for ' - 'athena_partition_refresh_config in "conf/lambda.json" when upgrading to v3.1.0. ' + 'athena_partitioner_config in "conf/lambda.json" when upgrading to v3.1.0. ' 'Available values are "parquet" and "json". For more information, refer to ' 'https://github.com/airbnb/streamalert/issues/1143. ' 'In the future release, the default value of "file_format" will ' From f4f7d64be0291e35422ba6d71c1b2c66473d107e Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 31 Mar 2020 17:15:12 -0700 Subject: [PATCH 5/7] misc updates related to config path renaming, etc --- conf/lambda.json | 4 ++- docs/source/architecture.rst | 2 +- docs/source/getting-started.rst | 4 +-- docs/source/historical-search.rst | 22 +++++++------- streamalert/rule_promotion/promoter.py | 2 +- streamalert/shared/athena.py | 2 +- streamalert/shared/config.py | 4 +-- streamalert/shared/metrics.py | 2 +- streamalert/shared/utils.py | 4 +-- streamalert_cli/athena/handler.py | 10 +++---- streamalert_cli/config.py | 4 +-- streamalert_cli/terraform/handlers.py | 2 +- streamalert_cli/terraform/rule_promotion.py | 2 +- .../terraform/scheduled_queries.py | 4 +-- tests/unit/conf/lambda.json | 2 +- tests/unit/conf_athena/lambda.json | 2 +- tests/unit/helpers/config.py | 4 +-- .../athena_partitioner/test_main.py | 30 +++++++++---------- .../terraform/test_generate.py | 8 ++--- .../terraform/test_rule_promotion.py | 4 ++- tests/unit/streamalert_cli/test_cli_config.py | 4 +-- 21 files changed, 63 insertions(+), 59 deletions(-) diff --git a/conf/lambda.json b/conf/lambda.json index 6396c1ae8..c366943c7 100644 --- a/conf/lambda.json +++ b/conf/lambda.json @@ -50,8 +50,10 @@ "subnet_ids": [] } }, - "athena_partition_refresh_config": { + "athena_partitioner_config": { "concurrency_limit": 10, + "memory": 128, + "timeout": 300, "file_format": null, "log_level": "info" }, diff --git a/docs/source/architecture.rst b/docs/source/architecture.rst index 2fe0f0099..45cdd406f 100644 --- a/docs/source/architecture.rst +++ b/docs/source/architecture.rst @@ -40,7 +40,7 @@ configured `outputs `_. All alerts implicitly include a Firehose o an S3 bucket that can be queried with Athena. Alerts will be retried indefinitely until they are successfully delivered, at which point they will be removed from the DynamoDB table. -6. 
An "athena partition refresh" Lambda function runs periodically to onboard new StreamAlert data +6. An Athena Partitioner Lambda function runs periodically to onboard new StreamAlert data and alerts into their respective Athena databases for historical search. Other StreamAlert components include DynamoDB tables and Lambda functions for optional rule diff --git a/docs/source/getting-started.rst b/docs/source/getting-started.rst index ee4781006..bcdd47f91 100644 --- a/docs/source/getting-started.rst +++ b/docs/source/getting-started.rst @@ -109,7 +109,7 @@ Deploy .. code-block:: bash - "athena_partition_refresh_config": { + "athena_partitioner_config": { "concurrency_limit": 10, "file_format": "parquet", "log_level": "info" @@ -284,7 +284,7 @@ dropdown on the left and preview the ``alerts`` table: :target: _images/athena-alerts-search.png (Here, my name prefix is ``testv2``.) If no records are returned, look for errors -in the Athena Partition Refresh function or try invoking it directly. +in the Athena Partitioner function or try invoking it directly. And there you have it! Ingested log data is parsed, classified, and scanned by the rules engine. Any resulting alerts are delivered to your configured output(s) within a matter of minutes. diff --git a/docs/source/historical-search.rst b/docs/source/historical-search.rst index c8f45c444..38c7c2c4f 100644 --- a/docs/source/historical-search.rst +++ b/docs/source/historical-search.rst @@ -6,7 +6,7 @@ StreamAlert historical search feature is backed by Amazon S3 and `Athena ` and - the :ref:`Athena Partition Refresh` function. Note that + the :ref:`Athena Partitioner` function. Note that the Athena database and alerts table are created automatically when you first deploy StreamAlert. -* If the ``file_format`` value within the :ref:`Athena Partition Refresh` +* If the ``file_format`` value within the :ref:`Athena Partitioner` function config is set to ``parquet``, you can run the ``MSCK REPAIR TABLE alerts`` command in Athena to load all available partitions and then alerts can be searchable. Note, however, that the ``MSCK REPAIR`` command cannot load new partitions automatically. * StreamAlert includes a Lambda function to automatically add new partitions for Athena tables when - the data arrives in S3. See :ref:`configure_athena_partition_refresh_lambda` + the data arrives in S3. See :ref:`configure_athena_partitioner_lambda` .. code-block:: bash { - "athena_partition_refresh_config": { + "athena_partitioner_config": { "concurrency_limit": 10, "file_format": "parquet", "log_level": "info" } } -* Deploy the Athena Partition Refresh Lambda function +* Deploy the Athena Partitioner Lambda function .. code-block:: bash @@ -109,7 +109,7 @@ It is optional to store data in S3 bucket and available for search in Athena tab .. image:: ../images/athena-data-search.png -.. _configure_athena_partition_refresh_lambda: +.. 
_configure_athena_partitioner_lambda: ************************* Configure Lambda Settings @@ -120,8 +120,8 @@ Open ``conf/lambda.json``, and fill in the following options: =================================== ======== ==================== =========== Key Required Default Description ----------------------------------- -------- -------------------- ----------- -``enabled`` Yes ``true`` Enables/Disables the Athena Partition Refresh Lambda function -``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partition Refresh Lambda function +``enabled`` Yes ``true`` Enables/Disables the Athena Partitioner Lambda function +``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partitioner Lambda function ``log_level`` No ``info`` The log level for the Lambda function, can be either ``info`` or ``debug``. Debug will help with diagnosing errors with polling SQS or sending Athena queries. ``memory`` No ``128`` The amount of memory (in MB) allocated to the Lambda function ``timeout`` No ``60`` The maximum duration of the Lambda function (in seconds) @@ -134,7 +134,7 @@ Key Required Default Descriptio .. code-block:: json { - "athena_partition_refresh_config": { + "athena_partitioner_config": { "log_level": "info", "memory": 128, "buckets": { diff --git a/streamalert/rule_promotion/promoter.py b/streamalert/rule_promotion/promoter.py index 11e1a8dca..088f92a58 100644 --- a/streamalert/rule_promotion/promoter.py +++ b/streamalert/rule_promotion/promoter.py @@ -39,7 +39,7 @@ def __init__(self): # Create the rule table class for getting staging information self._rule_table = RuleTable('{}_streamalert_rules'.format(prefix)) - athena_config = self._config['lambda']['athena_partition_refresh_config'] + athena_config = self._config['lambda']['athena_partitioner_config'] # Get the name of the athena database to access db_name = athena_config.get('database_name', get_database_name(self._config)) diff --git a/streamalert/shared/athena.py b/streamalert/shared/athena.py index 6723889a4..6879c38a0 100644 --- a/streamalert/shared/athena.py +++ b/streamalert/shared/athena.py @@ -59,7 +59,7 @@ def __init__(self, database_name, results_bucket, results_prefix, region=None): if not results_bucket.startswith('s3://'): results_bucket = 's3://{}'.format(results_bucket) - # Produces athena_partition_refresh/YYYY/MM/DD S3 keys + # Produces athena_partitioner/YYYY/MM/DD S3 keys self._s3_results_path = posixpath.join( results_bucket, results_prefix, diff --git a/streamalert/shared/config.py b/streamalert/shared/config.py index 150cc5285..dee733f9f 100644 --- a/streamalert/shared/config.py +++ b/streamalert/shared/config.py @@ -122,7 +122,7 @@ def athena_partition_buckets(config): Returns: list: Bucket names for which Athena is enabled """ - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] data_buckets = athena_config.get('buckets', {}) data_buckets[firehose_alerts_bucket(config)] = 'alerts' data_bucket = firehose_data_bucket(config) # Data retention is optional, so check for this @@ -140,7 +140,7 @@ def athena_query_results_bucket(config): Returns: str: The name of the S3 bucket. 
""" - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] prefix = config['global']['account']['prefix'] return athena_config.get( diff --git a/streamalert/shared/metrics.py b/streamalert/shared/metrics.py index 52a97cd36..65ea17b02 100644 --- a/streamalert/shared/metrics.py +++ b/streamalert/shared/metrics.py @@ -29,7 +29,7 @@ CLUSTER = os.environ.get('CLUSTER', 'unknown_cluster') # The FUNC_PREFIXES dict acts as a simple map to a human-readable name -# Add ATHENA_PARTITIONER_NAME: 'AthenaPartitionRefresh', to the +# Add ATHENA_PARTITIONER_NAME: 'AthenaPartitioner', to the # below when metrics are supported there FUNC_PREFIXES = { ALERT_MERGER_NAME: 'AlertMerger', diff --git a/streamalert/shared/utils.py b/streamalert/shared/utils.py index 30457a180..87ff47b12 100644 --- a/streamalert/shared/utils.py +++ b/streamalert/shared/utils.py @@ -152,7 +152,7 @@ def get_database_name(config): str: The name of the athena database """ prefix = config['global']['account']['prefix'] - athena_config = config['lambda'].get('athena_partition_refresh_config') + athena_config = config['lambda'].get('athena_partitioner_config') return athena_config.get('database_name', '{}_streamalert'.format(prefix)) @@ -163,6 +163,6 @@ def get_data_file_format(config): Returns: str: The data store format either "parquet" or "json" """ - athena_config = config['lambda'].get('athena_partition_refresh_config', {}) + athena_config = config['lambda'].get('athena_partitioner_config', {}) return athena_config.get('file_format') diff --git a/streamalert_cli/athena/handler.py b/streamalert_cli/athena/handler.py index 743f5286a..691798e36 100644 --- a/streamalert_cli/athena/handler.py +++ b/streamalert_cli/athena/handler.py @@ -198,7 +198,7 @@ def get_athena_client(config): AthenaClient: instantiated client for performing athena actions """ prefix = config['global']['account']['prefix'] - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] db_name = get_database_name(config) @@ -451,11 +451,11 @@ def create_table(table, bucket, config, schema_override=None): if table != 'alerts' and bucket != config_data_bucket: # Only add buckets to the config if they are not one of the default/configured buckets # Ensure 'buckets' exists in the config (since it is not required) - config['lambda']['athena_partition_refresh_config']['buckets'] = ( - config['lambda']['athena_partition_refresh_config'].get('buckets', {}) + config['lambda']['athena_partitioner_config']['buckets'] = ( + config['lambda']['athena_partitioner_config'].get('buckets', {}) ) - if bucket not in config['lambda']['athena_partition_refresh_config']['buckets']: - config['lambda']['athena_partition_refresh_config']['buckets'][bucket] = 'data' + if bucket not in config['lambda']['athena_partitioner_config']['buckets']: + config['lambda']['athena_partitioner_config']['buckets'][bucket] = 'data' config.write() LOGGER.info('The %s table was successfully created!', sanitized_table_name) diff --git a/streamalert_cli/config.py b/streamalert_cli/config.py index afe6dd411..3ce23d619 100644 --- a/streamalert_cli/config.py +++ b/streamalert_cli/config.py @@ -59,7 +59,7 @@ def clusters(self): def generate_athena(self): """Generate a base Athena config""" - if 'athena_partition_refresh_config' in self.config['lambda']: + if 'athena_partitioner_config' in self.config['lambda']: LOGGER.warning('The Athena configuration already exists, 
skipping.') return @@ -71,7 +71,7 @@ def generate_athena(self): 'third_party_libraries': [] } - self.config['lambda']['athena_partition_refresh_config'] = athena_config_template + self.config['lambda']['athena_partitioner_config'] = athena_config_template self.write() LOGGER.info('Athena configuration successfully created') diff --git a/streamalert_cli/terraform/handlers.py b/streamalert_cli/terraform/handlers.py index 967420f5a..538967bc9 100644 --- a/streamalert_cli/terraform/handlers.py +++ b/streamalert_cli/terraform/handlers.py @@ -118,7 +118,7 @@ def handler(cls, options, config): if get_data_file_format(config) == 'json': # Terraform v0.12 now supports creating Athena tables. We will support # to use terraform aws_glue_catalog_table resource to create table only - # when data file_format is set to "parquet" in "athena_partition_refresh_config" + # when data file_format is set to "parquet" in "athena_partitioner_config" # # For "json" file_format, we will continue using Athena DDL query to # create tables. However, this capabity will be faded out in the future diff --git a/streamalert_cli/terraform/rule_promotion.py b/streamalert_cli/terraform/rule_promotion.py index 5ad1b4655..62c5dfadb 100644 --- a/streamalert_cli/terraform/rule_promotion.py +++ b/streamalert_cli/terraform/rule_promotion.py @@ -49,7 +49,7 @@ def generate_rule_promotion(config): 'rules_table_arn': '${module.globals.rules_table_arn}', 'function_alias_arn': '${module.rule_promotion_lambda.function_alias_arn}', 'function_name': '${module.rule_promotion_lambda.function_name}', - 'athena_results_bucket_arn': '${module.streamalert_athena.results_bucket_arn}', + 'athena_results_bucket_arn': '${module.athena_partitioner_iam.results_bucket_arn}', 'alerts_bucket': alerts_bucket, 's3_kms_key_arn': '${aws_kms_key.server_side_encryption.arn}' } diff --git a/streamalert_cli/terraform/scheduled_queries.py b/streamalert_cli/terraform/scheduled_queries.py index 6041e80ca..cf35a62db 100644 --- a/streamalert_cli/terraform/scheduled_queries.py +++ b/streamalert_cli/terraform/scheduled_queries.py @@ -25,14 +25,14 @@ def generate_scheduled_queries_module_configuration(config): # FIXME (derek.wang) # should violently break if athena configurations don't exist. # Alternatively, could read off streamalert_athena module and get more outputs from that. - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] # FIXME (derek.wang) make consistent with streamalert_athena module, # maybe make this dependent on output of that module? 
database = athena_config.get('database_name', '{}_streamalert'.format(prefix)) # The results bucket cannot reference the output from the streamalert_athena module: - # '${module.streamalert_athena.results_bucket_arn}' + # '${module.athena_partitioner_iam.results_bucket_arn}' # Because it takes a bucket name, not an ARN # FIXME (derek.wang) DRY out this code results_bucket = athena_query_results_bucket(config) diff --git a/tests/unit/conf/lambda.json b/tests/unit/conf/lambda.json index 7d42b54a1..8dacf068a 100644 --- a/tests/unit/conf/lambda.json +++ b/tests/unit/conf/lambda.json @@ -28,7 +28,7 @@ ] } }, - "athena_partition_refresh_config": { + "athena_partitioner_config": { "memory": "128", "timeout": "60", "file_format": "parquet", diff --git a/tests/unit/conf_athena/lambda.json b/tests/unit/conf_athena/lambda.json index 12fade859..7d29268f2 100644 --- a/tests/unit/conf_athena/lambda.json +++ b/tests/unit/conf_athena/lambda.json @@ -1,5 +1,5 @@ { - "athena_partition_refresh_config": { + "athena_partitioner_config": { "file_format": "json", "memory": "128", "timeout": "60" diff --git a/tests/unit/helpers/config.py b/tests/unit/helpers/config.py index 71c2de457..f74139648 100644 --- a/tests/unit/helpers/config.py +++ b/tests/unit/helpers/config.py @@ -110,7 +110,7 @@ def basic_streamalert_config(): 'memory': 128, 'timeout': 10 }, - 'athena_partition_refresh_config': { + 'athena_partitioner_config': { 'enable_custom_metrics': False, 'memory': 128, 'timeout': 60 @@ -288,6 +288,6 @@ def athena_cli_basic_config(): } }, 'lambda': { - 'athena_partition_refresh_config': {} + 'athena_partitioner_config': {} } } diff --git a/tests/unit/streamalert/athena_partitioner/test_main.py b/tests/unit/streamalert/athena_partitioner/test_main.py index c498298e3..b360df323 100644 --- a/tests/unit/streamalert/athena_partitioner/test_main.py +++ b/tests/unit/streamalert/athena_partitioner/test_main.py @@ -39,11 +39,11 @@ class TestAthenaPartitioner: def setup(self, boto_patch): """Setup the AthenaPartitioner tests""" boto_patch.client.return_value = MockAthenaClient() - self._refresher = AthenaPartitioner() + self._partitioner = AthenaPartitioner() def test_add_partitions(self): """AthenaPartitioner - Add Partitions""" - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.parquet', b'parquet/alerts/dt=2020-02-13-08/prefix_streamalert_alert_delivery-01-abcd.parquet' @@ -63,14 +63,14 @@ def test_add_partitions(self): b'dt=2020-02-12-07/log_type_2_abcd.parquet' } } - result = self._refresher._add_partitions() + result = self._partitioner._add_partitions() assert_true(result) @patch('logging.Logger.warning') def test_add_partitions_none(self, log_mock): """AthenaPartitioner - Add Partitions, None to Add""" - result = self._refresher._add_partitions() + result = self._partitioner._add_partitions() log_mock.assert_called_with('No partitions to add') assert_equal(result, False) @@ -103,7 +103,7 @@ def test_get_partitions_from_keys_parquet(self): } } - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-26-14/rule_name_alerts-1304134918401.parquet', b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.parquet', @@ -123,7 +123,7 @@ def test_get_partitions_from_keys_parquet(self): } } - result = self._refresher._get_partitions_from_keys() + result = self._partitioner._get_partitions_from_keys() 
assert_equal(result, expected_result) @@ -131,13 +131,13 @@ def test_get_partitions_from_keys_parquet(self): def test_get_partitions_from_keys_error(self, log_mock): """AthenaPartitioner - Get Partitions From Keys, Bad Key""" bad_key = b'bad_match_string' - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { bad_key } } - result = self._refresher._get_partitions_from_keys() + result = self._partitioner._get_partitions_from_keys() log_mock.assert_called_with('The key %s does not match any regex, skipping', bad_key.decode('utf-8')) @@ -203,7 +203,7 @@ def _create_test_message(count=2, placeholder=False): def test_run(self, add_mock, log_mock): """AthenaPartitioner - Run""" add_mock.return_value = True - self._refresher.run(self._create_test_message(1)) + self._partitioner.run(self._create_test_message(1)) log_mock.assert_called_with( 'Received notification for object \'%s\' in bucket \'%s\'', 'parquet/alerts/dt=2017-08-01-14/02/test.json'.encode(), @@ -213,7 +213,7 @@ def test_run(self, add_mock, log_mock): @patch('logging.Logger.info') def test_run_placeholder_file(self, log_mock): """AthenaPartitioner - Run, Placeholder File""" - self._refresher.run(self._create_test_message(1, True)) + self._partitioner.run(self._create_test_message(1, True)) log_mock.assert_has_calls([ call( 'Skipping placeholder file notification with key: %s', @@ -224,7 +224,7 @@ def test_run_placeholder_file(self, log_mock): @patch('logging.Logger.warning') def test_run_no_messages(self, log_mock): """AthenaPartitioner - Run, No Messages""" - self._refresher.run(self._create_test_message(0)) + self._partitioner.run(self._create_test_message(0)) log_mock.assert_called_with('No partitions to add') @patch('logging.Logger.error') @@ -235,7 +235,7 @@ def test_run_invalid_bucket(self, log_mock): s3_record = self._s3_record(1) s3_record['Records'][0]['s3']['bucket']['name'] = bucket event['Records'][0]['body'] = json.dumps(s3_record) - self._refresher.run(event) + self._partitioner.run(event) log_mock.assert_called_with('\'%s\' not found in \'buckets\' config. 
Please add this ' 'bucket to enable additions of Hive partitions.', bucket) @@ -251,7 +251,7 @@ class TestAthenaPartitionerJSON: def setup(self, boto_patch): """Setup the AthenaPartitioner tests""" boto_patch.client.return_value = MockAthenaClient() - self._refresher = AthenaPartitioner() + self._partitioner = AthenaPartitioner() def test_get_partitions_from_keys_json(self): """AthenaPartitioner - Get Partitions From Keys in json format""" @@ -282,7 +282,7 @@ def test_get_partitions_from_keys_json(self): } } - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-26-14/rule_name_alerts-1304134918401.json', b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.json', @@ -302,6 +302,6 @@ def test_get_partitions_from_keys_json(self): } } - result = self._refresher._get_partitions_from_keys() + result = self._partitioner._get_partitions_from_keys() assert_equal(result, expected_result) diff --git a/tests/unit/streamalert_cli/terraform/test_generate.py b/tests/unit/streamalert_cli/terraform/test_generate.py index 3d309b6a1..daf257af1 100644 --- a/tests/unit/streamalert_cli/terraform/test_generate.py +++ b/tests/unit/streamalert_cli/terraform/test_generate.py @@ -785,13 +785,13 @@ def test_generate_main_with_sqs_url_false(self): def test_generate_main_file_format_unspecified(self): "CLI - Terraform Generate Main raises error when file_format unspecified" - self.config['lambda']['athena_partition_refresh_config']['file_format'] = None + self.config['lambda']['athena_partitioner_config']['file_format'] = None assert_raises( ConfigError, generate.generate_global_lambda_settings, config=self.config, - config_name='athena_partition_refresh_config', + config_name='athena_partitioner_config', generate_func='test_func', tf_tmp_file='test_tf_tmp_file_path', message='test message' @@ -799,13 +799,13 @@ def test_generate_main_file_format_unspecified(self): def test_generate_main_file_format_misconfigured(self): "CLI - Terraform Generate Main raises error when file_format misconfigured" - self.config['lambda']['athena_partition_refresh_config']['file_format'] = 'Parquet' + self.config['lambda']['athena_partitioner_config']['file_format'] = 'Parquet' assert_raises( ConfigError, generate.generate_global_lambda_settings, config=self.config, - config_name='athena_partition_refresh_config', + config_name='athena_partitioner_config', generate_func='test_func', tf_tmp_file='test_tf_tmp_file_path', message='test message' diff --git a/tests/unit/streamalert_cli/terraform/test_rule_promotion.py b/tests/unit/streamalert_cli/terraform/test_rule_promotion.py index 9f5063b8b..01f2cf49e 100644 --- a/tests/unit/streamalert_cli/terraform/test_rule_promotion.py +++ b/tests/unit/streamalert_cli/terraform/test_rule_promotion.py @@ -42,7 +42,9 @@ def test_generate(self): 'source': './modules/tf_rule_promotion_iam', 'send_digest_schedule_expression': 'cron(30 13 * * ? 
*)', 'digest_sns_topic': 'unit-test_streamalert_rule_staging_stats', - 'athena_results_bucket_arn': '${module.streamalert_athena.results_bucket_arn}', + 'athena_results_bucket_arn': ( + '${module.athena_partitioner_iam.results_bucket_arn}' + ), 'alerts_bucket': 'unit-test-streamalerts', 's3_kms_key_arn': '${aws_kms_key.server_side_encryption.arn}' }, diff --git a/tests/unit/streamalert_cli/test_cli_config.py b/tests/unit/streamalert_cli/test_cli_config.py index 8ad7925d0..b4cccaefa 100644 --- a/tests/unit/streamalert_cli/test_cli_config.py +++ b/tests/unit/streamalert_cli/test_cli_config.py @@ -63,9 +63,9 @@ def test_load_config(self): def test_toggle_metric(self): """CLI - Metric toggling""" - self.config.toggle_metrics('athena_partition_refresh', enabled=True) + self.config.toggle_metrics('athena_partitioner', enabled=True) assert_equal( - self.config['lambda']['athena_partition_refresh_config']['enable_custom_metrics'], + self.config['lambda']['athena_partitioner_config']['enable_custom_metrics'], True ) From eb2a3bd8ab374ef6a1140e028995a679c6638e5b Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 1 Apr 2020 10:36:54 -0700 Subject: [PATCH 6/7] removing no-longer-used method (athena is default) --- streamalert_cli/config.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/streamalert_cli/config.py b/streamalert_cli/config.py index 3ce23d619..ac620c02c 100644 --- a/streamalert_cli/config.py +++ b/streamalert_cli/config.py @@ -57,25 +57,6 @@ def clusters(self): """Return list of cluster configuration keys""" return list(self.config['clusters'].keys()) - def generate_athena(self): - """Generate a base Athena config""" - if 'athena_partitioner_config' in self.config['lambda']: - LOGGER.warning('The Athena configuration already exists, skipping.') - return - - athena_config_template = { - 'enable_custom_metrics': False, - 'timeout': '60', - 'memory': '128', - 'log_level': 'info', - 'third_party_libraries': [] - } - - self.config['lambda']['athena_partitioner_config'] = athena_config_template - self.write() - - LOGGER.info('Athena configuration successfully created') - def set_prefix(self, prefix): """Set the Org Prefix in Global settings""" if not isinstance(prefix, str): From 6f7e1ffefb2b033f1a07e12f46d4ce34ea58de88 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 1 Apr 2020 12:13:44 -0700 Subject: [PATCH 7/7] addressing PR feedback --- streamalert/shared/athena.py | 2 +- streamalert_cli/terraform/generate.py | 81 +++++++++++-------- .../terraform/test_generate.py | 46 ++++++++--- 3 files changed, 83 insertions(+), 46 deletions(-) diff --git a/streamalert/shared/athena.py b/streamalert/shared/athena.py index 6879c38a0..659ee2873 100644 --- a/streamalert/shared/athena.py +++ b/streamalert/shared/athena.py @@ -59,7 +59,7 @@ def __init__(self, database_name, results_bucket, results_prefix, region=None): if not results_bucket.startswith('s3://'): results_bucket = 's3://{}'.format(results_bucket) - # Produces athena_partitioner/YYYY/MM/DD S3 keys + # Produces s3:////YYYY/MM/DD S3 keys self._s3_results_path = posixpath.join( results_bucket, results_prefix, diff --git a/streamalert_cli/terraform/generate.py b/streamalert_cli/terraform/generate.py index e24b2156f..1268db9b0 100644 --- a/streamalert_cli/terraform/generate.py +++ b/streamalert_cli/terraform/generate.py @@ -457,58 +457,54 @@ def terraform_generate_handler(config, init=False, check_tf=True, check_creds=Tr os.path.join(TERRAFORM_FILES_PATH, 'metric_alarms.tf.json') ) - # Setup Athena - 
generate_global_lambda_settings( - config, - config_name='athena_partitioner_config', - generate_func=generate_athena, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'athena.tf.json'), - message='Removing old Athena Terraform file' - ) - # Setup Threat Intel Downloader Lambda function if it is enabled generate_global_lambda_settings( config, - config_name='threat_intel_downloader_config', + conf_name='threat_intel_downloader_config', generate_func=generate_threat_intel_downloader, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'ti_downloader.tf.json'), - message='Removing old Threat Intel Downloader Terraform file' + tf_tmp_file_name='ti_downloader', + required=False, ) # Setup Rule Promotion if it is enabled generate_global_lambda_settings( config, - config_name='rule_promotion_config', + conf_name='rule_promotion_config', generate_func=generate_rule_promotion, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'rule_promotion.tf.json'), - message='Removing old Rule Promotion Terraform file' + tf_tmp_file_name='rule_promotion', + required=False, + ) + + # Setup Athena Partitioner + generate_global_lambda_settings( + config, + conf_name='athena_partitioner_config', + generate_func=generate_athena, + tf_tmp_file_name='athena', ) # Setup Rules Engine generate_global_lambda_settings( config, - config_name='rules_engine_config', + conf_name='rules_engine_config', generate_func=generate_rules_engine, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'rules_engine.tf.json'), - message='Removing old Rules Engine Terraform file' + tf_tmp_file_name='rules_engine', ) # Setup Alert Processor generate_global_lambda_settings( config, - config_name='alert_processor_config', + conf_name='alert_processor_config', generate_func=generate_alert_processor, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'alert_processor.tf.json'), - message='Removing old Alert Processor Terraform file' + tf_tmp_file_name='alert_processor', ) # Setup Alert Merger generate_global_lambda_settings( config, - config_name='alert_merger_config', + conf_name='alert_merger_config', generate_func=generate_alert_merger, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'alert_merger.tf.json'), - message='Removing old Alert Merger Terraform file' + tf_tmp_file_name='alert_merger', ) # Setup Lookup Tables if applicable @@ -527,7 +523,7 @@ def _generate_lookup_tables_settings(config): tf_file_name = os.path.join(TERRAFORM_FILES_PATH, 'lookup_tables.tf.json') if not config['lookup_tables'].get('enabled', False): - remove_temp_terraform_file(tf_file_name, 'Removing old LookupTables Terraform file') + remove_temp_terraform_file(tf_file_name) return # Use the lookup_tables.json configuration file to determine which resources we have @@ -545,7 +541,7 @@ def _generate_lookup_tables_settings(config): if not dynamodb_tables and not s3_buckets: # If no resources are configured at all, simply return and do not generate lookuptables # IAM policies - remove_temp_terraform_file(tf_file_name, 'No tables configured') + remove_temp_terraform_file(tf_file_name, extra='No tables configured') return roles = { @@ -588,7 +584,7 @@ def _generate_streamquery_module(config): """ tf_file_name = os.path.join(TERRAFORM_FILES_PATH, 'scheduled_queries.tf.json') if not config.get('scheduled_queries', {}).get('enabled', False): - remove_temp_terraform_file(tf_file_name, 'Removing old scheduled queries Terraform file') + remove_temp_terraform_file(tf_file_name) return _create_terraform_module_file( @@ -597,7 +593,12 @@ def _generate_streamquery_module(config): ) -def 
generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_file, message):
+def generate_global_lambda_settings(
+        config,
+        conf_name,
+        generate_func,
+        tf_tmp_file_name,
+        required=True):
     """Generate settings for global Lambda functions
 
     Args:
@@ -607,7 +608,7 @@ def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_f
-        tf_tmp_file (str): filename of terraform file, generated by CLI.
-        message (str): Message will be logged by LOGGER.
+        tf_tmp_file_name (str): base name of the Terraform file generated by the CLI
+        required (bool): whether this config must be present in conf/lambda.json
     """
-    if config_name == 'athena_partitioner_config':
+    if conf_name == 'athena_partitioner_config':
         # Raise ConfigError when the user has not explicitly set `file_format`
         # in `athena_partitioner_config` in conf/lambda.json when upgrading to v3.1.0.
         file_format = get_data_file_format(config)
@@ -624,26 +625,36 @@ def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_f
             )
             raise ConfigError(message)
 
-    if not config['lambda'].get(config_name):
-        LOGGER.warning('Config for \'%s\' not in lambda.json', config_name)
-        remove_temp_terraform_file(tf_tmp_file, message)
+    tf_tmp_file = os.path.join(TERRAFORM_FILES_PATH, '{}.tf.json'.format(tf_tmp_file_name))
+
+    if required and conf_name not in config['lambda']:
+        message = 'Required configuration missing in lambda.json: {}'.format(conf_name)
+        raise ConfigError(message)
+
+    if not config['lambda'].get(conf_name):
+        LOGGER.warning('Optional configuration missing in lambda.json, skipping: %s', conf_name)
+        remove_temp_terraform_file(tf_tmp_file)
         return
 
-    if config['lambda'][config_name].get('enabled', True):
+    if config['lambda'][conf_name].get('enabled', True):
         generated_config = generate_func(config=config)
         if generated_config:
             _create_terraform_module_file(generated_config, tf_tmp_file)
     else:
-        remove_temp_terraform_file(tf_tmp_file, message)
+        remove_temp_terraform_file(tf_tmp_file)
 
 
-def remove_temp_terraform_file(tf_tmp_file, message):
+def remove_temp_terraform_file(tf_tmp_file, extra=None):
     """Remove temporary terraform file
 
     Args:
         tf_tmp_file (str): filename of terraform file, generated by CLI.
-        message (str): Message will be logged by LOGGER.
+        extra (str): optional extra message to log before removing the file
""" + if extra: + LOGGER.info(extra) + + message = 'Removing old Terraform file: {}'.format(tf_tmp_file) if os.path.isfile(tf_tmp_file): LOGGER.info(message) os.remove(tf_tmp_file) diff --git a/tests/unit/streamalert_cli/terraform/test_generate.py b/tests/unit/streamalert_cli/terraform/test_generate.py index daf257af1..da6c07a1f 100644 --- a/tests/unit/streamalert_cli/terraform/test_generate.py +++ b/tests/unit/streamalert_cli/terraform/test_generate.py @@ -783,30 +783,56 @@ def test_generate_main_with_sqs_url_false(self): assert_equal(result['module']['globals']['source'], './modules/tf_globals') assert_false(result['module']['globals']['sqs_use_prefix']) - def test_generate_main_file_format_unspecified(self): - "CLI - Terraform Generate Main raises error when file_format unspecified" + def test_generate_athena_lambda_format_unspecified(self): + "CLI - Terraform Generate Global Lambda Settings, Unspecified Athena file_format" self.config['lambda']['athena_partitioner_config']['file_format'] = None assert_raises( ConfigError, generate.generate_global_lambda_settings, config=self.config, - config_name='athena_partitioner_config', + conf_name='athena_partitioner_config', generate_func='test_func', - tf_tmp_file='test_tf_tmp_file_path', - message='test message' + tf_tmp_file_name='test_tf_tmp_file_path', ) - def test_generate_main_file_format_misconfigured(self): - "CLI - Terraform Generate Main raises error when file_format misconfigured" + def test_generate_athena_lambda_format_invalid(self): + "CLI - Terraform Generate Global Lambda Settings, Invalid Athena file_format" self.config['lambda']['athena_partitioner_config']['file_format'] = 'Parquet' assert_raises( ConfigError, generate.generate_global_lambda_settings, config=self.config, - config_name='athena_partitioner_config', + conf_name='athena_partitioner_config', generate_func='test_func', - tf_tmp_file='test_tf_tmp_file_path', - message='test message' + tf_tmp_file_name='test_tf_tmp_file_path', + ) + + def test_generate_required_lambda_invalid_config(self): + "CLI - Terraform Generate Global Lambda Settings, Invalid Config" + + assert_raises( + ConfigError, + generate.generate_global_lambda_settings, + config=self.config, + conf_name='athena_partition_refresh_config', + generate_func='test_func', + tf_tmp_file_name='test_tf_tmp_file_path', + ) + + @patch('logging.Logger.warning') + def test_generate_optional_lambda_not_in_config(self, log_mock): + "CLI - Terraform Generate Global Lambda Settings, Optional Missing in Config" + fake_opt_conf_name = 'fake_optional_conf_name' + generate.generate_global_lambda_settings( + config=self.config, + conf_name=fake_opt_conf_name, + generate_func='test_func', + tf_tmp_file_name='test_tf_tmp_file_path', + required=False, + ) + + log_mock.assert_called_with( + 'Optional configuration missing in lambda.json, skipping: %s', fake_opt_conf_name )