Commit

Merge 11bd486 into 41da6b5
ryandeivert committed Apr 2, 2020
2 parents 41da6b5 + 11bd486 commit 5d31dbb
Showing 40 changed files with 386 additions and 586 deletions.
4 changes: 3 additions & 1 deletion conf/lambda.json
@@ -50,8 +50,10 @@
"subnet_ids": []
}
},
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"concurrency_limit": 10,
"memory": 128,
"timeout": 300,
"file_format": null,
"log_level": "info"
},
2 changes: 1 addition & 1 deletion docs/source/architecture.rst
@@ -40,7 +40,7 @@ configured `outputs <outputs.html>`_. All alerts implicitly include a Firehose o
an S3 bucket that can be queried with Athena. Alerts will be retried indefinitely until they are
successfully delivered, at which point they will be removed from the DynamoDB table.

6. An "athena partition refresh" Lambda function runs periodically to onboard new StreamAlert data
6. An Athena Partitioner Lambda function runs periodically to onboard new StreamAlert data
and alerts into their respective Athena databases for historical search.

Other StreamAlert components include DynamoDB tables and Lambda functions for optional rule
4 changes: 2 additions & 2 deletions docs/source/getting-started.rst
@@ -109,7 +109,7 @@ Deploy

.. code-block:: json
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"concurrency_limit": 10,
"file_format": "parquet",
"log_level": "info"
@@ -284,7 +284,7 @@ dropdown on the left and preview the ``alerts`` table:
:target: _images/athena-alerts-search.png

(Here, my name prefix is ``testv2``.) If no records are returned, look for errors
in the Athena Partition Refresh function or try invoking it directly.
in the Athena Partitioner function or try invoking it directly.

And there you have it! Ingested log data is parsed, classified, and scanned by the rules engine.
Any resulting alerts are delivered to your configured output(s) within a matter of minutes.
22 changes: 11 additions & 11 deletions docs/source/historical-search.rst
@@ -6,7 +6,7 @@ StreamAlert historical search feature is backed by Amazon S3 and `Athena <https:
By default, StreamAlert will send all alerts to S3, where they are searchable in an Athena table. StreamAlert
users also have the option to enable historical search for data.

As of StreamAlert v3.1.0, a new field, ``file_format``, has been added to ``athena_partition_refresh_config``
As of StreamAlert v3.1.0, a new field, ``file_format``, has been added to ``athena_partitioner_config``
in ``conf/lambda.json``, defaulting to ``null``. This field allows users to configure how the data processed
by the Classifier is stored in the S3 bucket, either in ``parquet`` or ``json`` format.

@@ -39,7 +39,7 @@ The pipeline is:

#. StreamAlert creates an Athena database, an alerts Kinesis Firehose, and the ``alerts`` table during initial deployment
#. Optionally, create Firehose resources and Athena tables for historical data retention
#. S3 events will be sent to an SQS queue that is mapped to the Athena Partition Refresh Lambda function
#. S3 events will be sent to an SQS queue that is mapped to the Athena Partitioner Lambda function
#. The Lambda function adds new partitions when new alerts or data are saved to the S3 bucket via Firehose (a sketch of the underlying query follows this list)
#. Alerts, and optionally data, are available for searching via the Athena console or the Athena API
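
The heart of step 4 is an ``ALTER TABLE ... ADD PARTITION`` query issued against the Athena database. A minimal boto3 sketch of the idea, using hypothetical database, table, and bucket names (the partitioner derives the real values from ``conf/`` at runtime):

```python
import boto3

athena = boto3.client('athena')

# Hypothetical names; the partitioner builds these from its configuration
query = (
    "ALTER TABLE alerts ADD IF NOT EXISTS "
    "PARTITION (dt = '2020-04-02-12') "
    "LOCATION 's3://example-streamalerts/alerts/dt=2020-04-02-12/'"
)

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'example_streamalert'},
    ResultConfiguration={'OutputLocation': 's3://example-athena-results/'},
)
print(response['QueryExecutionId'])  # the query runs asynchronously
```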

@@ -50,26 +50,26 @@ Alerts Search
*************

* Review the settings for the :ref:`Alerts Firehose Configuration <alerts_firehose_configuration>` and
the :ref:`Athena Partition Refresh<configure_athena_partition_refresh_lambda>` function. Note that
the :ref:`Athena Partitioner<configure_athena_partitioner_lambda>` function. Note that
the Athena database and alerts table are created automatically when you first deploy StreamAlert.
* If the ``file_format`` value within the :ref:`Athena Partition Refresh<configure_athena_partition_refresh_lambda>`
* If the ``file_format`` value within the :ref:`Athena Partitioner<configure_athena_partitioner_lambda>`
function config is set to ``parquet``, you can run the ``MSCK REPAIR TABLE alerts`` command in
Athena to load all available partitions, after which alerts become searchable. Note, however, that the
``MSCK REPAIR`` command cannot load new partitions automatically.
* StreamAlert includes a Lambda function to automatically add new partitions for Athena tables when
the data arrives in S3. See :ref:`configure_athena_partition_refresh_lambda`
the data arrives in S3. See :ref:`configure_athena_partitioner_lambda`

.. code-block:: json
{
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"concurrency_limit": 10,
"file_format": "parquet",
"log_level": "info"
}
}
* Deploy the Athena Partition Refresh Lambda function
* Deploy the Athena Partitioner Lambda function

.. code-block:: bash
@@ -109,7 +109,7 @@ It is optional to store data in S3 bucket and available for search in Athena tab
.. image:: ../images/athena-data-search.png


.. _configure_athena_partition_refresh_lambda:
.. _configure_athena_partitioner_lambda:

*************************
Configure Lambda Settings
@@ -120,8 +120,8 @@ Open ``conf/lambda.json``, and fill in the following options:
=================================== ======== ==================== ===========
Key Required Default Description
----------------------------------- -------- -------------------- -----------
``enabled`` Yes ``true`` Enables/Disables the Athena Partition Refresh Lambda function
``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partition Refresh Lambda function
``enabled`` Yes ``true`` Enables/Disables the Athena Partitioner Lambda function
``enable_custom_metrics`` No ``false`` Enables/Disables logging of metrics for the Athena Partitioner Lambda function
``log_level`` No ``info`` The log level for the Lambda function, can be either ``info`` or ``debug``. Debug will help with diagnosing errors with polling SQS or sending Athena queries.
``memory`` No ``128`` The amount of memory (in MB) allocated to the Lambda function
``timeout`` No ``60`` The maximum duration of the Lambda function (in seconds)
@@ -134,7 +134,7 @@ Key Required Default Descriptio
.. code-block:: json
{
"athena_partition_refresh_config": {
"athena_partitioner_config": {
"log_level": "info",
"memory": 128,
"buckets": {
2 changes: 1 addition & 1 deletion streamalert/__init__.py
@@ -1,2 +1,2 @@
"""StreamAlert version."""
__version__ = '3.1.2'
__version__ = '3.2.0'
File renamed without changes.
@@ -31,11 +31,11 @@
LOGGER = get_logger(__name__)


class AthenaRefreshError(Exception):
class AthenaPartitionerError(Exception):
"""Generic Athena Partition Error for erroring the Lambda function"""


class AthenaRefresher:
class AthenaPartitioner:
"""Handle polling an SQS queue and running Athena queries for updating tables"""

ALERTS_REGEX = re.compile(r'alerts/dt=(?P<year>\d{4})'
@@ -58,14 +58,14 @@ class AthenaRefresher:
r'\-(?P<day>\d{2})'
r'\-(?P<hour>\d{2})\/.*')

ATHENA_S3_PREFIX = 'athena_partition_refresh'
ATHENA_S3_PREFIX = 'athena_partitioner'

_ATHENA_CLIENT = None

def __init__(self):
config = load_config(include={'lambda.json', 'global.json'})
prefix = config['global']['account']['prefix']
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']
self._file_format = get_data_file_format(config)

if self._file_format == 'parquet':
@@ -78,7 +78,7 @@ def __init__(self):
else:
message = (
'file format "{}" is not supported. Supported file format are '
'"parquet", "json". Please update the setting in athena_partition_refresh_config '
'"parquet", "json". Please update the setting in athena_partitioner_config '
'in "conf/lambda.json"'.format(self._file_format)
)
raise ConfigError(message)
@@ -106,7 +106,7 @@ def _create_client(cls, db_name, results_bucket):

# Check if the database exists when the client is created
if not cls._ATHENA_CLIENT.check_database_exists():
raise AthenaRefreshError('The \'{}\' database does not exist'.format(db_name))
raise AthenaPartitionerError('The \'{}\' database does not exist'.format(db_name))

def _get_partitions_from_keys(self):
"""Get the partitions that need to be added for the Athena tables
@@ -198,7 +198,7 @@ def _add_partitions(self):

success = self._ATHENA_CLIENT.run_query(query=query)
if not success:
raise AthenaRefreshError(
raise AthenaPartitionerError(
'The add hive partition query has failed:\n{}'.format(query)
)

@@ -247,5 +247,5 @@ def run(self, event):


def handler(event, _):
"""Athena Partition Refresher Handler Function"""
AthenaRefresher().run(event)
"""Athena Partitioner Handler Function"""
AthenaPartitioner().run(event)
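
For a local smoke test, the renamed handler could be driven with a synthetic SQS event wrapping an S3 notification, matching the S3 → SQS → Lambda flow described in the docs above. A sketch with hypothetical bucket and key names (the key is shaped to match ``ALERTS_REGEX``):

```python
import json

# Hypothetical SQS event wrapping an S3 ObjectCreated notification
sample_event = {
    'Records': [{
        'body': json.dumps({
            'Records': [{
                's3': {
                    'bucket': {'name': 'testv2-streamalerts'},
                    'object': {'key': 'alerts/dt=2020-04-02-12/example.json'},
                }
            }]
        })
    }]
}

# handler(sample_event, None)  # would parse the key and add its partition
```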
2 changes: 1 addition & 1 deletion streamalert/rule_promotion/promoter.py
@@ -39,7 +39,7 @@ def __init__(self):
# Create the rule table class for getting staging information
self._rule_table = RuleTable('{}_streamalert_rules'.format(prefix))

athena_config = self._config['lambda']['athena_partition_refresh_config']
athena_config = self._config['lambda']['athena_partitioner_config']

# Get the name of the athena database to access
db_name = athena_config.get('database_name', get_database_name(self._config))
2 changes: 1 addition & 1 deletion streamalert/shared/__init__.py
@@ -1,7 +1,7 @@
"""Define some shared resources."""
ALERT_MERGER_NAME = 'alert_merger'
ALERT_PROCESSOR_NAME = 'alert_processor'
ATHENA_PARTITION_REFRESH_NAME = 'athena_partition_refresh'
ATHENA_PARTITIONER_NAME = 'athena_partitioner'
CLASSIFIER_FUNCTION_NAME = 'classifier'
RULES_ENGINE_FUNCTION_NAME = 'rules_engine'
RULE_PROMOTION_NAME = 'rule_promotion'
16 changes: 10 additions & 6 deletions streamalert/shared/athena.py
@@ -59,11 +59,15 @@ def __init__(self, database_name, results_bucket, results_prefix, region=None):
if not results_bucket.startswith('s3://'):
results_bucket = 's3://{}'.format(results_bucket)

# Produces athena_partition_refresh/YYYY/MM/DD S3 keys
self._s3_results_path = posixpath.join(
results_bucket,
results_prefix,
datetime.utcnow().strftime('%Y/%m/%d')
# Produces s3://<results_bucket_name>/<results_prefix>
self._s3_results_path_prefix = posixpath.join(results_bucket, results_prefix)

@property
def results_path(self):
# Returns a path for the current hour: /YYYY/MM/DD/HH
return posixpath.join(
self._s3_results_path_prefix,
datetime.utcnow().strftime('%Y/%m/%d/%H')
)

@staticmethod
@@ -125,7 +129,7 @@ def _execute_query(self, query):
return self._client.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': self.database},
ResultConfiguration={'OutputLocation': self._s3_results_path}
ResultConfiguration={'OutputLocation': self.results_path}
)
except ClientError as err:
raise AthenaQueryExecutionError('Athena query failed:\n{}'.format(err))
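
Computing the timestamp inside a property means the results path is evaluated per query rather than once at client construction, so a warm Lambda container no longer writes query results under a stale date. A standalone illustration with a hypothetical bucket name:

```python
import posixpath
from datetime import datetime

# Hypothetical values; the real client receives these in __init__
results_bucket = 's3://example-streamalert-athena-results'
results_prefix = 'athena_partitioner'

prefix = posixpath.join(results_bucket, results_prefix)

# Re-evaluated on every access, mirroring the new results_path property
path = posixpath.join(prefix, datetime.utcnow().strftime('%Y/%m/%d/%H'))
print(path)  # e.g. s3://example-streamalert-athena-results/athena_partitioner/2020/04/02/17
```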
4 changes: 2 additions & 2 deletions streamalert/shared/config.py
@@ -122,7 +122,7 @@ def athena_partition_buckets(config):
Returns:
list: Bucket names for which Athena is enabled
"""
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']
data_buckets = athena_config.get('buckets', {})
data_buckets[firehose_alerts_bucket(config)] = 'alerts'
data_bucket = firehose_data_bucket(config) # Data retention is optional, so check for this
@@ -140,7 +140,7 @@ def athena_query_results_bucket(config):
Returns:
str: The name of the S3 bucket.
"""
athena_config = config['lambda']['athena_partition_refresh_config']
athena_config = config['lambda']['athena_partitioner_config']
prefix = config['global']['account']['prefix']

return athena_config.get(
6 changes: 3 additions & 3 deletions streamalert/shared/metrics.py
@@ -18,7 +18,7 @@
from streamalert.shared import (
ALERT_MERGER_NAME,
ALERT_PROCESSOR_NAME,
ATHENA_PARTITION_REFRESH_NAME,
ATHENA_PARTITIONER_NAME,
CLASSIFIER_FUNCTION_NAME,
RULES_ENGINE_FUNCTION_NAME
)
@@ -29,7 +29,7 @@
CLUSTER = os.environ.get('CLUSTER', 'unknown_cluster')

# The FUNC_PREFIXES dict acts as a simple map to a human-readable name
# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the
# Add ATHENA_PARTITIONER_NAME: 'AthenaPartitioner', to the
# below when metrics are supported there
FUNC_PREFIXES = {
ALERT_MERGER_NAME: 'AlertMerger',
@@ -89,7 +89,7 @@ class MetricLogger:
ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup)
},
ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics
ATHENA_PARTITION_REFRESH_NAME: {}, # Placeholder for future athena processor metrics
ATHENA_PARTITIONER_NAME: {}, # Placeholder for future athena processor metrics
CLASSIFIER_FUNCTION_NAME: {
FAILED_PARSES: (_default_filter.format(FAILED_PARSES),
_default_value_lookup),
4 changes: 2 additions & 2 deletions streamalert/shared/utils.py
@@ -152,7 +152,7 @@ def get_database_name(config):
str: The name of the athena database
"""
prefix = config['global']['account']['prefix']
athena_config = config['lambda'].get('athena_partition_refresh_config')
athena_config = config['lambda'].get('athena_partitioner_config')

return athena_config.get('database_name', '{}_streamalert'.format(prefix))

@@ -163,6 +163,6 @@ def get_data_file_format(config):
Returns:
str: The data store format either "parquet" or "json"
"""
athena_config = config['lambda'].get('athena_partition_refresh_config', {})
athena_config = config['lambda'].get('athena_partitioner_config', {})

return athena_config.get('file_format')
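
``get_data_file_format`` returns whatever is configured, including ``None``, so callers must validate the value themselves; the partitioner's ``__init__`` above raises a ``ConfigError`` for anything other than ``parquet`` or ``json``. A sketch of that guard as a standalone helper (``ValueError`` stands in here to keep the example self-contained):

```python
from streamalert.shared.utils import get_data_file_format


def validated_file_format(config):
    """Return the configured file format, failing loudly on unsupported values."""
    file_format = get_data_file_format(config)
    if file_format not in ('parquet', 'json'):
        # The partitioner raises ConfigError at this point
        raise ValueError(
            'file format "{}" is not supported; set "parquet" or "json" under '
            'athena_partitioner_config in conf/lambda.json'.format(file_format)
        )
    return file_format
```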
96 changes: 8 additions & 88 deletions streamalert_cli/_infrastructure/modules/tf_athena/README.md
@@ -1,88 +1,8 @@
# StreamAlert Athena Terraform Module
This Terraform module creates a Lambda function for refreshing Athena Partitions once new data is written to S3

## Components
* A Python3.7 Lambda Function to perform a table refresh
* IAM Role and Policy to allow for Athena execution
* S3 bucket notifications
* Lambda permissions

## Example
```
module "streamalert_athena" {
source = "../modules/tf_athena"
lambda_s3_bucket = "my-source-bucket"
lambda_s3_key = "source/athena_partition_refresh_code.zip"
athena_data_buckets = ["my-org-streamalerts"]
}
```

## Inputs
<table>
<tr>
<th>Property</th>
<th>Description</th>
<th>Default</th>
<th>Required</th>
</tr>
<tr>
<td>lambda_handler</td>
<td>The Python function entry point</td>
<td>"main.handler"</td>
<td>False</td>
</tr>
<tr>
<td>lambda_timeout</td>
<td>The max runtime in seconds for the Lambda function</td>
<td>60 seconds</td>
<td>False</td>
</tr>
<tr>
<td>lambda_memory</td>
<td>The memory allocation in MB for the Lambda function</td>
<td>128MB</td>
<td>False</td>
</tr>
<tr>
<td>lambda_s3_bucket</td>
<td>The name of the S3 bucket to store Lambda deployment packages</td>
<td>None</td>
<td>True</td>
</tr>
<tr>
<td>lambda_s3_key</td>
<td>The object in S3 containing the Lambda source</td>
<td>None</td>
<td>True</td>
</tr>
<tr>
<td>lambda_log_level</td>
<td>The current log level of the Lambda function</td>
<td>info</td>
<td>False</td>
</tr>
<tr>
<td>current_version</td>
<td>The currently published version of the Lambda production alias</td>
<td>None</td>
<td>True</td>
</tr>
<tr>
<td>athean_data_buckets</td>
<td>A list of buckets to monitor changes to for Hive partitioning</td>
<td>None</td>
<td>True</td>
</tr>
<tr>
<td>prefix</td>
<td>The resource prefix, normally an organizational name or descriptor</td>
<td>None</td>
<td>True</td>
</tr>
<tr>
<td>schedule_expression</td>
<td>The Cloudwatch-Lambda invocation schedule expression</td>
<td>rate(10 minutes)</td>
<td>False</td>
</tr>
</table>
# Athena Partitioner Permissions
This module adds the IAM permissions and other resources needed by the Athena Partitioner function (the S3-to-SQS wiring is sketched after this list):
* Athena Database for querying alerts and historical data
* S3 Bucket for storing the results of Athena queries
* SQS Queue for receiving event notifications from S3 buckets
* S3 Event Notifications for sending messages to SQS Queue when objects are created
* KMS Key and Alias for encrypting/decrypting messages on SQS Queue
* Permissions for sending data to SQS Queue and reading/writing data in S3
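
The S3-to-SQS wiring in the last three bullets is created by Terraform, but the equivalent API call is easy to picture. A boto3 sketch with hypothetical bucket, queue, and account identifiers:

```python
import boto3

s3 = boto3.client('s3')

# Hypothetical names; the tf_athena module creates and wires these resources
s3.put_bucket_notification_configuration(
    Bucket='example-streamalerts',
    NotificationConfiguration={
        'QueueConfigurations': [{
            'QueueArn': 'arn:aws:sqs:us-east-1:123456789012:example-athena-queue',
            'Events': ['s3:ObjectCreated:*'],
        }]
    },
)
```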
