MINIFICPP-1048 - Add PublishKafka docker tests

nghiaxlee committed Oct 9, 2019
1 parent b95fc10 commit 96bfe07

Showing 4 changed files with 226 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
@@ -76,7 +76,7 @@ ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-${MINIFI_VERSION}
 RUN cd ${MINIFI_BASE_DIR} \
     && mkdir build \
     && cd build \
-    && cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON .. \
+    && cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON .. \
     && make -j8 package \
     && tar -xzvf ${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz -C ${MINIFI_BASE_DIR}
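Enabling -DENABLE_LIBRDKAFKA=ON compiles the librdkafka-based PublishKafka extension into the image, so the processor is available inside the test containers. If you want to rebuild the image by hand with docker-py rather than the project's build scripts, a rough sketch follows; the tag is illustrative and the real build may need extra build args:

import docker

client = docker.from_env()
# Build docker/Dockerfile using the repository root as the build context.
image, logs = client.images.build(path='.', dockerfile='docker/Dockerfile',
                                  tag='apacheminificpp:kafka-test')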

60 changes: 55 additions & 5 deletions docker/test/integration/minifi/__init__.py
@@ -27,6 +27,9 @@
 import yaml
 from copy import copy
 
+import time
+from collections import OrderedDict
+
 
 class Cluster(object):
     """
@@ -64,7 +67,7 @@ def __init__(self):
         self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
         self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
         self.network = None
-        self.containers = []
+        self.containers = OrderedDict()
         self.images = []
         self.tmp_files = []
@@ -93,12 +96,22 @@ def deploy_flow(self,
         if self.network is None:
             net_name = 'nifi-' + str(uuid.uuid4())
             logging.info('Creating network: %s', net_name)
-            self.network = self.client.networks.create(net_name)
+            # Set IP
+            ipam_pool = docker.types.IPAMPool(
+                subnet='192.168.42.0/24',
+                gateway='192.168.42.1'
+            )
+            ipam_config = docker.types.IPAMConfig(
+                pool_configs=[ipam_pool]
+            )
+            self.network = self.client.networks.create(net_name, ipam=ipam_config)
 
         if engine == 'nifi':
             self.deploy_nifi_flow(flow, name, vols)
         elif engine == 'minifi-cpp':
             self.deploy_minifi_cpp_flow(flow, name, vols)
+        elif engine == 'kafka-broker':
+            self.deploy_kafka_broker(name)
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
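The fixed subnet above is what makes the broker reachable at a known address: the kafka-broker container is pinned to 192.168.42.4 later in this change, and PublishKafka's default 'Known Brokers' points at the same address. As a standalone reference, a minimal docker-py sketch of the same pattern; the image and network names here are illustrative, not from this commit:

import docker

client = docker.from_env()

# Carve out a fixed subnet so containers can be assigned predictable addresses.
ipam_pool = docker.types.IPAMPool(subnet='192.168.42.0/24', gateway='192.168.42.1')
ipam_config = docker.types.IPAMConfig(pool_configs=[ipam_pool])
network = client.networks.create('test-net', ipam=ipam_config)

# Attach a container at a static IP on that network.
container = client.containers.run('alpine:3.10', 'sleep 60', detach=True)
network.connect(container, ipv4_address='192.168.42.4')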

@@ -148,7 +161,7 @@ def deploy_minifi_cpp_flow(self, flow, name, vols):
 
         logging.info('Started container \'%s\'', container.name)
 
-        self.containers.append(container)
+        self.containers[container.name] = container
 
     def deploy_nifi_flow(self, flow, name, vols):
         dockerfile = dedent("""FROM {base_image}
@@ -198,7 +211,35 @@ def deploy_nifi_flow(self, flow, name, vols):
 
         logging.info('Started container \'%s\'', container.name)
 
-        self.containers.append(container)
+        self.containers[container.name] = container
+
+    def deploy_kafka_broker(self, name):
+        dockerfile = dedent("""FROM {base_image}
+            USER root
+            CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server host.docker.internal:9092 --topic test > heaven_signal.txt
+            """.format(base_image='spotify/kafka:latest'))
+
+        logging.info('Creating and running docker container for kafka broker...')
+
+        broker = self.client.containers.run(
+            self.client.images.pull("spotify/kafka:latest"),
+            detach=True,
+            name='kafka-broker',
+            ports={'2181/tcp': 2181, '9092/tcp': 9092},
+            environment=["ADVERTISED_HOST=192.168.42.4", "ADVERTISED_PORT=9092"]
+        )
+        self.network.connect(broker, ipv4_address='192.168.42.4')
+
+        configured_image = self.build_image(dockerfile, [])
+        consumer = self.client.containers.run(
+            configured_image[0],
+            detach=True,
+            name='kafka-consumer',
+            network=self.network.name
+        )
+
+        self.containers[consumer.name] = consumer
+        self.containers[broker.name] = broker
 
     def build_image(self, dockerfile, context_files):
         conf_dockerfile_buffer = BytesIO()
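Note that containers.run returns as soon as the broker container starts, not when Kafka is actually accepting connections, which is why the tests below use generous check_output timeouts. If flakiness shows up, an explicit readiness probe is one option; a sketch under that assumption (wait_for_broker is a hypothetical helper, not part of this commit):

import socket
import time

def wait_for_broker(host='192.168.42.4', port=9092, timeout=30):
    # Poll until the broker's listener accepts TCP connections, or give up.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            sock = socket.create_connection((host, port), timeout=2)
            sock.close()
            return True
        except socket.error:
            time.sleep(1)
    return False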
@@ -227,6 +268,7 @@ def build_image(self, dockerfile, context_files):
                 custom_context=True,
                 rm=True,
                 forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
             self.images.append(configured_image)
 
         finally:
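The added logging line surfaces the freshly built image id, which helps when correlating leftover images during cleanup. For reference, build_image's overall approach condenses to the following docker-py pattern: pack an in-memory Dockerfile into a tar archive and pass it as a custom build context. This is a simplified sketch; build_from_string is an illustrative name and the context_files handling is skipped:

import tarfile
from io import BytesIO

def build_from_string(client, dockerfile_text):
    # Pack the Dockerfile into an in-memory tar archive.
    context = BytesIO()
    with tarfile.open(mode='w', fileobj=context) as tar:
        data = dockerfile_text.encode('utf-8')
        info = tarfile.TarInfo(name='Dockerfile')
        info.size = len(data)
        tar.addfile(info, BytesIO(data))
    context.seek(0)
    # custom_context=True tells docker-py the fileobj is a full build context,
    # not a bare Dockerfile; build() returns an (image, build_logs) pair.
    image, logs = client.images.build(fileobj=context, custom_context=True,
                                      rm=True, forcerm=True)
    return image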
@@ -247,7 +289,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         """
 
         # Clean up containers
-        for container in self.containers:
+        for container in self.containers.values():
             logging.info('Cleaning up container: %s', container.name)
             container.remove(v=True, force=True)

@@ -466,6 +508,14 @@ def nifi_property_key(self, key):
         else:
             return key
 
+
+class PublishKafka(Processor):
+    def __init__(self):
+        super(PublishKafka, self).__init__('PublishKafka',
+                                           properties={'Client Name': 'nghiaxlee', 'Known Brokers': '192.168.42.4:9092', 'Topic Name': 'test',
+                                                       'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                                                       'Request Timeout': '10 sec', 'Message Timeout': '5 sec'},
+                                           auto_terminate=['success'])
 
 
 class InputPort(Connectable):
     def __init__(self, name=None, remote_process_group=None):
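The property defaults above target the broker pinned at 192.168.42.4:9092 by deploy_kafka_broker. In the flow-builder DSL these tests use, the new processor composes like any other; this is the producer flow from test_rdkafka.py below:

producer_flow = GetFile('/tmp/input') >> PublishKafka() >> ('success', LogAttribute())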
106 changes: 94 additions & 12 deletions docker/test/integration/minifi/test/__init__.py
@@ -16,6 +16,8 @@
 import logging
 import shutil
 import uuid
+import tarfile
+from io import BytesIO
 from threading import Event
 
 import os
@@ -67,6 +69,9 @@ def __init__(self, output_validator):
 
         super(DockerTestCluster, self).__init__()
 
+        if isinstance(output_validator, KafkaValidator):
+            output_validator.set_containers(self.containers)
+
     def deploy_flow(self,
                     flow,
                     name=None,
@@ -89,6 +94,26 @@ def deploy_flow(self,
                                                    name=name,
                                                    engine=engine)
 
+    def start_flow(self, name):
+        container = self.containers[name]
+        container.reload()
+        logging.info("Status before start: %s", container.status)
+        if container.status == 'exited':
+            logging.info("Start container: %s", name)
+            container.start()
+            return True
+        return False
+
+    def stop_flow(self, name):
+        container = self.containers[name]
+        container.reload()
+        logging.info("Status before stop: %s", container.status)
+        if container.status == 'running':
+            logging.info("Stop container: %s", name)
+            container.stop(timeout=0)
+            return True
+        return False
+
     def put_test_data(self, contents):
         """
         Creates a randomly-named file in the test input dir and writes
@@ -116,7 +141,7 @@ def wait_for_output(self, timeout_seconds):
 
     def log_nifi_output(self):
 
-        for container in self.containers:
+        for container in self.containers.values():
             container = self.client.containers.get(container.id)
             logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs().decode("utf-8"))
             if b'Segmentation fault' in container.logs():
@@ -141,14 +166,20 @@ def log_nifi_output(self):
             stats = container.stats(stream=False)
             logging.info('Container stats:\n%s', stats)
 
-    def check_output(self, timeout=5):
+    def check_output(self, timeout=5, **kwargs):
         """
         Wait for flow output, validate it, and log minifi output.
         """
         self.wait_for_output(timeout)
         self.log_nifi_output()
 
-        return self.output_validator.validate() and not self.segfault
+        if self.segfault:
+            return False
+        if isinstance(self.output_validator, FileOutputValidator):
+            return self.output_validator.validate(dir=kwargs.get('dir', ''))
+        return self.output_validator.validate()
+
+    def rm_out_child(self, dir):
+        logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir)
+        shutil.rmtree(self.tmp_test_output_dir + dir)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """
@@ -204,6 +235,9 @@ class FileOutputValidator(OutputValidator):
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
 
+    def validate(self, dir=''):
+        pass
+
 class SingleFileOutputValidator(FileOutputValidator):
     """
     Validates the content of a single file in the given directory.
@@ -213,32 +247,80 @@ def __init__(self, expected_content):
         self.valid = False
         self.expected_content = expected_content
 
-    def validate(self):
+    def validate(self, dir=''):
 
         if self.valid:
             return True
+        self.valid = False
 
-        listing = listdir(self.output_dir)
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+
+        listing = listdir(full_dir)
+
         if listing:
+            for l in listing:
+                logging.info("name:: %s", l)
             out_file_name = listing[0]
 
-            with open(join(self.output_dir, out_file_name), 'r') as out_file:
+            with open(join(full_dir, out_file_name), 'r') as out_file:
                 contents = out_file.read()
+                logging.info("dir %s -- name %s", full_dir, out_file_name)
+                logging.info("expected %s -- content %s", self.expected_content, contents)
 
-                if contents == self.expected_content:
+                if self.expected_content in contents:
                     self.valid = True
 
         return self.valid
 
+
+class KafkaValidator(OutputValidator):
+    """
+    Validates PublishKafka output by reading the consumer's capture file.
+    """
+
+    def __init__(self, expected_content):
+        self.valid = False
+        self.expected_content = expected_content
+        self.containers = None
+
+    def set_containers(self, containers):
+        self.containers = containers
+
+    def validate(self):
+
+        if self.valid:
+            return True
+        if self.containers is None:
+            return self.valid
+
+        if 'kafka-consumer' not in self.containers:
+            logging.info('Kafka consumer container not found.')
+            return False
+        kafka_container = self.containers['kafka-consumer']
+
+        output, stat = kafka_container.get_archive('/heaven_signal.txt')
+        file_obj = BytesIO()
+        for chunk in output:
+            file_obj.write(chunk)
+        file_obj.seek(0)
+        tar = tarfile.open(mode='r', fileobj=file_obj)
+        contents = tar.extractfile('heaven_signal.txt').read().decode("utf-8")
+
+        if self.expected_content in contents:
+            self.valid = True
+
+        logging.info("expected %s -- content %s", self.expected_content, contents)
+        return self.valid
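get_archive returns a (stream, stat) pair in which the stream yields chunks of a tar archive, which is why validate buffers the bytes and re-opens them with tarfile before reading the payload. The same pattern as a reusable sketch; read_file_from_container is an illustrative name, not part of this commit:

import tarfile
from io import BytesIO

def read_file_from_container(container, path, member):
    # docker-py ships files out of a container as a tar stream.
    stream, stat = container.get_archive(path)
    buf = BytesIO()
    for chunk in stream:
        buf.write(chunk)
    buf.seek(0)
    tar = tarfile.open(mode='r', fileobj=buf)
    return tar.extractfile(member).read().decode('utf-8')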

 class EmptyFilesOutPutValidator(FileOutputValidator):
     """
     Validates if all the files in the target directory are empty and at least one exists
     """
     def __init__(self):
         self.valid = False
 
-    def validate(self):
+    def validate(self, dir=''):
 
         if self.valid:
             return True
@@ -256,7 +338,7 @@ class NoFileOutPutValidator(FileOutputValidator):
     def __init__(self):
         self.valid = False
 
-    def validate(self):
+    def validate(self, dir=''):
 
         if self.valid:
             return True
76 changes: 76 additions & 0 deletions docker/test/integration/test_rdkafka.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_publish_kafka():
+    """
+    Verify delivery of message to kafka broker
+    """
+    producer_flow = GetFile('/tmp/input') >> PublishKafka() >> ('success', LogAttribute())
+
+    with DockerTestCluster(KafkaValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(10)
+
+def test_no_broker():
+    """
+    Verify failure case when broker is down
+    """
+    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+                     >> (('failure', PutFile('/tmp/output')),
+                         ('success', LogAttribute())))
+
+    with DockerTestCluster(SingleFileOutputValidator('no broker')) as cluster:
+        cluster.put_test_data('no broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(30)
+
+def test_broker_on_off():
+    """
+    Verify delivery of message when broker is unstable
+    """
+    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+                     >> (('success', PutFile('/tmp/output/success')),
+                         ('failure', PutFile('/tmp/output/failure'))))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        def start_kafka():
+            assert cluster.start_flow('kafka-broker')
+            assert cluster.start_flow('kafka-consumer')
+
+        def stop_kafka():
+            assert cluster.stop_flow('kafka-consumer')
+            assert cluster.stop_flow('kafka-broker')
+
+        assert cluster.check_output(10, dir='/success')
+        stop_kafka()
+        assert cluster.check_output(30, dir='/failure')
+        start_kafka()
+        cluster.rm_out_child('/success')
+        assert cluster.check_output(30, dir='/success')
+        stop_kafka()
+        cluster.rm_out_child('/failure')
+        assert cluster.check_output(30, dir='/failure')
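These tests sit alongside the existing docker integration tests, so they should run the same way as the rest of the suite. Assuming a pytest-style runner (the repo's actual invocation may differ), something like the following works from the repository root; it requires a local Docker daemon and the minifi image built with -DENABLE_LIBRDKAFKA=ON:

import pytest

# Run only the new Kafka integration tests, verbosely.
pytest.main(['-v', 'docker/test/integration/test_rdkafka.py'])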
