From 96bfe07a5646ae8ac7b75e3552a3cdd2903076fc Mon Sep 17 00:00:00 2001
From: Nghia Le
Date: Wed, 18 Sep 2019 15:39:39 +0200
Subject: [PATCH] MINIFICPP-1048 - Add PublishKafka docker tests

---
 docker/Dockerfile                             |   2 +-
 docker/test/integration/minifi/__init__.py    |  60 +++++++++-
 .../test/integration/minifi/test/__init__.py  | 106 ++++++++++++++++--
 docker/test/integration/test_rdkafka.py       |  76 +++++++++++++
 4 files changed, 226 insertions(+), 18 deletions(-)
 create mode 100644 docker/test/integration/test_rdkafka.py

diff --git a/docker/Dockerfile b/docker/Dockerfile
index 38bcaf7bc0..7d8847769b 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -76,7 +76,7 @@ ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-${MINIFI_VERSION}
 RUN cd ${MINIFI_BASE_DIR} \
 	&& mkdir build \
 	&& cd build \
-	&& cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON .. \
+	&& cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON .. \
 	&& make -j8 package \
 	&& tar -xzvf ${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz -C ${MINIFI_BASE_DIR}

diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 48f25b3567..cfb2eb56e7 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -27,6 +27,9 @@
 import yaml
 from copy import copy

+import time
+from collections import OrderedDict
+

 class Cluster(object):
     """
@@ -64,7 +67,7 @@ def __init__(self):
         self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
         self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
         self.network = None
-        self.containers = []
+        self.containers = OrderedDict()
         self.images = []
         self.tmp_files = []

@@ -93,12 +96,22 @@ def deploy_flow(self,
         if self.network is None:
             net_name = 'nifi-' + str(uuid.uuid4())
             logging.info('Creating network: %s', net_name)
+            # Use a fixed subnet so the Kafka broker can be given a static,
+            # predictable IP address on the test network.
+            ipam_pool = docker.types.IPAMPool(
+                subnet='192.168.42.0/24',
+                gateway='192.168.42.1'
+            )
+            ipam_config = docker.types.IPAMConfig(
+                pool_configs=[ipam_pool]
+            )
-            self.network = self.client.networks.create(net_name)
+            self.network = self.client.networks.create(net_name, ipam=ipam_config)

         if engine == 'nifi':
             self.deploy_nifi_flow(flow, name, vols)
         elif engine == 'minifi-cpp':
             self.deploy_minifi_cpp_flow(flow, name, vols)
+        elif engine == 'kafka-broker':
+            self.deploy_kafka_broker(name)
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)

@@ -148,7 +161,7 @@ def deploy_minifi_cpp_flow(self, flow, name, vols):

         logging.info('Started container \'%s\'', container.name)

-        self.containers.append(container)
+        self.containers[container.name] = container

     def deploy_nifi_flow(self, flow, name, vols):
         dockerfile = dedent("""FROM {base_image}
@@ -198,7 +211,35 @@ def deploy_nifi_flow(self, flow, name, vols):

         logging.info('Started container \'%s\'', container.name)

-        self.containers.append(container)
+        self.containers[container.name] = container
+
+    def deploy_kafka_broker(self, name):
+        dockerfile = dedent("""FROM {base_image}
+            USER root
+            CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server host.docker.internal:9092 --topic test > heaven_signal.txt
+            """.format(base_image='spotify/kafka:latest'))
+
+        logging.info('Creating and running docker container for kafka broker...')
+
+        broker = self.client.containers.run(
+            self.client.images.pull("spotify/kafka:latest"),
+            detach=True,
+            name='kafka-broker',
+            ports={'2181/tcp': 2181, '9092/tcp': 9092},
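+            # Map ZooKeeper (2181) and the Kafka listener (9092) to the same
+            # ports on the host; the ADVERTISED_HOST below must match the
+            # static IP this container is given on the test network.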
+            environment=["ADVERTISED_HOST=192.168.42.4", "ADVERTISED_PORT=9092"]
+        )
+        self.network.connect(broker, ipv4_address='192.168.42.4')
+
+        configured_image = self.build_image(dockerfile, [])
+        consumer = self.client.containers.run(
+            configured_image[0],
+            detach=True,
+            name='kafka-consumer',
+            network=self.network.name
+        )
+
+        self.containers[consumer.name] = consumer
+        self.containers[broker.name] = broker

     def build_image(self, dockerfile, context_files):
         conf_dockerfile_buffer = BytesIO()
@@ -227,6 +268,7 @@ def build_image(self, dockerfile, context_files):
                                                        custom_context=True,
                                                        rm=True,
                                                        forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
             self.images.append(configured_image)

         finally:
@@ -247,7 +289,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         """

         # Clean up containers
-        for container in self.containers:
+        for container in self.containers.values():
             logging.info('Cleaning up container: %s', container.name)
             container.remove(v=True, force=True)

@@ -466,6 +508,14 @@ def nifi_property_key(self, key):
         else:
             return key

+
+class PublishKafka(Processor):
+    def __init__(self):
+        super(PublishKafka, self).__init__('PublishKafka',
+            properties={'Client Name': 'nghiaxlee', 'Known Brokers': '192.168.42.4:9092', 'Topic Name': 'test',
+                        'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                        'Request Timeout': '10 sec', 'Message Timeout': '5 sec'},
+            auto_terminate=['success'])
+

 class InputPort(Connectable):
     def __init__(self, name=None, remote_process_group=None):

diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 67affa986c..a5f94676e3 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -16,6 +16,8 @@
 import logging
 import shutil
 import uuid
+import tarfile
+from io import BytesIO

 from threading import Event
 import os
@@ -67,6 +69,9 @@ def __init__(self, output_validator):

         super(DockerTestCluster, self).__init__()

+        if isinstance(output_validator, KafkaValidator):
+            output_validator.set_containers(self.containers)
+
     def deploy_flow(self,
                     flow,
                     name=None,
@@ -89,6 +94,26 @@ def deploy_flow(self,
                                                 name=name,
                                                 engine=engine)

+    def start_flow(self, name):
+        container = self.containers[name]
+        container.reload()
+        logging.info("Status before start: %s", container.status)
+        if container.status == 'exited':
+            logging.info("Start container: %s", name)
+            container.start()
+            return True
+        return False
+
+    def stop_flow(self, name):
+        container = self.containers[name]
+        container.reload()
+        logging.info("Status before stop: %s", container.status)
+        if container.status == 'running':
+            logging.info("Stop container: %s", name)
+            container.stop(timeout=0)
+            return True
+        return False
+
     def put_test_data(self, contents):
         """
         Creates a randomly-named file in the test input dir and writes
@@ -116,7 +141,7 @@ def wait_for_output(self, timeout_seconds):

     def log_nifi_output(self):

-        for container in self.containers:
+        for container in self.containers.values():
             container = self.client.containers.get(container.id)
             logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs().decode("utf-8"))
             if b'Segmentation fault' in container.logs():
@@ -141,14 +166,20 @@ def log_nifi_output(self):
                 stats = container.stats(stream=False)
                 logging.info('Container stats:\n%s', stats)

-    def check_output(self, timeout=5):
+    def check_output(self, timeout=5, **kwargs):
         """
         Wait for flow output, validate it, and log minifi output.
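+
+        The optional 'dir' keyword argument names a subdirectory of the
+        output directory to validate; it is forwarded to FileOutputValidator.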
""" self.wait_for_output(timeout) self.log_nifi_output() - - return self.output_validator.validate() and not self.segfault + if self.segfault: + return false + if isinstance(self.output_validator, FileOutputValidator): + return self.output_validator.validate(dir=kwargs.get('dir', '')) + return self.output_validator.validate() + def rm_out_child(self, dir): + logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir) + shutil.rmtree(self.tmp_test_output_dir + dir) def __exit__(self, exc_type, exc_val, exc_tb): """ @@ -204,6 +235,9 @@ class FileOutputValidator(OutputValidator): def set_output_dir(self, output_dir): self.output_dir = output_dir + def validate(self, dir=''): + pass + class SingleFileOutputValidator(FileOutputValidator): """ Validates the content of a single file in the given directory. @@ -213,24 +247,72 @@ def __init__(self, expected_content): self.valid = False self.expected_content = expected_content - def validate(self): + def validate(self, dir=''): - if self.valid: - return True + self.valid = False - listing = listdir(self.output_dir) + full_dir = self.output_dir + dir + logging.info("Output folder: %s", full_dir) + + listing = listdir(full_dir) if listing: + for l in listing: + logging.info("name:: %s", l) out_file_name = listing[0] - with open(join(self.output_dir, out_file_name), 'r') as out_file: + with open(join(full_dir, out_file_name), 'r') as out_file: contents = out_file.read() + logging.info("dir %s -- name %s", full_dir, out_file_name) + logging.info("expected %s -- content %s", self.expected_content, contents) - if contents == self.expected_content: + if self.expected_content in contents: self.valid = True return self.valid +class KafkaValidator(OutputValidator): + """ + Validates PublishKafka + """ + + def __init__(self, expected_content): + self.valid = False + self.expected_content = expected_content + self.containers = None + + def set_containers(self, containers): + self.containers = containers + + def validate(self): + + if self.valid: + return True + if self.containers is None: + return self.valid + + if 'kafka-consumer' not in self.containers: + logging.info('Not found kafka container.') + return False + else: + kafka_container = self.containers['kafka-consumer'] + + output, stat = kafka_container.get_archive('/heaven_signal.txt') + file_obj = BytesIO() + for i in output: + file_obj.write(i) + file_obj.seek(0) + tar = tarfile.open(mode='r', fileobj=file_obj) + contents = tar.extractfile('heaven_signal.txt').read() + logging.info("expected %s -- content %s", self.expected_content, contents) + + contents = contents.decode("utf-8") + if self.expected_content in contents: + self.valid = True + + logging.info("expected %s -- content %s", self.expected_content, contents) + return self.valid + class EmptyFilesOutPutValidator(FileOutputValidator): """ Validates if all the files in the target directory are empty and at least one exists @@ -238,7 +320,7 @@ class EmptyFilesOutPutValidator(FileOutputValidator): def __init__(self): self.valid = False - def validate(self): + def validate(self, dir=''): if self.valid: return True @@ -256,7 +338,7 @@ class NoFileOutPutValidator(FileOutputValidator): def __init__(self): self.valid = False - def validate(self): + def validate(self, dir=''): if self.valid: return True diff --git a/docker/test/integration/test_rdkafka.py b/docker/test/integration/test_rdkafka.py new file mode 100644 index 0000000000..5a5c696fd6 --- /dev/null +++ b/docker/test/integration/test_rdkafka.py @@ -0,0 +1,76 @@ +# Licensed 
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_publish_kafka():
+    """
+    Verify delivery of a message to the kafka broker
+    """
+    producer_flow = GetFile('/tmp/input') >> PublishKafka() >> ('success', LogAttribute())
+
+    with DockerTestCluster(KafkaValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(10)
+
+
+def test_no_broker():
+    """
+    Verify the failure case when the broker is down
+    """
+    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+                     >> (('failure', PutFile('/tmp/output')),
+                         ('success', LogAttribute())))

+
+    with DockerTestCluster(SingleFileOutputValidator('no broker')) as cluster:
+        cluster.put_test_data('no broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(30)
+
+
+def test_broker_on_off():
+    """
+    Verify delivery of messages while the broker is taken down and brought back up
+    """
+    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+                     >> (('success', PutFile('/tmp/output/success')),
+                         ('failure', PutFile('/tmp/output/failure'))))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        def start_kafka():
+            assert cluster.start_flow('kafka-broker')
+            assert cluster.start_flow('kafka-consumer')
+
+        def stop_kafka():
+            assert cluster.stop_flow('kafka-consumer')
+            assert cluster.stop_flow('kafka-broker')
+
+        assert cluster.check_output(10, dir='/success')
+        stop_kafka()
+        assert cluster.check_output(30, dir='/failure')
+        start_kafka()
+        cluster.rm_out_child('/success')
+        assert cluster.check_output(30, dir='/success')
+        stop_kafka()
+        cluster.rm_out_child('/failure')
+        assert cluster.check_output(30, dir='/failure')
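
Note: KafkaValidator above reads the consumer's output file via docker-py's
get_archive(), which returns a tar stream rather than the raw file. A minimal
standalone sketch of that pattern, useful when debugging these tests by hand
(container name and file path are taken from this patch; assumes the test
network is up and docker-py is installed):

    import tarfile
    from io import BytesIO

    import docker

    client = docker.from_env()
    consumer = client.containers.get('kafka-consumer')

    # get_archive() returns (stream, stat); the stream yields chunks of a
    # tar archive that wraps the requested path.
    stream, _stat = consumer.get_archive('/heaven_signal.txt')
    buf = BytesIO()
    for chunk in stream:
        buf.write(chunk)
    buf.seek(0)

    with tarfile.open(mode='r', fileobj=buf) as tar:
        contents = tar.extractfile('heaven_signal.txt').read().decode('utf-8')
    print(contents)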