
Commit

misc updates related to config path renaming, etc
ryandeivert committed Apr 1, 2020
1 parent 6d0e727 commit 280ad23
Showing 19 changed files with 47 additions and 43 deletions.
4 changes: 3 additions & 1 deletion conf/lambda.json
@@ -50,8 +50,10 @@
"subnet_ids": []
}
},
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"concurrency_limit": 10,
"memory": 128,
"timeout": 300,
"file_format": null,
"log_level": "info"
},
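
The rename above means ``conf/lambda.json`` files written before this commit still carry the legacy ``athena_partition_refresh_config`` key. A minimal sketch of a loader that tolerates both names; the ``load_athena_config`` helper and its fallback behavior are illustrative assumptions, not part of this commit:

.. code-block:: python

    import json

    LEGACY_KEY = 'athena_partition_refresh_config'
    CURRENT_KEY = 'athena_partitioner_config'


    def load_athena_config(path='conf/lambda.json'):
        """Return the partitioner settings, tolerating configs written before the rename"""
        with open(path) as handle:
            lambda_config = json.load(handle)

        # Prefer the key introduced by this commit, falling back to the legacy name
        return lambda_config.get(CURRENT_KEY, lambda_config.get(LEGACY_KEY, {}))
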
2 changes: 1 addition & 1 deletion docs/source/architecture.rst
@@ -40,7 +40,7 @@ configured `outputs <outputs.html>`_. All alerts implicitly include a Firehose o
an S3 bucket that can be queried with Athena. Alerts will be retried indefinitely until they are
successfully delivered, at which point they will be removed from the DynamoDB table.

6. An "athena partition refresh" Lambda function runs periodically to onboard new StreamAlert data
6. An Athena Partitioner Lambda function runs periodically to onboard new StreamAlert data
and alerts into their respective Athena databases for historical search.

Other StreamAlert components include DynamoDB tables and Lambda functions for optional rule
4 changes: 2 additions & 2 deletions docs/source/getting-started.rst
@@ -109,7 +109,7 @@ Deploy

.. code-block:: bash
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"concurrency_limit": 10,
"file_format": "parquet",
"log_level": "info"
@@ -284,7 +284,7 @@ dropdown on the left and preview the ``alerts`` table:
:target: _images/athena-alerts-search.png

(Here, my name prefix is ``testv2``.) If no records are returned, look for errors
in the Athena Partition Refresh function or try invoking it directly.
in the Athena Partitioner function or try invoking it directly.

And there you have it! Ingested log data is parsed, classified, and scanned by the rules engine.
Any resulting alerts are delivered to your configured output(s) within a matter of minutes.
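
To invoke the partitioner directly, as suggested above, a boto3 sketch is shown below; the function name is an assumption based on the ``testv2`` prefix used in this guide and the renamed component, so adjust it to match your deployment:

.. code-block:: python

    import boto3

    # Assumed function name based on the renamed component; check your deployment
    FUNCTION_NAME = 'testv2_streamalert_athena_partitioner'

    lambda_client = boto3.client('lambda')

    # Asynchronous invocation; the partitioner reads any pending S3 notifications from SQS
    response = lambda_client.invoke(
        FunctionName=FUNCTION_NAME,
        InvocationType='Event',
    )

    print('Invocation status code: {}'.format(response['StatusCode']))
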
22 changes: 11 additions & 11 deletions docs/source/historical-search.rst
@@ -6,7 +6,7 @@ StreamAlert historical search feature is backed by Amazon S3 and `Athena <https:
By default, StreamAlert sends all alerts to S3, and those alerts are searchable through an Athena table. StreamAlert
users can optionally enable historical search for log data as well.

As of StreamAlert v3.1.0, a new field, ``file_format``, has been added to ``athena_partition_refresh_config``
As of StreamAlert v3.1.0, a new field, ``file_format``, has been added to ``athena_partitioner_config``
in ``conf/lambda.json``, defaulting to ``null``. This field lets users configure how the data processed
by the Classifier is stored in the S3 bucket, either as ``parquet`` or ``json``.

@@ -39,7 +39,7 @@ The pipeline is:

#. StreamAlert creates an Athena database, an alerts Kinesis Firehose delivery stream, and an ``alerts`` table during the initial deployment
#. Optionally create Firehose resources and Athena tables for historical data retention
#. S3 events will be sent to an SQS that is mapped to the Athena Partition Refresh Lambda function
#. S3 events will be sent to an SQS that is mapped to the Athena Partitioner Lambda function
#. The Lambda function adds new partitions when new alerts or data are saved to the S3 bucket via Firehose (sketched below)
#. Alerts, and optionally data, are available for searching via Athena console or the Athena API
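
Step 4 is the partitioner's core job. A rough sketch of what adding a single partition can look like with boto3; the database, table, partition value, and bucket locations are placeholders, and the real function derives partitions from the S3 object keys delivered via SQS rather than hard-coding them:

.. code-block:: python

    import boto3

    athena = boto3.client('athena')

    # Placeholder database, table, partition, and bucket values for illustration only
    DATABASE = 'testv2_streamalert'
    QUERY = (
        "ALTER TABLE alerts ADD IF NOT EXISTS "
        "PARTITION (dt = '2020-04-01-00') "
        "LOCATION 's3://testv2-streamalerts/alerts/dt=2020-04-01-00/'"
    )

    # Athena queries run asynchronously; poll get_query_execution to check completion
    response = athena.start_query_execution(
        QueryString=QUERY,
        QueryExecutionContext={'Database': DATABASE},
        ResultConfiguration={'OutputLocation': 's3://testv2-streamalert-athena-results/'},
    )

    print('Started query: {}'.format(response['QueryExecutionId']))
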

@@ -50,26 +50,26 @@ Alerts Search
*************

* Review the settings for the :ref:`Alerts Firehose Configuration <alerts_firehose_configuration>` and
the :ref:`Athena Partition Refresh<configure_athena_partition_refresh_lambda>` function. Note that
the :ref:`Athena Partitioner<configure_athena_partitioner_lambda>` function. Note that
the Athena database and alerts table are created automatically when you first deploy StreamAlert.
* If the ``file_format`` value within the :ref:`Athena Partition Refresh<configure_athena_partition_refresh_lambda>`
* If the ``file_format`` value within the :ref:`Athena Partitioner<configure_athena_partitioner_lambda>`
function config is set to ``parquet``, you can run the ``MSCK REPAIR TABLE alerts`` command in
Athena to load all available partitions, after which alerts become searchable. Note, however, that the
``MSCK REPAIR`` command cannot load new partitions automatically.
* StreamAlert includes a Lambda function to automatically add new partitions for Athena tables when
the data arrives in S3. See :ref:`configure_athena_partition_refresh_lambda`
the data arrives in S3. See :ref:`configure_athena_partitioner_lambda`

.. code-block:: bash
{
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"concurrency_limit": 10,
"file_format": "parquet",
"log_level": "info"
}
}
* Deploy the Athena Partition Refresh Lambda function
* Deploy the Athena Partitioner Lambda function

.. code-block:: bash
@@ -109,7 +109,7 @@ It is optional to store data in S3 bucket and available for search in Athena tab
.. image:: ../images/athena-data-search.png


.. _configure_athena_partition_refresh_lambda:
.. _configure_athena_partitioner_lambda:

*************************
Configure Lambda Settings
@@ -120,8 +120,8 @@ Open ``conf/lambda.json``, and fill in the following options:
=================================== ======== ==================== ===========
Key Required Default Description
----------------------------------- -------- -------------------- -----------
``enabled`` Yes ``true`` Enables/Disables the Athena Partition Refresh Lambda function
``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partition Refresh Lambda function
``enabled`` Yes ``true`` Enables/Disables the Athena Partitioner Lambda function
``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partitioner Lambda function
``log_level`` No ``info`` The log level for the Lambda function, can be either ``info`` or ``debug``. Debug will help with diagnosing errors with polling SQS or sending Athena queries.
``memory`` No ``128`` The amount of memory (in MB) allocated to the Lambda function
``timeout`` No ``60`` The maximum duration of the Lambda function (in seconds)
@@ -134,7 +134,7 @@ Key Required Default Descriptio
.. code-block:: json
{
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"log_level": "info",
"memory": 128,
"buckets": {
2 changes: 1 addition & 1 deletion streamalert/rule_promotion/promoter.py
@@ -39,7 +39,7 @@ def __init__(self):
# Create the rule table class for getting staging information
self._rule_table = RuleTable('{}_streamalert_rules'.format(prefix))

athena_config = self._config['lambda']['athena_partition_refresh_config']
athena_config = self._config['lambda']['athena_partitioner_config']

# Get the name of the athena database to access
db_name = athena_config.get('database_name', get_database_name(self._config))
2 changes: 1 addition & 1 deletion streamalert/shared/athena.py
@@ -59,7 +59,7 @@ def __init__(self, database_name, results_bucket, results_prefix, region=None):
if not results_bucket.startswith('s3://'):
results_bucket = 's3://{}'.format(results_bucket)

# Produces athena_partition_refresh/YYYY/MM/DD S3 keys
# Produces athena_partitioner/YYYY/MM/DD S3 keys
self._s3_results_path = posixpath.join(
results_bucket,
results_prefix,
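
The call above joins the results bucket, the results prefix, and a date-based suffix into the S3 results path. A small sketch of how such a key can be produced; the exact date formatting is an assumption, since the remainder of the call is collapsed in this view:

.. code-block:: python

    from datetime import datetime
    import posixpath

    results_bucket = 's3://testv2-streamalert-athena-results'
    results_prefix = 'athena_partitioner'

    # e.g. s3://testv2-streamalert-athena-results/athena_partitioner/2020/04/01
    s3_results_path = posixpath.join(
        results_bucket,
        results_prefix,
        datetime.utcnow().strftime('%Y/%m/%d'),
    )
    print(s3_results_path)
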
4 changes: 2 additions & 2 deletions streamalert/shared/config.py
@@ -122,7 +122,7 @@ def athena_partition_buckets(config):
Returns:
list: Bucket names for which Athena is enabled
"""
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']
data_buckets = athena_config.get('buckets', {})
data_buckets[firehose_alerts_bucket(config)] = 'alerts'
data_bucket = firehose_data_bucket(config) # Data retention is optional, so check for this
@@ -140,7 +140,7 @@ def athena_query_results_bucket(config):
Returns:
str: The name of the S3 bucket.
"""
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']
prefix = config['global']['account']['prefix']

return athena_config.get(
4 changes: 2 additions & 2 deletions streamalert/shared/utils.py
@@ -152,7 +152,7 @@ def get_database_name(config):
str: The name of the athena database
"""
prefix = config['global']['account']['prefix']
athena_config = config['lambda'].get('athena_partition_refresh_config')
athena_config = config['lambda'].get('athena_partitioner_config')

return athena_config.get('database_name', '{}_streamalert'.format(prefix))

@@ -163,6 +163,6 @@ def get_data_file_format(config):
Returns:
str: The data store format either "parquet" or "json"
"""
athena_config = config['lambda'].get('athena_partition_refresh_config', {})
athena_config = config['lambda'].get('athena_partitioner_config', {})

return athena_config.get('file_format')
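
A quick sketch of how these helpers resolve values from the renamed section, assuming the ``streamalert`` package is importable; the config dict is a trimmed, hypothetical example rather than a full StreamAlert config:

.. code-block:: python

    from streamalert.shared.utils import get_data_file_format, get_database_name

    # Trimmed, hypothetical config for illustration
    config = {
        'global': {'account': {'prefix': 'testv2'}},
        'lambda': {
            'athena_partitioner_config': {
                'file_format': 'parquet'
            }
        }
    }

    print(get_database_name(config))     # -> 'testv2_streamalert' (prefix-based default)
    print(get_data_file_format(config))  # -> 'parquet'
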
10 changes: 5 additions & 5 deletions streamalert_cli/athena/handler.py
@@ -198,7 +198,7 @@ def get_athena_client(config):
AthenaClient: instantiated client for performing athena actions
"""
prefix = config['global']['account']['prefix']
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']

db_name = get_database_name(config)

@@ -451,11 +451,11 @@ def create_table(table, bucket, config, schema_override=None):
if table != 'alerts' and bucket != config_data_bucket:
# Only add buckets to the config if they are not one of the default/configured buckets
# Ensure 'buckets' exists in the config (since it is not required)
config['lambda']['athena_partition_refresh_config']['buckets'] = (
config['lambda']['athena_partition_refresh_config'].get('buckets', {})
config['lambda']['athena_partitioner_config']['buckets'] = (
config['lambda']['athena_partitioner_config'].get('buckets', {})
)
if bucket not in config['lambda']['athena_partition_refresh_config']['buckets']:
config['lambda']['athena_partition_refresh_config']['buckets'][bucket] = 'data'
if bucket not in config['lambda']['athena_partitioner_config']['buckets']:
config['lambda']['athena_partitioner_config']['buckets'][bucket] = 'data'
config.write()

LOGGER.info('The %s table was successfully created!', sanitized_table_name)
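
The bucket bookkeeping above (ensure ``buckets`` exists, then add the bucket only if it is missing) is equivalent to a ``dict.setdefault`` chain. A small standalone sketch of that pattern; the config dict and bucket name are illustrative only:

.. code-block:: python

    # Illustrative stand-in for the CLI config object and bucket name
    config = {'lambda': {'athena_partitioner_config': {}}}
    bucket = 'example-data-bucket'

    partitioner_config = config['lambda']['athena_partitioner_config']

    # setdefault creates the 'buckets' dict on first use, then registers the bucket
    # as a 'data' bucket only if it is not already present
    partitioner_config.setdefault('buckets', {}).setdefault(bucket, 'data')

    print(partitioner_config)  # {'buckets': {'example-data-bucket': 'data'}}

The more explicit ``get``/``in`` form used in the diff reads closer to the persisted JSON, but the behavior is the same.
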
4 changes: 2 additions & 2 deletions streamalert_cli/config.py
@@ -59,7 +59,7 @@ def clusters(self):

def generate_athena(self):
"""Generate a base Athena config"""
if 'athena_partition_refresh_config' in self.config['lambda']:
if 'athena_partitioner_config' in self.config['lambda']:
LOGGER.warning('The Athena configuration already exists, skipping.')
return

@@ -71,7 +71,7 @@ def generate_athena(self):
'third_party_libraries': []
}

self.config['lambda']['athena_partition_refresh_config'] = athena_config_template
self.config['lambda']['athena_partitioner_config'] = athena_config_template
self.write()

LOGGER.info('Athena configuration successfully created')
2 changes: 1 addition & 1 deletion streamalert_cli/terraform/handlers.py
@@ -118,7 +118,7 @@ def handler(cls, options, config):
if get_data_file_format(config) == 'json':
# Terraform v0.12 now supports creating Athena tables. We will use the
# terraform aws_glue_catalog_table resource to create tables only
# when data file_format is set to "parquet" in "athena_partition_refresh_config"
# when data file_format is set to "parquet" in "athena_partitioner_config"
#
# For "json" file_format, we will continue using Athena DDL query to
# create tables. However, this capabity will be faded out in the future
2 changes: 1 addition & 1 deletion streamalert_cli/terraform/rule_promotion.py
@@ -49,7 +49,7 @@ def generate_rule_promotion(config):
'rules_table_arn': '${module.globals.rules_table_arn}',
'function_alias_arn': '${module.rule_promotion_lambda.function_alias_arn}',
'function_name': '${module.rule_promotion_lambda.function_name}',
'athena_results_bucket_arn': '${module.streamalert_athena.results_bucket_arn}',
'athena_results_bucket_arn': '${module.athena_partitioner_iam.results_bucket_arn}',
'alerts_bucket': alerts_bucket,
's3_kms_key_arn': '${aws_kms_key.server_side_encryption.arn}'
}
4 changes: 2 additions & 2 deletions streamalert_cli/terraform/scheduled_queries.py
@@ -25,14 +25,14 @@ def generate_scheduled_queries_module_configuration(config):
# FIXME (derek.wang)
# should violently break if athena configurations don't exist.
# Alternatively, could read off streamalert_athena module and get more outputs from that.
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']

# FIXME (derek.wang) make consistent with streamalert_athena module,
# maybe make this dependent on output of that module?
database = athena_config.get('database_name', '{}_streamalert'.format(prefix))

# The results bucket cannot reference the output from the streamalert_athena module:
# '${module.streamalert_athena.results_bucket_arn}'
# '${module.athena_partitioner_iam.results_bucket_arn}'
# Because it takes a bucket name, not an ARN
# FIXME (derek.wang) DRY out this code
results_bucket = athena_query_results_bucket(config)
2 changes: 1 addition & 1 deletion tests/unit/conf/lambda.json
@@ -28,7 +28,7 @@
]
}
},
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"memory": "128",
"timeout": "60",
"file_format": "parquet",
2 changes: 1 addition & 1 deletion tests/unit/conf_athena/lambda.json
@@ -1,5 +1,5 @@
{
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"file_format": "json",
"memory": "128",
"timeout": "60"
4 changes: 2 additions & 2 deletions tests/unit/helpers/config.py
@@ -110,7 +110,7 @@ def basic_streamalert_config():
'memory': 128,
'timeout': 10
},
'athena_partition_refresh_config': {
'athena_partitioner_config': {
'enable_custom_metrics': False,
'memory': 128,
'timeout': 60
@@ -288,6 +288,6 @@ def athena_cli_basic_config():
}
},
'lambda': {
'athena_partition_refresh_config': {}
'athena_partitioner_config': {}
}
}
8 changes: 4 additions & 4 deletions tests/unit/streamalert_cli/terraform/test_generate.py
@@ -785,27 +785,27 @@ def test_generate_main_with_sqs_url_false(self):

def test_generate_main_file_format_unspecified(self):
"CLI - Terraform Generate Main raises error when file_format unspecified"
self.config['lambda']['athena_partition_refresh_config']['file_format'] = None
self.config['lambda']['athena_partitioner_config']['file_format'] = None

assert_raises(
ConfigError,
generate.generate_global_lambda_settings,
config=self.config,
config_name='athena_partition_refresh_config',
config_name='athena_partitioner_config',
generate_func='test_func',
tf_tmp_file='test_tf_tmp_file_path',
message='test message'
)

def test_generate_main_file_format_misconfigured(self):
"CLI - Terraform Generate Main raises error when file_format misconfigured"
self.config['lambda']['athena_partition_refresh_config']['file_format'] = 'Parquet'
self.config['lambda']['athena_partitioner_config']['file_format'] = 'Parquet'

assert_raises(
ConfigError,
generate.generate_global_lambda_settings,
config=self.config,
config_name='athena_partition_refresh_config',
config_name='athena_partitioner_config',
generate_func='test_func',
tf_tmp_file='test_tf_tmp_file_path',
message='test message'
4 changes: 3 additions & 1 deletion tests/unit/streamalert_cli/terraform/test_rule_promotion.py
@@ -42,7 +42,9 @@ def test_generate(self):
'source': './modules/tf_rule_promotion_iam',
'send_digest_schedule_expression': 'cron(30 13 * * ? *)',
'digest_sns_topic': 'unit-test_streamalert_rule_staging_stats',
'athena_results_bucket_arn': '${module.streamalert_athena.results_bucket_arn}',
'athena_results_bucket_arn': (
'${module.athena_partitioner_iam.results_bucket_arn}'
),
'alerts_bucket': 'unit-test-streamalerts',
's3_kms_key_arn': '${aws_kms_key.server_side_encryption.arn}'
},
4 changes: 2 additions & 2 deletions tests/unit/streamalert_cli/test_cli_config.py
@@ -63,9 +63,9 @@ def test_load_config(self):

def test_toggle_metric(self):
"""CLI - Metric toggling"""
self.config.toggle_metrics('athena_partition_refresh', enabled=True)
self.config.toggle_metrics('athena_partitioner', enabled=True)
assert_equal(
self.config['lambda']['athena_partition_refresh_config']['enable_custom_metrics'],
self.config['lambda']['athena_partitioner_config']['enable_custom_metrics'],
True
)

