diff --git a/conf/lambda.json b/conf/lambda.json index 6396c1ae8..c366943c7 100644 --- a/conf/lambda.json +++ b/conf/lambda.json @@ -50,8 +50,10 @@ "subnet_ids": [] } }, - "athena_partition_refresh_config": { + "athena_partitioner_config": { "concurrency_limit": 10, + "memory": 128, + "timeout": 300, "file_format": null, "log_level": "info" }, diff --git a/docs/source/architecture.rst b/docs/source/architecture.rst index 2fe0f0099..45cdd406f 100644 --- a/docs/source/architecture.rst +++ b/docs/source/architecture.rst @@ -40,7 +40,7 @@ configured `outputs `_. All alerts implicitly include a Firehose o an S3 bucket that can be queried with Athena. Alerts will be retried indefinitely until they are successfully delivered, at which point they will be removed from the DynamoDB table. -6. An "athena partition refresh" Lambda function runs periodically to onboard new StreamAlert data +6. An Athena Partitioner Lambda function runs periodically to onboard new StreamAlert data and alerts into their respective Athena databases for historical search. Other StreamAlert components include DynamoDB tables and Lambda functions for optional rule diff --git a/docs/source/getting-started.rst b/docs/source/getting-started.rst index ee4781006..bcdd47f91 100644 --- a/docs/source/getting-started.rst +++ b/docs/source/getting-started.rst @@ -109,7 +109,7 @@ Deploy .. code-block:: bash - "athena_partition_refresh_config": { + "athena_partitioner_config": { "concurrency_limit": 10, "file_format": "parquet", "log_level": "info" @@ -284,7 +284,7 @@ dropdown on the left and preview the ``alerts`` table: :target: _images/athena-alerts-search.png (Here, my name prefix is ``testv2``.) If no records are returned, look for errors -in the Athena Partition Refresh function or try invoking it directly. +in the Athena Partitioner function or try invoking it directly. And there you have it! Ingested log data is parsed, classified, and scanned by the rules engine. 
Any resulting alerts are delivered to your configured output(s) within a matter of minutes. diff --git a/docs/source/historical-search.rst b/docs/source/historical-search.rst index c8f45c444..38c7c2c4f 100644 --- a/docs/source/historical-search.rst +++ b/docs/source/historical-search.rst @@ -6,7 +6,7 @@ StreamAlert historical search feature is backed by Amazon S3 and `Athena ` and - the :ref:`Athena Partition Refresh` function. Note that + the :ref:`Athena Partitioner` function. Note that the Athena database and alerts table are created automatically when you first deploy StreamAlert. -* If the ``file_format`` value within the :ref:`Athena Partition Refresh` +* If the ``file_format`` value within the :ref:`Athena Partitioner` function config is set to ``parquet``, you can run the ``MSCK REPAIR TABLE alerts`` command in Athena to load all available partitions and then alerts can be searchable. Note, however, that the ``MSCK REPAIR`` command cannot load new partitions automatically. * StreamAlert includes a Lambda function to automatically add new partitions for Athena tables when - the data arrives in S3. See :ref:`configure_athena_partition_refresh_lambda` + the data arrives in S3. See :ref:`configure_athena_partitioner_lambda` .. code-block:: bash { - "athena_partition_refresh_config": { + "athena_partitioner_config": { "concurrency_limit": 10, "file_format": "parquet", "log_level": "info" } } -* Deploy the Athena Partition Refresh Lambda function +* Deploy the Athena Partitioner Lambda function .. code-block:: bash @@ -109,7 +109,7 @@ It is optional to store data in S3 bucket and available for search in Athena tab .. image:: ../images/athena-data-search.png -.. _configure_athena_partition_refresh_lambda: +.. 
_configure_athena_partitioner_lambda: ************************* Configure Lambda Settings @@ -120,8 +120,8 @@ Open ``conf/lambda.json``, and fill in the following options: =================================== ======== ==================== =========== Key Required Default Description ----------------------------------- -------- -------------------- ----------- -``enabled`` Yes ``true`` Enables/Disables the Athena Partition Refresh Lambda function -``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partition Refresh Lambda function +``enabled`` Yes ``true`` Enables/Disables the Athena Partitioner Lambda function +``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partitioner Lambda function ``log_level`` No ``info`` The log level for the Lambda function, can be either ``info`` or ``debug``. Debug will help with diagnosing errors with polling SQS or sending Athena queries. ``memory`` No ``128`` The amount of memory (in MB) allocated to the Lambda function ``timeout`` No ``60`` The maximum duration of the Lambda function (in seconds) @@ -134,7 +134,7 @@ Key Required Default Descriptio .. 
code-block:: json { - "athena_partition_refresh_config": { + "athena_partitioner_config": { "log_level": "info", "memory": 128, "buckets": { diff --git a/streamalert/athena_partition_refresh/__init__.py b/streamalert/athena_partitioner/__init__.py similarity index 100% rename from streamalert/athena_partition_refresh/__init__.py rename to streamalert/athena_partitioner/__init__.py diff --git a/streamalert/athena_partition_refresh/main.py b/streamalert/athena_partitioner/main.py similarity index 95% rename from streamalert/athena_partition_refresh/main.py rename to streamalert/athena_partitioner/main.py index fb7667bb7..267c5a80f 100644 --- a/streamalert/athena_partition_refresh/main.py +++ b/streamalert/athena_partitioner/main.py @@ -31,11 +31,11 @@ LOGGER = get_logger(__name__) -class AthenaRefreshError(Exception): +class AthenaPartitionerError(Exception): """Generic Athena Partition Error for erroring the Lambda function""" -class AthenaRefresher: +class AthenaPartitioner: """Handle polling an SQS queue and running Athena queries for updating tables""" ALERTS_REGEX = re.compile(r'alerts/dt=(?P\d{4})' @@ -58,14 +58,14 @@ class AthenaRefresher: r'\-(?P\d{2})' r'\-(?P\d{2})\/.*') - ATHENA_S3_PREFIX = 'athena_partition_refresh' + ATHENA_S3_PREFIX = 'athena_partitioner' _ATHENA_CLIENT = None def __init__(self): config = load_config(include={'lambda.json', 'global.json'}) prefix = config['global']['account']['prefix'] - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] self._file_format = get_data_file_format(config) if self._file_format == 'parquet': @@ -78,7 +78,7 @@ def __init__(self): else: message = ( 'file format "{}" is not supported. Supported file format are ' - '"parquet", "json". Please update the setting in athena_partition_refresh_config ' + '"parquet", "json". 
Please update the setting in athena_partitioner_config ' 'in "conf/lambda.json"'.format(self._file_format) ) raise ConfigError(message) @@ -106,7 +106,7 @@ def _create_client(cls, db_name, results_bucket): # Check if the database exists when the client is created if not cls._ATHENA_CLIENT.check_database_exists(): - raise AthenaRefreshError('The \'{}\' database does not exist'.format(db_name)) + raise AthenaPartitionerError('The \'{}\' database does not exist'.format(db_name)) def _get_partitions_from_keys(self): """Get the partitions that need to be added for the Athena tables @@ -198,7 +198,7 @@ def _add_partitions(self): success = self._ATHENA_CLIENT.run_query(query=query) if not success: - raise AthenaRefreshError( + raise AthenaPartitionerError( 'The add hive partition query has failed:\n{}'.format(query) ) @@ -247,5 +247,5 @@ def run(self, event): def handler(event, _): - """Athena Partition Refresher Handler Function""" - AthenaRefresher().run(event) + """Athena Partitioner Handler Function""" + AthenaPartitioner().run(event) diff --git a/streamalert/rule_promotion/promoter.py b/streamalert/rule_promotion/promoter.py index 11e1a8dca..088f92a58 100644 --- a/streamalert/rule_promotion/promoter.py +++ b/streamalert/rule_promotion/promoter.py @@ -39,7 +39,7 @@ def __init__(self): # Create the rule table class for getting staging information self._rule_table = RuleTable('{}_streamalert_rules'.format(prefix)) - athena_config = self._config['lambda']['athena_partition_refresh_config'] + athena_config = self._config['lambda']['athena_partitioner_config'] # Get the name of the athena database to access db_name = athena_config.get('database_name', get_database_name(self._config)) diff --git a/streamalert/shared/__init__.py b/streamalert/shared/__init__.py index 3832c9e6e..00fdda2f2 100644 --- a/streamalert/shared/__init__.py +++ b/streamalert/shared/__init__.py @@ -1,7 +1,7 @@ """Define some shared resources.""" ALERT_MERGER_NAME = 'alert_merger' ALERT_PROCESSOR_NAME = 
'alert_processor' -ATHENA_PARTITION_REFRESH_NAME = 'athena_partition_refresh' +ATHENA_PARTITIONER_NAME = 'athena_partitioner' CLASSIFIER_FUNCTION_NAME = 'classifier' RULES_ENGINE_FUNCTION_NAME = 'rules_engine' RULE_PROMOTION_NAME = 'rule_promotion' diff --git a/streamalert/shared/athena.py b/streamalert/shared/athena.py index 6723889a4..6879c38a0 100644 --- a/streamalert/shared/athena.py +++ b/streamalert/shared/athena.py @@ -59,7 +59,7 @@ def __init__(self, database_name, results_bucket, results_prefix, region=None): if not results_bucket.startswith('s3://'): results_bucket = 's3://{}'.format(results_bucket) - # Produces athena_partition_refresh/YYYY/MM/DD S3 keys + # Produces athena_partitioner/YYYY/MM/DD S3 keys self._s3_results_path = posixpath.join( results_bucket, results_prefix, diff --git a/streamalert/shared/config.py b/streamalert/shared/config.py index 150cc5285..dee733f9f 100644 --- a/streamalert/shared/config.py +++ b/streamalert/shared/config.py @@ -122,7 +122,7 @@ def athena_partition_buckets(config): Returns: list: Bucket names for which Athena is enabled """ - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] data_buckets = athena_config.get('buckets', {}) data_buckets[firehose_alerts_bucket(config)] = 'alerts' data_bucket = firehose_data_bucket(config) # Data retention is optional, so check for this @@ -140,7 +140,7 @@ def athena_query_results_bucket(config): Returns: str: The name of the S3 bucket. 
""" - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] prefix = config['global']['account']['prefix'] return athena_config.get( diff --git a/streamalert/shared/metrics.py b/streamalert/shared/metrics.py index b16292aac..65ea17b02 100644 --- a/streamalert/shared/metrics.py +++ b/streamalert/shared/metrics.py @@ -18,7 +18,7 @@ from streamalert.shared import ( ALERT_MERGER_NAME, ALERT_PROCESSOR_NAME, - ATHENA_PARTITION_REFRESH_NAME, + ATHENA_PARTITIONER_NAME, CLASSIFIER_FUNCTION_NAME, RULES_ENGINE_FUNCTION_NAME ) @@ -29,7 +29,7 @@ CLUSTER = os.environ.get('CLUSTER', 'unknown_cluster') # The FUNC_PREFIXES dict acts as a simple map to a human-readable name -# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the +# Add ATHENA_PARTITIONER_NAME: 'AthenaPartitioner', to the # below when metrics are supported there FUNC_PREFIXES = { ALERT_MERGER_NAME: 'AlertMerger', @@ -89,7 +89,7 @@ class MetricLogger: ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup) }, ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics - ATHENA_PARTITION_REFRESH_NAME: {}, # Placeholder for future athena processor metrics + ATHENA_PARTITIONER_NAME: {}, # Placeholder for future athena processor metrics CLASSIFIER_FUNCTION_NAME: { FAILED_PARSES: (_default_filter.format(FAILED_PARSES), _default_value_lookup), diff --git a/streamalert/shared/utils.py b/streamalert/shared/utils.py index 30457a180..87ff47b12 100644 --- a/streamalert/shared/utils.py +++ b/streamalert/shared/utils.py @@ -152,7 +152,7 @@ def get_database_name(config): str: The name of the athena database """ prefix = config['global']['account']['prefix'] - athena_config = config['lambda'].get('athena_partition_refresh_config') + athena_config = config['lambda'].get('athena_partitioner_config') return athena_config.get('database_name', '{}_streamalert'.format(prefix)) @@ -163,6 +163,6 @@ def 
get_data_file_format(config): Returns: str: The data store format either "parquet" or "json" """ - athena_config = config['lambda'].get('athena_partition_refresh_config', {}) + athena_config = config['lambda'].get('athena_partitioner_config', {}) return athena_config.get('file_format') diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/README.md b/streamalert_cli/_infrastructure/modules/tf_athena/README.md index be1ba1796..4490ffa44 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/README.md +++ b/streamalert_cli/_infrastructure/modules/tf_athena/README.md @@ -1,88 +1,8 @@ -# StreamAlert Athena Terraform Module -This Terraform module creates a Lambda function for refreshing Athena Partitions once new data is written to S3 - -## Components -* A Python3.7 Lambda Function to perform a table refresh -* IAM Role and Policy to allow for Athena execution -* S3 bucket notifications -* Lambda permissions - -## Example -``` -module "streamalert_athena" { - source = "../modules/tf_athena" - lambda_s3_bucket = "my-source-bucket" - lambda_s3_key = "source/athena_partition_refresh_code.zip" - athena_data_buckets = ["my-org-streamalerts"] -} -``` - -## Inputs - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
| Property | Description | Default | Required |
| --- | --- | --- | --- |
| lambda_handler | The Python function entry point | "main.handler" | False |
| lambda_timeout | The max runtime in seconds for the Lambda function | 60 seconds | False |
| lambda_memory | The memory allocation in MB for the Lambda function | 128MB | False |
| lambda_s3_bucket | The name of the S3 bucket to store Lambda deployment packages | None | True |
| lambda_s3_key | The object in S3 containing the Lambda source | None | True |
| lambda_log_level | The current log level of the Lambda function | info | False |
| current_version | The currently published version of the Lambda production alias | None | True |
| athena_data_buckets | A list of buckets to monitor changes to for Hive partitioning | None | True |
| prefix | The resource prefix, normally an organizational name or descriptor | None | True |
| schedule_expression | The Cloudwatch-Lambda invocation schedule expression | rate(10 minutes) | False |
+# Athena Partitioner Permissions +This module adds IAM permissions and other specific resources needed in the Athena partitioner function: + * Athena Database for querying alerts and historical data + * S3 Bucket for storing the results of Athena queries + * SQS Queue for receiving event notifications from S3 buckets + * S3 Event Notifications for sending messages to SQS Queue when objects are created + * KMS Key and Alias for encrypting/decrypting messages on SQS Queue + * Permissions for sending data to SQS Queue and reading/writing data in S3 diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf b/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf index 76a3ede5d..2cb6a492d 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/iam.tf @@ -1,56 +1,7 @@ -// IAM Role: Lambda Execution Role -resource "aws_iam_role" "athena_partition_role" { - name = "${var.prefix}_athena_partition_refresh" - path = "/streamalert/" - assume_role_policy = data.aws_iam_policy_document.lambda_assume_role_policy.json - - tags = { - Name = "StreamAlert" - AltName = "Athena" - } -} - -// IAM Policy Doc: Generic Lambda trust relationship policy -data "aws_iam_policy_document" "lambda_assume_role_policy" { - statement { - effect = "Allow" - actions = ["sts:AssumeRole"] - - principals { - type = "Service" - identifiers = ["lambda.amazonaws.com"] - } - } -} - -// IAM Role Policy: Allow the Lambda function to use Cloudwatch logging -resource "aws_iam_role_policy" "cloudwatch" { - name = "CloudWatchPutLogs" - role = aws_iam_role.athena_partition_role.id - policy = data.aws_iam_policy_document.cloudwatch.json -} - -// IAM Policy Doc: Cloudwatch creation and logging of events -data "aws_iam_policy_document" "cloudwatch" { - statement { - effect = "Allow" - - actions = [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ] - - resources = [ - "arn:aws:logs:*:*:*", - ] - } -} - -// 
IAM Role Policy: Allow the Lambda function to use Cloudwatch logging +// IAM Role Policy: Allow the function read and delete SQS messages resource "aws_iam_role_policy" "sqs" { name = "SQSReadDeleteMessages" - role = aws_iam_role.athena_partition_role.id + role = var.function_role_id policy = data.aws_iam_policy_document.sqs.json } @@ -92,21 +43,21 @@ data "aws_iam_policy_document" "sqs" { ] resources = [ - aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn, + aws_sqs_queue.data_bucket_notifications.arn, ] } } -// IAM Role Policy: Allow the Lambda function to execute Athena queries +// IAM Role Policy: Allow the Lambda function to execute Athena queries and perform Glue operations // Ref: http://amzn.to/2tSyxUV -resource "aws_iam_role_policy" "athena_query_permissions" { - name = "AthenaQuery" - role = aws_iam_role.athena_partition_role.id - policy = data.aws_iam_policy_document.athena_permissions.json +resource "aws_iam_role_policy" "athena_glue_permissions" { + name = "AthenaGlueAccess" + role = var.function_role_id + policy = data.aws_iam_policy_document.athena_glue_permissions.json } -// IAM Policy Doc: Athena and S3 permissions -data "aws_iam_policy_document" "athena_permissions" { +// IAM Policy Doc: Athena and Glue permissions +data "aws_iam_policy_document" "athena_glue_permissions" { statement { effect = "Allow" @@ -149,7 +100,18 @@ data "aws_iam_policy_document" "athena_permissions" { "*", ] } +} + +// IAM Role Policy: Allow the Lambda function to read data buckets +resource "aws_iam_role_policy" "athena_results_bucket" { + name = "S3ResultsBucket" + role = var.function_role_id + policy = data.aws_iam_policy_document.athena_results_bucket.json +} +// IAM Policy Doc: Allow Athena to read data from configured buckets +// This is necessary for table repairs +data "aws_iam_policy_document" "athena_results_bucket" { statement { effect = "Allow" @@ -171,15 +133,15 @@ data "aws_iam_policy_document" "athena_permissions" { } // IAM Role Policy: Allow 
the Lambda function to read data buckets -resource "aws_iam_role_policy" "athena_query_data_bucket_permissions" { - name = "AthenaGetData" - role = aws_iam_role.athena_partition_role.id - policy = data.aws_iam_policy_document.athena_data_bucket_read.json +resource "aws_iam_role_policy" "data_bucket" { + name = "S3DataBucket" + role = var.function_role_id + policy = data.aws_iam_policy_document.data_bucket.json } // IAM Policy Doc: Allow Athena to read data from configured buckets // This is necessary for table repairs -data "aws_iam_policy_document" "athena_data_bucket_read" { +data "aws_iam_policy_document" "data_bucket" { statement { effect = "Allow" @@ -202,7 +164,7 @@ data "aws_iam_policy_document" "athena_data_bucket_read" { } // IAM Policy Doc: Allow configured data buckets to send SQS messages -data "aws_iam_policy_document" "athena_data_bucket_sqs_sendmessage" { +data "aws_iam_policy_document" "data_bucket_sqs" { statement { effect = "Allow" @@ -216,7 +178,7 @@ data "aws_iam_policy_document" "athena_data_bucket_sqs_sendmessage" { } resources = [ - aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn, + aws_sqs_queue.data_bucket_notifications.arn, ] condition { @@ -227,36 +189,3 @@ data "aws_iam_policy_document" "athena_data_bucket_sqs_sendmessage" { } } } - -// Allow S3 to use the SSE key when publishing events to SQS -data "aws_iam_policy_document" "kms_sse_allow_s3" { - statement { - sid = "Enable IAM User Permissions" - effect = "Allow" - - principals { - type = "AWS" - identifiers = ["arn:aws:iam::${var.account_id}:root"] - } - - actions = ["kms:*"] - resources = ["*"] - } - - statement { - sid = "AllowS3ToUseKey" - effect = "Allow" - - principals { - type = "Service" - identifiers = ["s3.amazonaws.com"] - } - - actions = [ - "kms:Decrypt", - "kms:GenerateDataKey", - ] - - resources = ["*"] - } -} diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf b/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf index 
92a1002f3..bfde44fe8 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/kms.tf @@ -3,11 +3,11 @@ resource "aws_kms_key" "sse" { description = "Athena SQS server-side encryption" enable_key_rotation = true - policy = data.aws_iam_policy_document.kms_sse_allow_s3.json + policy = data.aws_iam_policy_document.kms_sse.json tags = { - Name = "StreamAlert" - AltName = "Athena" + Name = "StreamAlert" + Subcomponent = "AthenaPartitioner" } } @@ -15,3 +15,36 @@ resource "aws_kms_alias" "sse" { name = "alias/${var.prefix}_streamalert_sqs_sse" target_key_id = aws_kms_key.sse.key_id } + +// Allow S3 to use the SSE key when publishing events to SQS +data "aws_iam_policy_document" "kms_sse" { + statement { + sid = "Enable IAM User Permissions" + effect = "Allow" + + principals { + type = "AWS" + identifiers = ["arn:aws:iam::${var.account_id}:root"] + } + + actions = ["kms:*"] + resources = ["*"] + } + + statement { + sid = "AllowS3ToUseKey" + effect = "Allow" + + principals { + type = "Service" + identifiers = ["s3.amazonaws.com"] + } + + actions = [ + "kms:Decrypt", + "kms:GenerateDataKey", + ] + + resources = ["*"] + } +} diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/main.tf b/streamalert_cli/_infrastructure/modules/tf_athena/main.tf index e884e056a..c340214f8 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/main.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/main.tf @@ -1,58 +1,7 @@ -// Lambda Function: Athena Parition Refresh -resource "aws_lambda_function" "athena_partition_refresh" { - function_name = "${var.prefix}_streamalert_athena_partition_refresh" - description = "StreamAlert Athena Refresh" - runtime = "python3.7" - role = aws_iam_role.athena_partition_role.arn - handler = var.lambda_handler - memory_size = var.lambda_memory - timeout = var.lambda_timeout - - filename = var.filename - source_code_hash = filebase64sha256(var.filename) - publish = true 
- - // Maximum number of concurrent executions allowed - reserved_concurrent_executions = var.concurrency_limit - - environment { - variables = { - LOGGER_LEVEL = var.lambda_log_level - } - } - - tags = { - Name = "StreamAlert" - AltName = "Athena" - } -} - -// Policy for S3 bucket -data "aws_iam_policy_document" "athena_results_bucket" { - # Force SSL access only - statement { - sid = "ForceSSLOnlyAccess" - - effect = "Deny" - - principals { - type = "AWS" - identifiers = ["*"] - } - - actions = ["s3:*"] - - resources = [ - "arn:aws:s3:::${var.results_bucket}", - "arn:aws:s3:::${var.results_bucket}/*", - ] - - condition { - test = "Bool" - variable = "aws:SecureTransport" - values = ["false"] - } - } +// Athena Database: streamalert +resource "aws_athena_database" "streamalert" { + name = var.database_name + bucket = aws_s3_bucket.athena_results_bucket.bucket } // S3 Bucket: Athena Query Results and Metastore Bucket @@ -63,8 +12,8 @@ resource "aws_s3_bucket" "athena_results_bucket" { force_destroy = false tags = { - Name = "StreamAlert" - AltName = "Athena" + Name = "StreamAlert" + Subcomponent = "AthenaPartitioner" } versioning { @@ -86,22 +35,36 @@ resource "aws_s3_bucket" "athena_results_bucket" { } } -// Athena Database: streamalert -resource "aws_athena_database" "streamalert" { - name = var.database_name - bucket = aws_s3_bucket.athena_results_bucket.bucket -} +// Policy for S3 bucket +data "aws_iam_policy_document" "athena_results_bucket" { + # Force SSL access only + statement { + sid = "ForceSSLOnlyAccess" -// Lambda Alias: Athena Function Production Alias -resource "aws_lambda_alias" "athena_partition_refresh_production" { - name = "production" - description = "Production StreamAlert Athena Parition Refresh Alias" - function_name = aws_lambda_function.athena_partition_refresh.arn - function_version = aws_lambda_function.athena_partition_refresh.version + effect = "Deny" + + principals { + type = "AWS" + identifiers = ["*"] + } + + actions = ["s3:*"] + + 
resources = [ + "arn:aws:s3:::${var.results_bucket}", + "arn:aws:s3:::${var.results_bucket}/*", + ] + + condition { + test = "Bool" + variable = "aws:SecureTransport" + values = ["false"] + } + } } // SQS Queue: Athena Data Bucket Notificaitons -resource "aws_sqs_queue" "streamalert_athena_data_bucket_notifications" { +resource "aws_sqs_queue" "data_bucket_notifications" { name = var.queue_name # Enables SQS Long Polling: https://amzn.to/2wn10CR @@ -117,20 +80,20 @@ resource "aws_sqs_queue" "streamalert_athena_data_bucket_notifications" { kms_master_key_id = aws_kms_key.sse.arn tags = { - Name = "StreamAlert" - AltName = "Athena" + Name = "StreamAlert" + Subcomponent = "AthenaPartitioner" } } // SQS Queue Policy: Allow data buckets to send SQS messages -resource "aws_sqs_queue_policy" "streamalert_athena_data_bucket_notifications" { - queue_url = aws_sqs_queue.streamalert_athena_data_bucket_notifications.id - policy = data.aws_iam_policy_document.athena_data_bucket_sqs_sendmessage.json +resource "aws_sqs_queue_policy" "data_bucket_notifications" { + queue_url = aws_sqs_queue.data_bucket_notifications.id + policy = data.aws_iam_policy_document.data_bucket_sqs.json } -resource "aws_lambda_event_source_mapping" "streamalert_athena_sqs_event_source" { - event_source_arn = aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn - function_name = "${aws_lambda_function.athena_partition_refresh.arn}:${aws_lambda_alias.athena_partition_refresh_production.name}" +resource "aws_lambda_event_source_mapping" "athena_sqs" { + event_source_arn = aws_sqs_queue.data_bucket_notifications.arn + function_name = var.function_alias_arn } // S3 Bucekt Notificaiton: Configure S3 to notify Lambda @@ -139,35 +102,9 @@ resource "aws_s3_bucket_notification" "bucket_notification" { bucket = element(var.athena_data_buckets, count.index) queue { - queue_arn = aws_sqs_queue.streamalert_athena_data_bucket_notifications.arn + queue_arn = aws_sqs_queue.data_bucket_notifications.arn events = 
["s3:ObjectCreated:*"] } - depends_on = [aws_sqs_queue_policy.streamalert_athena_data_bucket_notifications] -} - -// Log Retention Policy -resource "aws_cloudwatch_log_group" "athena" { - name = "/aws/lambda/${aws_lambda_function.athena_partition_refresh.function_name}" - retention_in_days = 14 - - tags = { - Name = "StreamAlert" - AltName = "Athena" - } -} - -// CloudWatch metric filters for the athena partition refresh function -// The split list is made up of: , , -resource "aws_cloudwatch_log_metric_filter" "athena_partition_refresh_cw_metric_filters" { - count = length(var.athena_metric_filters) - name = element(split(",", var.athena_metric_filters[count.index]), 0) - pattern = element(split(",", var.athena_metric_filters[count.index]), 1) - log_group_name = aws_cloudwatch_log_group.athena.name - - metric_transformation { - name = element(split(",", var.athena_metric_filters[count.index]), 0) - namespace = var.namespace - value = element(split(",", var.athena_metric_filters[count.index]), 2) - } + depends_on = [aws_sqs_queue_policy.data_bucket_notifications] } diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf b/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf index 8fa1d5816..932ce00ce 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf +++ b/streamalert_cli/_infrastructure/modules/tf_athena/outputs.tf @@ -1,15 +1,3 @@ -output "lambda_arn" { - value = aws_lambda_function.athena_partition_refresh.arn -} - -output "lambda_role_arn" { - value = aws_iam_role.athena_partition_role.arn -} - -output "lambda_role_id" { - value = aws_iam_role.athena_partition_role.id -} - output "results_bucket_arn" { value = aws_s3_bucket.athena_results_bucket.arn } diff --git a/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf b/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf index 1fd1869b5..5f5e35d96 100644 --- a/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf +++ 
b/streamalert_cli/_infrastructure/modules/tf_athena/variables.tf @@ -6,29 +6,16 @@ variable "prefix" { type = string } -variable "lambda_handler" { - type = string - default = "main.handler" +variable "function_role_id" { + description = "Athena Partitioner function IAM Role ID, exported from the tf_lambda module" } -variable "lambda_memory" { - type = string - default = "128" +variable "function_alias_arn" { + description = "Athena Partitioner function alias arn, exported from the tf_lambda module" } -variable "lambda_timeout" { - type = string - default = "60" -} - -variable "filename" { - type = string - default = "athena_partition_refresh.zip" -} - -variable "lambda_log_level" { - type = string - default = "info" +variable "function_name" { + description = "Athena Partitioner function name, exported from the tf_lambda module" } variable "athena_data_buckets" { @@ -54,17 +41,3 @@ variable "database_name" { variable "queue_name" { type = string } - -variable "athena_metric_filters" { - type = list(string) - default = [] -} - -variable "namespace" { - type = string - default = "StreamAlert" -} - -variable "concurrency_limit" { - default = 10 -} diff --git a/streamalert_cli/athena/handler.py b/streamalert_cli/athena/handler.py index 743f5286a..691798e36 100644 --- a/streamalert_cli/athena/handler.py +++ b/streamalert_cli/athena/handler.py @@ -198,7 +198,7 @@ def get_athena_client(config): AthenaClient: instantiated client for performing athena actions """ prefix = config['global']['account']['prefix'] - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] db_name = get_database_name(config) @@ -451,11 +451,11 @@ def create_table(table, bucket, config, schema_override=None): if table != 'alerts' and bucket != config_data_bucket: # Only add buckets to the config if they are not one of the default/configured buckets # Ensure 'buckets' exists in the config (since it is not required) - 
config['lambda']['athena_partition_refresh_config']['buckets'] = ( - config['lambda']['athena_partition_refresh_config'].get('buckets', {}) + config['lambda']['athena_partitioner_config']['buckets'] = ( + config['lambda']['athena_partitioner_config'].get('buckets', {}) ) - if bucket not in config['lambda']['athena_partition_refresh_config']['buckets']: - config['lambda']['athena_partition_refresh_config']['buckets'][bucket] = 'data' + if bucket not in config['lambda']['athena_partitioner_config']['buckets']: + config['lambda']['athena_partitioner_config']['buckets'][bucket] = 'data' config.write() LOGGER.info('The %s table was successfully created!', sanitized_table_name) diff --git a/streamalert_cli/config.py b/streamalert_cli/config.py index afe6dd411..ac620c02c 100644 --- a/streamalert_cli/config.py +++ b/streamalert_cli/config.py @@ -57,25 +57,6 @@ def clusters(self): """Return list of cluster configuration keys""" return list(self.config['clusters'].keys()) - def generate_athena(self): - """Generate a base Athena config""" - if 'athena_partition_refresh_config' in self.config['lambda']: - LOGGER.warning('The Athena configuration already exists, skipping.') - return - - athena_config_template = { - 'enable_custom_metrics': False, - 'timeout': '60', - 'memory': '128', - 'log_level': 'info', - 'third_party_libraries': [] - } - - self.config['lambda']['athena_partition_refresh_config'] = athena_config_template - self.write() - - LOGGER.info('Athena configuration successfully created') - def set_prefix(self, prefix): """Set the Org Prefix in Global settings""" if not isinstance(prefix, str): diff --git a/streamalert_cli/manage_lambda/deploy.py b/streamalert_cli/manage_lambda/deploy.py index b581f5301..6f83c864c 100644 --- a/streamalert_cli/manage_lambda/deploy.py +++ b/streamalert_cli/manage_lambda/deploy.py @@ -211,7 +211,7 @@ def _create(function_name, config, clusters=None): for info in config['clusters'].values()) ), 'athena': PackageMap( - 
streamalert_packages.AthenaPackage, + streamalert_packages.AthenaPartitionerPackage, {'module.streamalert_athena'}, True ), diff --git a/streamalert_cli/manage_lambda/package.py b/streamalert_cli/manage_lambda/package.py index d8f7d38ce..acc0a3f69 100644 --- a/streamalert_cli/manage_lambda/package.py +++ b/streamalert_cli/manage_lambda/package.py @@ -268,17 +268,17 @@ class AppPackage(LambdaPackage): } -class AthenaPackage(LambdaPackage): - """Create the Athena Partition Refresh Lambda function package""" - config_key = 'athena_partition_refresh_config' - lambda_handler = 'streamalert.athena_partition_refresh.main.handler' +class AthenaPartitionerPackage(LambdaPackage): + """Create the Athena Partitioner Lambda function package""" + config_key = 'athena_partitioner_config' + lambda_handler = 'streamalert.athena_partitioner.main.handler' package_files = { 'conf', 'streamalert/__init__.py', - 'streamalert/athena_partition_refresh', + 'streamalert/athena_partitioner', 'streamalert/shared' } - package_name = 'athena_partition_refresh' + package_name = 'athena_partitioner' package_libs = {'netaddr'} diff --git a/streamalert_cli/manage_lambda/rollback.py b/streamalert_cli/manage_lambda/rollback.py index e12b3bb51..058c80f1e 100644 --- a/streamalert_cli/manage_lambda/rollback.py +++ b/streamalert_cli/manage_lambda/rollback.py @@ -127,7 +127,7 @@ def handler(cls, options, config): if rollback_all or 'athena' in options.function: success = success and _rollback_production( client, - '{}_streamalert_athena_partition_refresh'.format(prefix) + '{}_streamalert_athena_partitioner'.format(prefix) ) if rollback_all or 'classifier' in options.function: diff --git a/streamalert_cli/terraform/athena.py b/streamalert_cli/terraform/athena.py index 73f994e10..6abd68531 100644 --- a/streamalert_cli/terraform/athena.py +++ b/streamalert_cli/terraform/athena.py @@ -13,14 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -from streamalert.shared import metrics +from streamalert.shared import ATHENA_PARTITIONER_NAME from streamalert.shared.config import athena_partition_buckets -from streamalert_cli.manage_lambda.package import AthenaPackage +from streamalert_cli.manage_lambda.package import AthenaPartitionerPackage from streamalert_cli.terraform.common import ( infinitedict, - monitoring_topic_arn, s3_access_logging_bucket, ) +from streamalert_cli.terraform.lambda_module import generate_lambda def generate_athena(config): @@ -32,12 +32,12 @@ def generate_athena(config): Returns: dict: Athena dict to be marshalled to JSON """ - athena_dict = infinitedict() - athena_config = config['lambda']['athena_partition_refresh_config'] - - data_buckets = sorted(athena_partition_buckets(config)) + result = infinitedict() prefix = config['global']['account']['prefix'] + athena_config = config['lambda']['athena_partitioner_config'] + + data_buckets = sorted(athena_partition_buckets(config)) database = athena_config.get('database_name', '{}_streamalert'.format(prefix)) results_bucket_name = athena_config.get( @@ -51,52 +51,33 @@ def generate_athena(config): ).strip() logging_bucket, _ = s3_access_logging_bucket(config) - athena_dict['module']['streamalert_athena'] = { - 's3_logging_bucket': logging_bucket, + + # Set variables for the athena partitioner's IAM permissions + result['module']['athena_partitioner_iam'] = { 'source': './modules/tf_athena', + 'account_id': config['global']['account']['aws_account_id'], + 'prefix': prefix, + 's3_logging_bucket': logging_bucket, 'database_name': database, 'queue_name': queue_name, + 'athena_data_buckets': data_buckets, 'results_bucket': results_bucket_name, 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}', - 'lambda_handler': AthenaPackage.lambda_handler, - 'lambda_memory': athena_config.get('memory', '128'), - 'lambda_timeout': athena_config.get('timeout', '60'), - 'lambda_log_level': athena_config.get('log_level', 'info'), - 
'athena_data_buckets': data_buckets, - 'concurrency_limit': athena_config.get('concurrency_limit', 10), - 'account_id': config['global']['account']['aws_account_id'], - 'prefix': prefix + 'function_role_id': '${module.athena_partitioner_lambda.role_id}', + 'function_name': '${module.athena_partitioner_lambda.function_name}', + 'function_alias_arn': '${module.athena_partitioner_lambda.function_alias_arn}', } - # Cloudwatch monitoring setup - athena_dict['module']['athena_monitoring'] = { - 'source': './modules/tf_monitoring', - 'sns_topic_arn': monitoring_topic_arn(config), - 'lambda_functions': ['{}_streamalert_athena_partition_refresh'.format(prefix)], - 'kinesis_alarms_enabled': False - } - - # Metrics setup - if not athena_config.get('enable_custom_metrics', False): - return athena_dict - - # Check to see if there are any metrics configured for the athena function - current_metrics = metrics.MetricLogger.get_available_metrics() - if metrics.ATHENA_PARTITION_REFRESH_NAME not in current_metrics: - return athena_dict - - metric_prefix = 'AthenaRefresh' - filter_pattern_idx, filter_value_idx = 0, 1 - - # Add filters for the cluster and aggregate - # Use a list of strings that represent the following comma separated values: - # ,, - filters = ['{},{},{}'.format('{}-{}'.format(metric_prefix, metric), - settings[filter_pattern_idx], - settings[filter_value_idx]) - for metric, settings in - current_metrics[metrics.ATHENA_PARTITION_REFRESH_NAME].items()] - - athena_dict['module']['streamalert_athena']['athena_metric_filters'] = filters - - return athena_dict + # Set variables for the Lambda module + result['module']['athena_partitioner_lambda'] = generate_lambda( + '{}_streamalert_{}'.format(prefix, ATHENA_PARTITIONER_NAME), + AthenaPartitionerPackage.package_name + '.zip', + AthenaPartitionerPackage.lambda_handler, + athena_config, + config, + tags={ + 'Subcomponent': 'AthenaPartitioner' + } + ) + + return result diff --git a/streamalert_cli/terraform/generate.py 
b/streamalert_cli/terraform/generate.py index f6bf40e18..1268db9b0 100644 --- a/streamalert_cli/terraform/generate.py +++ b/streamalert_cli/terraform/generate.py @@ -457,58 +457,54 @@ def terraform_generate_handler(config, init=False, check_tf=True, check_creds=Tr os.path.join(TERRAFORM_FILES_PATH, 'metric_alarms.tf.json') ) - # Setup Athena - generate_global_lambda_settings( - config, - config_name='athena_partition_refresh_config', - generate_func=generate_athena, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'athena.tf.json'), - message='Removing old Athena Terraform file' - ) - # Setup Threat Intel Downloader Lambda function if it is enabled generate_global_lambda_settings( config, - config_name='threat_intel_downloader_config', + conf_name='threat_intel_downloader_config', generate_func=generate_threat_intel_downloader, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'ti_downloader.tf.json'), - message='Removing old Threat Intel Downloader Terraform file' + tf_tmp_file_name='ti_downloader', + required=False, ) # Setup Rule Promotion if it is enabled generate_global_lambda_settings( config, - config_name='rule_promotion_config', + conf_name='rule_promotion_config', generate_func=generate_rule_promotion, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'rule_promotion.tf.json'), - message='Removing old Rule Promotion Terraform file' + tf_tmp_file_name='rule_promotion', + required=False, + ) + + # Setup Athena Partitioner + generate_global_lambda_settings( + config, + conf_name='athena_partitioner_config', + generate_func=generate_athena, + tf_tmp_file_name='athena', ) # Setup Rules Engine generate_global_lambda_settings( config, - config_name='rules_engine_config', + conf_name='rules_engine_config', generate_func=generate_rules_engine, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'rules_engine.tf.json'), - message='Removing old Rules Engine Terraform file' + tf_tmp_file_name='rules_engine', ) # Setup Alert Processor generate_global_lambda_settings( 
config, - config_name='alert_processor_config', + conf_name='alert_processor_config', generate_func=generate_alert_processor, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'alert_processor.tf.json'), - message='Removing old Alert Processor Terraform file' + tf_tmp_file_name='alert_processor', ) # Setup Alert Merger generate_global_lambda_settings( config, - config_name='alert_merger_config', + conf_name='alert_merger_config', generate_func=generate_alert_merger, - tf_tmp_file=os.path.join(TERRAFORM_FILES_PATH, 'alert_merger.tf.json'), - message='Removing old Alert Merger Terraform file' + tf_tmp_file_name='alert_merger', ) # Setup Lookup Tables if applicable @@ -527,7 +523,7 @@ def _generate_lookup_tables_settings(config): tf_file_name = os.path.join(TERRAFORM_FILES_PATH, 'lookup_tables.tf.json') if not config['lookup_tables'].get('enabled', False): - remove_temp_terraform_file(tf_file_name, 'Removing old LookupTables Terraform file') + remove_temp_terraform_file(tf_file_name) return # Use the lookup_tables.json configuration file to determine which resources we have @@ -545,7 +541,7 @@ def _generate_lookup_tables_settings(config): if not dynamodb_tables and not s3_buckets: # If no resources are configured at all, simply return and do not generate lookuptables # IAM policies - remove_temp_terraform_file(tf_file_name, 'No tables configured') + remove_temp_terraform_file(tf_file_name, extra='No tables configured') return roles = { @@ -588,7 +584,7 @@ def _generate_streamquery_module(config): """ tf_file_name = os.path.join(TERRAFORM_FILES_PATH, 'scheduled_queries.tf.json') if not config.get('scheduled_queries', {}).get('enabled', False): - remove_temp_terraform_file(tf_file_name, 'Removing old scheduled queries Terraform file') + remove_temp_terraform_file(tf_file_name) return _create_terraform_module_file( @@ -597,7 +593,12 @@ def _generate_streamquery_module(config): ) -def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_file, 
message): +def generate_global_lambda_settings( + config, + conf_name, + generate_func, + tf_tmp_file_name, + required=True): """Generate settings for global Lambda functions Args: @@ -607,16 +608,16 @@ def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_f tf_tmp_file (str): filename of terraform file, generated by CLI. message (str): Message will be logged by LOGGER. """ - if config_name == 'athena_partition_refresh_config': + if conf_name == 'athena_partitioner_config': # Raise ConfigError when user doesn't explicitly set `file_format` - # in `athena_partition_refresh_config` in conf/lambda.json when upgrade to v3.1.0. + # in `athena_partitioner_config` in conf/lambda.json when upgrade to v3.1.0. file_format = get_data_file_format(config) if not file_format or file_format not in ('parquet', 'json'): message = ( '[WARNING] ' 'It is required to explicitly set "file_format" for ' - 'athena_partition_refresh_config in "conf/lambda.json" when upgrading to v3.1.0. ' + 'athena_partitioner_config in "conf/lambda.json" when upgrading to v3.1.0. ' 'Available values are "parquet" and "json". For more information, refer to ' 'https://github.com/airbnb/streamalert/issues/1143. 
' 'In the future release, the default value of "file_format" will ' @@ -624,26 +625,36 @@ def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_f ) raise ConfigError(message) - if not config['lambda'].get(config_name): - LOGGER.warning('Config for \'%s\' not in lambda.json', config_name) - remove_temp_terraform_file(tf_tmp_file, message) + tf_tmp_file = os.path.join(TERRAFORM_FILES_PATH, '{}.tf.json'.format(tf_tmp_file_name)) + + if required and conf_name not in config['lambda']: + message = 'Required configuration missing in lambda.json: {}'.format(conf_name) + raise ConfigError(message) + + if not config['lambda'].get(conf_name): + LOGGER.warning('Optional configuration missing in lambda.json, skipping: %s', conf_name) + remove_temp_terraform_file(tf_tmp_file) return - if config['lambda'][config_name].get('enabled', True): + if config['lambda'][conf_name].get('enabled', True): generated_config = generate_func(config=config) if generated_config: _create_terraform_module_file(generated_config, tf_tmp_file) else: - remove_temp_terraform_file(tf_tmp_file, message) + remove_temp_terraform_file(tf_tmp_file) -def remove_temp_terraform_file(tf_tmp_file, message): +def remove_temp_terraform_file(tf_tmp_file, extra=None): """Remove temporal terraform file Args: tf_tmp_file (str): filename of terraform file, generated by CLI. message (str): Message will be logged by LOGGER. """ + if extra: + LOGGER.info(extra) + + message = 'Removing old Terraform file: {}'.format(tf_tmp_file) if os.path.isfile(tf_tmp_file): LOGGER.info(message) os.remove(tf_tmp_file) diff --git a/streamalert_cli/terraform/handlers.py b/streamalert_cli/terraform/handlers.py index 967420f5a..538967bc9 100644 --- a/streamalert_cli/terraform/handlers.py +++ b/streamalert_cli/terraform/handlers.py @@ -118,7 +118,7 @@ def handler(cls, options, config): if get_data_file_format(config) == 'json': # Terraform v0.12 now supports creating Athena tables. 
We will support # to use terraform aws_glue_catalog_table resource to create table only - # when data file_format is set to "parquet" in "athena_partition_refresh_config" + # when data file_format is set to "parquet" in "athena_partitioner_config" # # For "json" file_format, we will continue using Athena DDL query to # create tables. However, this capabity will be faded out in the future diff --git a/streamalert_cli/terraform/rule_promotion.py b/streamalert_cli/terraform/rule_promotion.py index 5ad1b4655..62c5dfadb 100644 --- a/streamalert_cli/terraform/rule_promotion.py +++ b/streamalert_cli/terraform/rule_promotion.py @@ -49,7 +49,7 @@ def generate_rule_promotion(config): 'rules_table_arn': '${module.globals.rules_table_arn}', 'function_alias_arn': '${module.rule_promotion_lambda.function_alias_arn}', 'function_name': '${module.rule_promotion_lambda.function_name}', - 'athena_results_bucket_arn': '${module.streamalert_athena.results_bucket_arn}', + 'athena_results_bucket_arn': '${module.athena_partitioner_iam.results_bucket_arn}', 'alerts_bucket': alerts_bucket, 's3_kms_key_arn': '${aws_kms_key.server_side_encryption.arn}' } diff --git a/streamalert_cli/terraform/scheduled_queries.py b/streamalert_cli/terraform/scheduled_queries.py index 6041e80ca..cf35a62db 100644 --- a/streamalert_cli/terraform/scheduled_queries.py +++ b/streamalert_cli/terraform/scheduled_queries.py @@ -25,14 +25,14 @@ def generate_scheduled_queries_module_configuration(config): # FIXME (derek.wang) # should violently break if athena configurations don't exist. # Alternatively, could read off streamalert_athena module and get more outputs from that. - athena_config = config['lambda']['athena_partition_refresh_config'] + athena_config = config['lambda']['athena_partitioner_config'] # FIXME (derek.wang) make consistent with streamalert_athena module, # maybe make this dependent on output of that module? 
database = athena_config.get('database_name', '{}_streamalert'.format(prefix)) # The results bucket cannot reference the output from the streamalert_athena module: - # '${module.streamalert_athena.results_bucket_arn}' + # '${module.athena_partitioner_iam.results_bucket_arn}' # Because it takes a bucket name, not an ARN # FIXME (derek.wang) DRY out this code results_bucket = athena_query_results_bucket(config) diff --git a/tests/unit/conf/lambda.json b/tests/unit/conf/lambda.json index 7d42b54a1..8dacf068a 100644 --- a/tests/unit/conf/lambda.json +++ b/tests/unit/conf/lambda.json @@ -28,7 +28,7 @@ ] } }, - "athena_partition_refresh_config": { + "athena_partitioner_config": { "memory": "128", "timeout": "60", "file_format": "parquet", diff --git a/tests/unit/conf_athena/lambda.json b/tests/unit/conf_athena/lambda.json index 12fade859..7d29268f2 100644 --- a/tests/unit/conf_athena/lambda.json +++ b/tests/unit/conf_athena/lambda.json @@ -1,5 +1,5 @@ { - "athena_partition_refresh_config": { + "athena_partitioner_config": { "file_format": "json", "memory": "128", "timeout": "60" diff --git a/tests/unit/helpers/config.py b/tests/unit/helpers/config.py index 71c2de457..f74139648 100644 --- a/tests/unit/helpers/config.py +++ b/tests/unit/helpers/config.py @@ -110,7 +110,7 @@ def basic_streamalert_config(): 'memory': 128, 'timeout': 10 }, - 'athena_partition_refresh_config': { + 'athena_partitioner_config': { 'enable_custom_metrics': False, 'memory': 128, 'timeout': 60 @@ -288,6 +288,6 @@ def athena_cli_basic_config(): } }, 'lambda': { - 'athena_partition_refresh_config': {} + 'athena_partitioner_config': {} } } diff --git a/tests/unit/streamalert/athena_partition_refresh/__init__.py b/tests/unit/streamalert/athena_partitioner/__init__.py similarity index 100% rename from tests/unit/streamalert/athena_partition_refresh/__init__.py rename to tests/unit/streamalert/athena_partitioner/__init__.py diff --git a/tests/unit/streamalert/athena_partition_refresh/test_main.py 
b/tests/unit/streamalert/athena_partitioner/test_main.py similarity index 84% rename from tests/unit/streamalert/athena_partition_refresh/test_main.py rename to tests/unit/streamalert/athena_partitioner/test_main.py index c9caadd5f..b360df323 100644 --- a/tests/unit/streamalert/athena_partition_refresh/test_main.py +++ b/tests/unit/streamalert/athena_partitioner/test_main.py @@ -20,7 +20,7 @@ from mock import Mock, patch, call from nose.tools import assert_equal, assert_true -from streamalert.athena_partition_refresh.main import AthenaRefresher +from streamalert.athena_partitioner.main import AthenaPartitioner from streamalert.shared.config import load_config from tests.unit.helpers.aws_mocks import MockAthenaClient @@ -29,21 +29,21 @@ # Without this time.sleep patch, backoff performs sleep # operations and drastically slows down testing @patch('time.sleep', Mock()) -class TestAthenaRefresher: - """Test class for AthenaRefresher when output data in Parquet format""" +class TestAthenaPartitioner: + """Test class for AthenaPartitioner when output data in Parquet format""" - @patch('streamalert.athena_partition_refresh.main.load_config', + @patch('streamalert.athena_partitioner.main.load_config', Mock(return_value=load_config('tests/unit/conf/'))) @patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1'}) @patch('streamalert.shared.athena.boto3') def setup(self, boto_patch): - """Setup the AthenaRefresher tests""" + """Setup the AthenaPartitioner tests""" boto_patch.client.return_value = MockAthenaClient() - self._refresher = AthenaRefresher() + self._partitioner = AthenaPartitioner() def test_add_partitions(self): - """AthenaRefresher - Add Partitions""" - self._refresher._s3_buckets_and_keys = { + """AthenaPartitioner - Add Partitions""" + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.parquet', 
b'parquet/alerts/dt=2020-02-13-08/prefix_streamalert_alert_delivery-01-abcd.parquet' @@ -63,19 +63,19 @@ def test_add_partitions(self): b'dt=2020-02-12-07/log_type_2_abcd.parquet' } } - result = self._refresher._add_partitions() + result = self._partitioner._add_partitions() assert_true(result) @patch('logging.Logger.warning') def test_add_partitions_none(self, log_mock): - """AthenaRefresher - Add Partitions, None to Add""" - result = self._refresher._add_partitions() + """AthenaPartitioner - Add Partitions, None to Add""" + result = self._partitioner._add_partitions() log_mock.assert_called_with('No partitions to add') assert_equal(result, False) def test_get_partitions_from_keys_parquet(self): - """AthenaRefresher - Get Partitions From Keys in parquet format""" + """AthenaPartitioner - Get Partitions From Keys in parquet format""" expected_result = { 'alerts': { '(dt = \'2017-08-26-14\')': ('\'s3://unit-test-streamalerts/' @@ -103,7 +103,7 @@ def test_get_partitions_from_keys_parquet(self): } } - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-26-14/rule_name_alerts-1304134918401.parquet', b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.parquet', @@ -123,21 +123,21 @@ def test_get_partitions_from_keys_parquet(self): } } - result = self._refresher._get_partitions_from_keys() + result = self._partitioner._get_partitions_from_keys() assert_equal(result, expected_result) @patch('logging.Logger.warning') def test_get_partitions_from_keys_error(self, log_mock): - """AthenaRefresher - Get Partitions From Keys, Bad Key""" + """AthenaPartitioner - Get Partitions From Keys, Bad Key""" bad_key = b'bad_match_string' - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { bad_key } } - result = self._refresher._get_partitions_from_keys() + result = self._partitioner._get_partitions_from_keys() 
log_mock.assert_called_with('The key %s does not match any regex, skipping', bad_key.decode('utf-8')) @@ -182,10 +182,10 @@ def _s3_record_placeholder_file(): def _create_test_message(count=2, placeholder=False): """Helper function for creating an sqs messsage body""" if placeholder: - body = json.dumps(TestAthenaRefresher._s3_record_placeholder_file()) + body = json.dumps(TestAthenaPartitioner._s3_record_placeholder_file()) else: count = min(count, 30) - body = json.dumps(TestAthenaRefresher._s3_record(count)) + body = json.dumps(TestAthenaPartitioner._s3_record(count)) return { 'Records': [ { @@ -199,11 +199,11 @@ def _create_test_message(count=2, placeholder=False): } @patch('logging.Logger.debug') - @patch('streamalert.athena_partition_refresh.main.AthenaRefresher._add_partitions') + @patch('streamalert.athena_partitioner.main.AthenaPartitioner._add_partitions') def test_run(self, add_mock, log_mock): - """AthenaRefresher - Run""" + """AthenaPartitioner - Run""" add_mock.return_value = True - self._refresher.run(self._create_test_message(1)) + self._partitioner.run(self._create_test_message(1)) log_mock.assert_called_with( 'Received notification for object \'%s\' in bucket \'%s\'', 'parquet/alerts/dt=2017-08-01-14/02/test.json'.encode(), @@ -212,8 +212,8 @@ def test_run(self, add_mock, log_mock): @patch('logging.Logger.info') def test_run_placeholder_file(self, log_mock): - """AthenaRefresher - Run, Placeholder File""" - self._refresher.run(self._create_test_message(1, True)) + """AthenaPartitioner - Run, Placeholder File""" + self._partitioner.run(self._create_test_message(1, True)) log_mock.assert_has_calls([ call( 'Skipping placeholder file notification with key: %s', @@ -223,38 +223,38 @@ def test_run_placeholder_file(self, log_mock): @patch('logging.Logger.warning') def test_run_no_messages(self, log_mock): - """AthenaRefresher - Run, No Messages""" - self._refresher.run(self._create_test_message(0)) + """AthenaPartitioner - Run, No Messages""" + 
self._partitioner.run(self._create_test_message(0)) log_mock.assert_called_with('No partitions to add') @patch('logging.Logger.error') def test_run_invalid_bucket(self, log_mock): - """AthenaRefresher - Run, Bad Bucket Name""" + """AthenaPartitioner - Run, Bad Bucket Name""" event = self._create_test_message(0) bucket = 'bad.bucket.name' s3_record = self._s3_record(1) s3_record['Records'][0]['s3']['bucket']['name'] = bucket event['Records'][0]['body'] = json.dumps(s3_record) - self._refresher.run(event) + self._partitioner.run(event) log_mock.assert_called_with('\'%s\' not found in \'buckets\' config. Please add this ' 'bucket to enable additions of Hive partitions.', bucket) @patch('time.sleep', Mock()) -class TestAthenaRefresherJson: - """Test class for AthenaRefresher when output data in JSON format""" +class TestAthenaPartitionerJSON: + """Test class for AthenaPartitioner when output data in JSON format""" - @patch('streamalert.athena_partition_refresh.main.load_config', + @patch('streamalert.athena_partitioner.main.load_config', Mock(return_value=load_config('tests/unit/conf_athena/'))) @patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1'}) @patch('streamalert.shared.athena.boto3') def setup(self, boto_patch): - """Setup the AthenaRefresher tests""" + """Setup the AthenaPartitioner tests""" boto_patch.client.return_value = MockAthenaClient() - self._refresher = AthenaRefresher() + self._partitioner = AthenaPartitioner() def test_get_partitions_from_keys_json(self): - """AthenaRefresher - Get Partitions From Keys in json format""" + """AthenaPartitioner - Get Partitions From Keys in json format""" expected_result = { 'alerts': { '(dt = \'2017-08-26-14\')': ('\'s3://unit-test-streamalerts/' @@ -282,7 +282,7 @@ def test_get_partitions_from_keys_json(self): } } - self._refresher._s3_buckets_and_keys = { + self._partitioner._s3_buckets_and_keys = { 'unit-test-streamalerts': { b'parquet/alerts/dt=2017-08-26-14/rule_name_alerts-1304134918401.json', 
b'parquet/alerts/dt=2017-08-27-14/rule_name_alerts-1304134918401.json', @@ -302,6 +302,6 @@ def test_get_partitions_from_keys_json(self): } } - result = self._refresher._get_partitions_from_keys() + result = self._partitioner._get_partitions_from_keys() assert_equal(result, expected_result) diff --git a/tests/unit/streamalert_cli/manage_lambda/test_rollback.py b/tests/unit/streamalert_cli/manage_lambda/test_rollback.py index 650ddb1d0..13087ef3e 100644 --- a/tests/unit/streamalert_cli/manage_lambda/test_rollback.py +++ b/tests/unit/streamalert_cli/manage_lambda/test_rollback.py @@ -76,7 +76,7 @@ def test_rollback_all(self, mock_helper): mock.call(mock.ANY, 'unit-test_streamalert_alert_merger'), mock.call(mock.ANY, 'unit-test_corp_box_admin_events_box_collector_app'), mock.call(mock.ANY, 'unit-test_corp_duo_admin_duo_admin_collector_app'), - mock.call(mock.ANY, 'unit-test_streamalert_athena_partition_refresh'), + mock.call(mock.ANY, 'unit-test_streamalert_athena_partitioner'), mock.call(mock.ANY, 'unit-test_corp_streamalert_classifier'), mock.call(mock.ANY, 'unit-test_prod_streamalert_classifier'), mock.call(mock.ANY, 'unit-test_streamalert_rules_engine'), diff --git a/tests/unit/streamalert_cli/terraform/test_athena.py b/tests/unit/streamalert_cli/terraform/test_athena.py index 2079b35fd..a2b701e35 100644 --- a/tests/unit/streamalert_cli/terraform/test_athena.py +++ b/tests/unit/streamalert_cli/terraform/test_athena.py @@ -20,10 +20,11 @@ CONFIG = CLIConfig(config_path='tests/unit/conf') + def test_generate_athena(): - """CLI - Terraform Generate Athena""" + """CLI - Terraform Generate Athena Partitioner""" - CONFIG['lambda']['athena_partition_refresh_config'] = { + CONFIG['lambda']['athena_partitioner_config'] = { 'timeout': '60', 'memory': '128', 'third_party_libraries': [] @@ -33,32 +34,38 @@ def test_generate_athena(): expected_athena_config = { 'module': { - 'streamalert_athena': { - 's3_logging_bucket': '{}-streamalert-s3-logging'.format(prefix), + 
'athena_partitioner_iam': { 'source': './modules/tf_athena', + 's3_logging_bucket': '{}-streamalert-s3-logging'.format(prefix), + 'prefix': 'unit-test', + 'account_id': '12345678910', 'database_name': '{}_streamalert'.format(prefix), 'queue_name': '{}_streamalert_athena_s3_notifications'.format(prefix), 'results_bucket': '{}-streamalert-athena-results'.format(prefix), - 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}', - 'lambda_handler': 'streamalert.athena_partition_refresh.main.handler', - 'lambda_log_level': 'info', - 'lambda_memory': '128', - 'lambda_timeout': '60', 'athena_data_buckets': [ 'unit-test-streamalert-data', 'unit-test-streamalerts' ], - 'prefix': 'unit-test', - 'account_id': '12345678910', - 'concurrency_limit': 10 + 'kms_key_id': '${aws_kms_key.server_side_encryption.key_id}', + 'function_role_id': '${module.athena_partitioner_lambda.role_id}', + 'function_name': '${module.athena_partitioner_lambda.function_name}', + 'function_alias_arn': '${module.athena_partitioner_lambda.function_alias_arn}', }, - 'athena_monitoring': { - 'source': './modules/tf_monitoring', - 'sns_topic_arn': ( - 'arn:aws:sns:us-west-1:12345678910:unit-test_streamalert_monitoring' - ), - 'kinesis_alarms_enabled': False, - 'lambda_functions': ['unit-test_streamalert_athena_partition_refresh'] + 'athena_partitioner_lambda': { + 'description': 'Unit-Test Streamalert Athena Partitioner', + 'environment_variables': { + 'ENABLE_METRICS': '0', + 'LOGGER_LEVEL': 'info' + }, + 'tags': { + 'Subcomponent': 'AthenaPartitioner' + }, + 'filename': 'athena_partitioner.zip', + 'function_name': 'unit-test_streamalert_athena_partitioner', + 'handler': 'streamalert.athena_partitioner.main.handler', + 'memory_size_mb': '128', + 'source': './modules/tf_lambda', + 'timeout_sec': '60', } } } diff --git a/tests/unit/streamalert_cli/terraform/test_generate.py b/tests/unit/streamalert_cli/terraform/test_generate.py index 3d309b6a1..da6c07a1f 100644 --- 
a/tests/unit/streamalert_cli/terraform/test_generate.py +++ b/tests/unit/streamalert_cli/terraform/test_generate.py @@ -783,30 +783,56 @@ def test_generate_main_with_sqs_url_false(self): assert_equal(result['module']['globals']['source'], './modules/tf_globals') assert_false(result['module']['globals']['sqs_use_prefix']) - def test_generate_main_file_format_unspecified(self): - "CLI - Terraform Generate Main raises error when file_format unspecified" - self.config['lambda']['athena_partition_refresh_config']['file_format'] = None + def test_generate_athena_lambda_format_unspecified(self): + "CLI - Terraform Generate Global Lambda Settings, Unspecified Athena file_format" + self.config['lambda']['athena_partitioner_config']['file_format'] = None assert_raises( ConfigError, generate.generate_global_lambda_settings, config=self.config, - config_name='athena_partition_refresh_config', + conf_name='athena_partitioner_config', generate_func='test_func', - tf_tmp_file='test_tf_tmp_file_path', - message='test message' + tf_tmp_file_name='test_tf_tmp_file_path', ) - def test_generate_main_file_format_misconfigured(self): - "CLI - Terraform Generate Main raises error when file_format misconfigured" - self.config['lambda']['athena_partition_refresh_config']['file_format'] = 'Parquet' + def test_generate_athena_lambda_format_invalid(self): + "CLI - Terraform Generate Global Lambda Settings, Invalid Athena file_format" + self.config['lambda']['athena_partitioner_config']['file_format'] = 'Parquet' assert_raises( ConfigError, generate.generate_global_lambda_settings, config=self.config, - config_name='athena_partition_refresh_config', + conf_name='athena_partitioner_config', generate_func='test_func', - tf_tmp_file='test_tf_tmp_file_path', - message='test message' + tf_tmp_file_name='test_tf_tmp_file_path', + ) + + def test_generate_required_lambda_invalid_config(self): + "CLI - Terraform Generate Global Lambda Settings, Invalid Config" + + assert_raises( + ConfigError, + 
generate.generate_global_lambda_settings, + config=self.config, + conf_name='athena_partition_refresh_config', + generate_func='test_func', + tf_tmp_file_name='test_tf_tmp_file_path', + ) + + @patch('logging.Logger.warning') + def test_generate_optional_lambda_not_in_config(self, log_mock): + "CLI - Terraform Generate Global Lambda Settings, Optional Missing in Config" + fake_opt_conf_name = 'fake_optional_conf_name' + generate.generate_global_lambda_settings( + config=self.config, + conf_name=fake_opt_conf_name, + generate_func='test_func', + tf_tmp_file_name='test_tf_tmp_file_path', + required=False, + ) + + log_mock.assert_called_with( + 'Optional configuration missing in lambda.json, skipping: %s', fake_opt_conf_name ) diff --git a/tests/unit/streamalert_cli/terraform/test_rule_promotion.py b/tests/unit/streamalert_cli/terraform/test_rule_promotion.py index 9f5063b8b..01f2cf49e 100644 --- a/tests/unit/streamalert_cli/terraform/test_rule_promotion.py +++ b/tests/unit/streamalert_cli/terraform/test_rule_promotion.py @@ -42,7 +42,9 @@ def test_generate(self): 'source': './modules/tf_rule_promotion_iam', 'send_digest_schedule_expression': 'cron(30 13 * * ? 
*)', 'digest_sns_topic': 'unit-test_streamalert_rule_staging_stats', - 'athena_results_bucket_arn': '${module.streamalert_athena.results_bucket_arn}', + 'athena_results_bucket_arn': ( + '${module.athena_partitioner_iam.results_bucket_arn}' + ), 'alerts_bucket': 'unit-test-streamalerts', 's3_kms_key_arn': '${aws_kms_key.server_side_encryption.arn}' }, diff --git a/tests/unit/streamalert_cli/test_cli_config.py b/tests/unit/streamalert_cli/test_cli_config.py index 8ad7925d0..b4cccaefa 100644 --- a/tests/unit/streamalert_cli/test_cli_config.py +++ b/tests/unit/streamalert_cli/test_cli_config.py @@ -63,9 +63,9 @@ def test_load_config(self): def test_toggle_metric(self): """CLI - Metric toggling""" - self.config.toggle_metrics('athena_partition_refresh', enabled=True) + self.config.toggle_metrics('athena_partitioner', enabled=True) assert_equal( - self.config['lambda']['athena_partition_refresh_config']['enable_custom_metrics'], + self.config['lambda']['athena_partitioner_config']['enable_custom_metrics'], True )