From 8aa7c77f9d99f987d2549f3f8eed3a72f09f8bb1 Mon Sep 17 00:00:00 2001 From: Anand Subramanian Date: Fri, 28 Sep 2018 20:26:18 +0530 Subject: [PATCH 1/3] Support parser aggregation --- .../package/scripts/parser_commands.py | 49 +++++++++++++++++-- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py index 274306a4d0..19e937092d 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py @@ -20,6 +20,7 @@ import os import re +import shlex import subprocess import time @@ -49,7 +50,45 @@ def __init__(self, params): # get list of parsers def __get_parsers(self, params): - return params.parsers.replace(' ', '').split(',') + """ + Combines the list of parser topics and sends a unique list to be used for + Kafka topic creation and the like. + :param params: + :return: List containing the names of unique parsers + """ + parserBatches = list(self.__get_aggr_parsers(params)) + parsers = ','.join(s.translate(None, '"') for s in parserBatches) + # Get only the unique list of parser names + parsers = list(set(parsers.split(','))) + return parsers + + def __get_aggr_parsers(self, params): + """ + Fetches the list of batched (and regular) parsers and returns a list. 
+ If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example, + then this method will return ["bro,snort,yaf", "bro,snort", "yaf"] + :param params: + :return: List containing the names of parsers + """ + parserList = [] + parsers = shlex.shlex(params.parsers) + for name in parsers: + parserList.append(name.strip(',')) + return [s.translate(None, "'[]") for s in filter(None, parserList)] + + def get_parser_batch_topology_names(self, params): + """ + Returns the names of regular and batched topologies as they would run in storm + A batch (or aggregated) topology has the naming convention of 'parserA__parserB'. + For example, a list of parsers like ["bro,snort", yaf] will be returned as ["bro__snort", "yaf"] + :param params: + :return: List containing the names of parser topologies + """ + topologyName = [] + for parser in self.__get_aggr_parsers(params): + parser = parser.replace(",", "__").strip('"') + topologyName.append(parser) + return topologyName def __get_topics(self): # All errors go to indexing topics, so create it here if it's not already @@ -104,7 +143,7 @@ def init_kafka_acls(self): metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) def start_parser_topologies(self, env): - Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list())) + Logger.info("Starting Metron parser topologies: {0}".format(self.__get_aggr_parsers(self.__params))) start_cmd_template = """{0}/bin/start_parser_topology.sh \ -k {1} \ -z {2} \ @@ -118,7 +157,7 @@ def start_parser_topologies(self, env): self.__params.metron_principal_name, execute_user=self.__params.metron_user) - stopped_parsers = set(self.get_parser_list()) - self.get_running_topology_names(env) + stopped_parsers = set(self.__get_aggr_parsers(self.__params)) - self.get_running_topology_names(env) Logger.info('Parsers that need started: ' + str(stopped_parsers)) for parser in stopped_parsers: @@ -135,7 +174,7 @@ def 
start_parser_topologies(self, env): def stop_parser_topologies(self, env): Logger.info('Stopping parsers') - running_parsers = set(self.get_parser_list()) & self.get_running_topology_names(env) + running_parsers = set(self.get_parser_batch_topology_names(self.__params)) & self.get_running_topology_names(env) Logger.info('Parsers that need stopped: ' + str(running_parsers)) for parser in running_parsers: @@ -192,7 +231,7 @@ def topologies_running(self, env): env.set_params(self.__params) all_running = True topologies = metron_service.get_running_topologies(self.__params) - for parser in self.get_parser_list(): + for parser in self.get_parser_batch_topology_names(self.__params): parser_found = False is_running = False if parser in topologies: From 4db3c19f1358fe2bd6a53c83e2f66478d80c07b6 Mon Sep 17 00:00:00 2001 From: Anand Subramanian Date: Mon, 1 Oct 2018 09:51:11 +0530 Subject: [PATCH 2/3] Keep naming consistent --- .../CURRENT/package/scripts/parser_commands.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py index 19e937092d..18780d9340 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py @@ -64,7 +64,7 @@ def __get_parsers(self, params): def __get_aggr_parsers(self, params): """ - Fetches the list of batched (and regular) parsers and returns a list. + Fetches the list of aggregated (and regular) parsers and returns a list. 
If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example, then this method will return ["bro,snort,yaf", "bro,snort", "yaf"] :param params: @@ -76,10 +76,10 @@ def __get_aggr_parsers(self, params): parserList.append(name.strip(',')) return [s.translate(None, "'[]") for s in filter(None, parserList)] - def get_parser_batch_topology_names(self, params): + def get_parser_aggr_topology_names(self, params): """ - Returns the names of regular and batched topologies as they would run in storm - A batch (or aggregated) topology has the naming convention of 'parserA__parserB'. + Returns the names of regular and aggregated topologies as they would run in storm + An aggregated topology has the naming convention of 'parserA__parserB'. For example, a list of parsers like ["bro,snort", yaf] will be returned as ["bro__snort", "yaf"] :param params: :return: List containing the names of parser topologies @@ -174,7 +174,7 @@ def start_parser_topologies(self, env): def stop_parser_topologies(self, env): Logger.info('Stopping parsers') - running_parsers = set(self.get_parser_batch_topology_names(self.__params)) & self.get_running_topology_names(env) + running_parsers = set(self.get_parser_aggr_topology_names(self.__params)) & self.get_running_topology_names(env) Logger.info('Parsers that need stopped: ' + str(running_parsers)) for parser in running_parsers: @@ -231,7 +231,7 @@ def topologies_running(self, env): env.set_params(self.__params) all_running = True topologies = metron_service.get_running_topologies(self.__params) - for parser in self.get_parser_batch_topology_names(self.__params): + for parser in self.get_parser_aggr_topology_names(self.__params): parser_found = False is_running = False if parser in topologies: From bba80494060ed76c25606c8270934f0f0f6a6199 Mon Sep 17 00:00:00 2001 From: Anand Subramanian Date: Wed, 3 Oct 2018 14:45:02 +0530 Subject: [PATCH 3/3] Doc additions and tool tip changes --- .../configuration/metron-parsers-env.xml | 2 +- 
.../metron-parsers/ParserChaining.md | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml index a9a498b3fd..03a259499f 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml @@ -21,7 +21,7 @@ parsers bro,snort,yaf - Metron parsers to deploy + Metron parsers to deploy. You can also specify an aggregated parser list by grouping them with double quotes. For example: "parserA,parserB",parserC,parserD Metron Parsers diff --git a/metron-platform/metron-parsers/ParserChaining.md b/metron-platform/metron-parsers/ParserChaining.md index a28f11dd61..4cab74c9f5 100644 --- a/metron-platform/metron-parsers/ParserChaining.md +++ b/metron-platform/metron-parsers/ParserChaining.md @@ -49,6 +49,24 @@ data emitted from any parser, we will need to adjust the downstream parsers to extract the enveloped data from the JSON blob and treat it as the data to parse. +# Aggregated Parsers with Parser Chaining +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor-specific Kafka topics, and do not do internal routing to the appropriate sensor. + +Say, there were three sensors (`bro`, `snort` and `yaf`). Instead of creating a topology per sensor, all 3 can be run in a single aggregated parser. It is also possible to aggregate a subset of these parsers (e.g. run `bro` as its own topology, and aggregate the other 2). 
+ +The step to start an aggregated parser then becomes +``` +$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s bro,snort,yaf +``` + +which will result in a single Storm topology named `bro__snort__yaf` being run. + +Aggregated parsers can be specified using the Ambari Metron config as well under Services -> Metron -> Configs -> 'Parsers' tab -> 'Metron Parsers' field. The grouping is configured by enclosing the desired parsers in double quotes. + +Some examples of specifying aggregated parsers are as follows: +* "bro,snort,yaf" --> Will start a single topology named `bro__snort__yaf` +* "ciscopixA,ciscopixB",yaf,"squid,ciscopixC" --> Will start three topologies viz. `ciscopixA__ciscopixB`, `yaf` and `squid__ciscopixC` + # Architecting a Parser Chaining Solution in Metron Currently the approach to fulfill this requirement involves a couple