
Commit

Merge db7d0f1 into ffffe64
Ryxias committed Feb 19, 2020
2 parents ffffe64 + db7d0f1 commit 6db7e47
Showing 8 changed files with 457 additions and 222 deletions.
80 changes: 44 additions & 36 deletions streamalert/scheduled_queries/command/application.py
@@ -13,64 +13,72 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from datetime import datetime

from streamalert.scheduled_queries.command.processor import CommandProcessor
from streamalert.scheduled_queries.config.services import configure_container
from streamalert.scheduled_queries.container.container import ServiceContainer
from streamalert.scheduled_queries.state.state_manager import StepFunctionStateManager, StateManager

from streamalert.scheduled_queries.config.lambda_conf import parameters
from streamalert.scheduled_queries.config.lambda_conf import get_streamquery_env_vars
from streamalert.scheduled_queries.config.services import ApplicationServices


class ScheduledQueries:

def __init__(self):
# Boot the service container
self._service_container = ServiceContainer(parameters)
configure_container(self._service_container)

self._logger = self._service_container.get('logger')
# Ready all services
self._services = ApplicationServices()

def run(self, event):
"""Ensure the Lambda function's Handler is set to: 'lambda.handler' """
"""The main application execution.
StreamQuery executions are configured by two external sources. ENVIRONMENT variables and
the input event. ENVIRONMENT variables help configure the application at deployment,
whereas the input event tracks state within a single state machine.
FIXME (Ryxias)
We should re-evaluate which environment variables can be deployed via configuration files
instead of being embedded into Terraform configurations.
By design, StreamQuery's executions should be nonblocking. Waiting on Athena to complete
many query executions is a waste of Lambda execution time, so StreamQuery is designed to
fire-and-forget Athena queries. Upon first execution, query execution ids are saved into
the state machine. Subsequent executions check the statuses of these queries, and dispatch
the results of successful queries to StreamAlert. This process repeats until all scheduled
queries are dispatched.
Params:
event (dict)
The input event, which represents the state of the state machine.
StreamQuery expects a very specific structure to the event. See StateManager or
StepFunctionStateManager for more details.
Returns:
dict: The final state of the state machine.
"""

# Start the function
logger = self._logger
logger.info('Running scheduled_queries lambda handler')
logger.debug(
self._services.logger.info('Running scheduled_queries lambda handler')
self._services.logger.debug(
'Invocation event: %s', event
)
logger.debug(
'ServiceContainer parameters: %s', parameters
self._services.logger.debug(
'ServiceContainer parameters: %s', get_streamquery_env_vars()
)

# Load up any prior state from the event passed in from the StepFunction
state_manager_loader = StepFunctionStateManager(
self._service_container.get('state_manager'),
logger
)
state_manager_loader = self._services.create_step_function_state_manager()
state_manager_loader.load_from_step_function_event(event)

# Wind the clock as part of the setup operation, if necessary
state_manager = self._service_container.get('state_manager') # type: StateManager
clock = self._service_container.get('clock')
isotime = state_manager.get('streamquery_configuration', {}).get('clock', False)
if isotime:
clock_datetime = datetime.strptime(isotime, "%Y-%m-%dT%H:%M:%SZ")
clock.time_machine(clock_datetime)
logger.info('Winding clock to %s...', clock.now)
else:
logger.warning('No clock configuration provided. Defaulting to %s', clock.now)

# Execute a single pass of the StreamQuery runtime
processor = self._service_container.get('command_processor') # type: CommandProcessor
done = processor.nonblocking_single_pass()
done = self._services.command_processor.nonblocking_single_pass()

# Set the updated state into the response
# The step function as-written currently looks specifically for $.done and
# $.continue and expects both of them to be present AND to be exact
# numeric values
#
# When 'continue' is set to 1, the state machine will go into a waiting state, then
# re-execute this Lambda function again. When 'done' is set to 1, the state machine
# is considered complete and will not execute again. This should only happen if all
# scheduled queries have completed or failed.
#
# @see terraform/modules/tf_scheduled_queries/step_function.tf
response = {
'done': 1 if done else 0,
'continue': 1,
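
For context on the done/continue contract above, the sketch below mimics the Step Function's wait-and-retry loop in plain Python. It is illustrative only: the drive_state_machine helper, the polling interval, and the assumption that each run() response is fed back in as the next invocation's event are not part of this commit. The real loop lives in terraform/modules/tf_scheduled_queries/step_function.tf, and the exact event structure is defined by StateManager and StepFunctionStateManager.

# Hypothetical driver mimicking the Step Function's wait-and-retry loop.
# Assumes run() returns the response dict built above and that the state
# machine passes each response back in as the next invocation's event.
import time

from streamalert.scheduled_queries.command.application import ScheduledQueries


def drive_state_machine(initial_event, wait_seconds=60, max_passes=20):
    """Re-invoke the handler until it reports done == 1."""
    state = initial_event
    for _ in range(max_passes):
        state = ScheduledQueries().run(state)

        # done == 1: all scheduled queries completed or failed; stop here.
        # continue == 1: queries are still pending; wait, then run again.
        if state.get('done') == 1:
            break
        time.sleep(wait_seconds)

    return state
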
19 changes: 11 additions & 8 deletions streamalert/scheduled_queries/config/lambda_conf.py
@@ -15,11 +15,14 @@
"""
import os

parameters = {
'command_name': 'StreamQuery',
'aws_region': os.environ['REGION'],
'log_level': os.environ['LOGGER_LEVEL'],
'athena_database': os.environ['ATHENA_DATABASE'],
'athena_results_bucket': os.environ['ATHENA_RESULTS_BUCKET'],
'kinesis_stream': os.environ['KINESIS_STREAM'],
}

def get_streamquery_env_vars():
"""Returns environment variables pertinent to StreamQuery"""
return {
'command_name': 'StreamQuery',
'aws_region': os.environ['REGION'],
'log_level': os.environ['LOGGER_LEVEL'],
'athena_database': os.environ['ATHENA_DATABASE'],
'athena_results_bucket': os.environ['ATHENA_RESULTS_BUCKET'],
'kinesis_stream': os.environ['KINESIS_STREAM'],
}
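
A minimal usage sketch of the new helper follows, with assumed placeholder values for the environment variables it reads; in a real deployment these are set on the Lambda by the Terraform configuration.

# Usage sketch only; the values below are placeholders, not deployment settings.
import os

from streamalert.scheduled_queries.config.lambda_conf import get_streamquery_env_vars

os.environ.update({
    'REGION': 'us-east-1',
    'LOGGER_LEVEL': 'INFO',
    'ATHENA_DATABASE': 'streamalert_athena_db',
    'ATHENA_RESULTS_BUCKET': 'example-athena-results',
    'KINESIS_STREAM': 'example-kinesis-stream',
})

config = get_streamquery_env_vars()
assert config['aws_region'] == 'us-east-1'
assert config['command_name'] == 'StreamQuery'

Wrapping the lookups in a function, rather than evaluating a module-level parameters dict at import time, also appears to let the module be imported (in unit tests, for example) before the variables are set, since os.environ is only read when the function is called.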
