From 9ab099fb471c4a23376d42f06c121bf9ae73872a Mon Sep 17 00:00:00 2001 From: Fabio de Simoni Date: Fri, 4 Sep 2020 15:48:48 +0100 Subject: [PATCH 1/3] Adding SSL support to mce_cli.py --- metadata-ingestion/mce-cli/kafka.config | 14 ++++++++++++++ metadata-ingestion/mce-cli/mce_cli.py | 22 ++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 metadata-ingestion/mce-cli/kafka.config diff --git a/metadata-ingestion/mce-cli/kafka.config b/metadata-ingestion/mce-cli/kafka.config new file mode 100644 index 0000000000000..e708d2c7bf7ca --- /dev/null +++ b/metadata-ingestion/mce-cli/kafka.config @@ -0,0 +1,14 @@ +# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka +# Kafka Brokers Certs +#security.protocol=ssl +ssl.ca.location=certificate_ca.pem +ssl.certificate.location=certificate.pem +ssl.key.location=private_key.pem +ssl.key.password=XXXXXXX + +# Schema Registry Certs +schema.registry.ssl.ca.location=certificate_ca.pem +schema.registry.ssl.certificate.location=certificate.pem +schema.registry.ssl.key.location=private_key.pem +schema.registry.ssl.key.password=XXXXXXX + diff --git a/metadata-ingestion/mce-cli/mce_cli.py b/metadata-ingestion/mce-cli/mce_cli.py index e715ff5bd6e29..92e81cc73621b 100644 --- a/metadata-ingestion/mce-cli/mce_cli.py +++ b/metadata-ingestion/mce-cli/mce_cli.py @@ -78,11 +78,30 @@ def consume(conf, schema_record): c.close() +def convert_config_to_dict(file_location): + #print("converting: " + file_location) + result = {} + f = open(file_location, "r") + fl =f.readlines() + for line in fl: + if line[0] == '#' or line == '\n': + continue + key, value = line.split("=", 1) + if value is not None: + result = {**result, **{key : value.rsplit("\n")[0]}} + else: + result = {**result, **{key : None}} + return result + + def main(args): # Handle common configs conf = {'bootstrap.servers': args.bootstrap_servers, 'schema.registry.url': args.schema_registry} + if args.command_config is not None: + conf.update(convert_config_to_dict(args.command_config)) + if args.mode == "produce": produce(conf, args.data_file, args.schema_record) else: @@ -104,4 +123,7 @@ def main(args): help="Avro schema record; required if running 'producer' mode") parser.add_argument('-d', dest="data_file", default="bootstrap_mce.dat", help="MCE data file; required if running 'producer' mode") + parser.add_argument('-c', dest="command_config", + default="kafka.config", help="Kafka SSL details") main(parser.parse_args()) + From 3ce503273a90316a492221d90525021029aa08cf Mon Sep 17 00:00:00 2001 From: Fabio de Simoni Date: Fri, 4 Sep 2020 16:00:17 +0100 Subject: [PATCH 2/3] Kafka Config option --- metadata-ingestion/mce-cli/mce_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/mce-cli/mce_cli.py b/metadata-ingestion/mce-cli/mce_cli.py index 92e81cc73621b..61f26cb43bf0d 100644 --- a/metadata-ingestion/mce-cli/mce_cli.py +++ b/metadata-ingestion/mce-cli/mce_cli.py @@ -124,6 +124,6 @@ def main(args): parser.add_argument('-d', dest="data_file", default="bootstrap_mce.dat", help="MCE data file; required if running 'producer' mode") parser.add_argument('-c', dest="command_config", - default="kafka.config", help="Kafka SSL details") + default="None", help="Kafka SSL details") main(parser.parse_args()) From 645e3a64fa93aff833e80eb575ca5398f41ee246 Mon Sep 17 00:00:00 2001 From: Fabio de Simoni Date: Fri, 4 Sep 2020 18:44:40 +0100 Subject: [PATCH 3/3] Adding space and removing the commented line --- metadata-ingestion/mce-cli/mce_cli.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata-ingestion/mce-cli/mce_cli.py b/metadata-ingestion/mce-cli/mce_cli.py index 61f26cb43bf0d..7a1128abe0073 100644 --- a/metadata-ingestion/mce-cli/mce_cli.py +++ b/metadata-ingestion/mce-cli/mce_cli.py @@ -79,10 +79,9 @@ def consume(conf, schema_record): def convert_config_to_dict(file_location): - #print("converting: " + file_location) result = {} f = open(file_location, "r") - fl =f.readlines() + fl = f.readlines() for line in fl: if line[0] == '#' or line == '\n': continue