Skip to content

Commit

Permalink
input_topics / electron bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
brunneis committed Apr 29, 2019
1 parent da3cd96 commit 9290003
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 52 deletions.
9 changes: 8 additions & 1 deletion catenae/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ def get_sendable(self):
return copy

def deepcopy(self):
return copy.deepcopy(self)
electron = Electron()
electron.key = self.key
electron.value = copy.deepcopy(self.value)
electron.topic = self.topic
electron.previous_topic = self.previous_topic
electron.unpack_if_string = self.unpack_if_string
self.callbacks = []
return electron

def copy(self):
return Electron(self.key, self.value, self.topic, self.previous_topic, self.unpack_if_string, self.callbacks)
99 changes: 65 additions & 34 deletions catenae/link.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# *** ** ****************** *** *** *** ** ***
# *** **** ** *** **** *** **** ***
# *** *** *** ** *** ***** *** *** *** ***
# *** *** *** ** *** *** *** *** *** *** ***
# *** *** *** ** ****************** *** *** *** *** *** ******************
# *** *** *** ** *** *** ** *** *** *** ***
# *** *** *** ** *** *** ***** *** *** ***
# *** *** *** ** *** *** **** *** *** ***
# *** *** *** ** *** *** *** *** *** ***

# Catenae
# Copyright (C) 2017-2019 Rodrigo Martínez Castaño
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import math
from threading import Lock
from pickle5 import pickle
Expand All @@ -25,6 +51,8 @@ class Link:
MULTIPLE_KAFKA_INPUTS_CUSTOM_OUTPUT = 2
CUSTOM_INPUT = 3

NO_INPUT_TOPIC_SLEEP_SECONDS = 1

def __init__(self,
log_level='INFO',
link_mode=None,
Expand Down Expand Up @@ -471,20 +499,6 @@ def _kafka_consumer_rpc(self):
self.suicide('Kafka consumer error', exception=True)

def _kafka_consumer_main(self):
# Since the list
while not self._input_topics:
self.logger.log('No input topics, waiting...', level='debug')
time.sleep(1)

if self._input_mode == 'parity':
self._input_topic_assignments = {-1: -1}

# If topics are not specified, the first is used
elif not self._input_topic_assignments:
self._input_topic_assignments = {}
self._input_topic_assignments[self._input_topics[0]] = -1

# Kafka Consumer
if self._synchronous:
properties = dict(self._kafka_consumer_synchronous_properties)
else:
Expand All @@ -495,7 +509,19 @@ def _kafka_consumer_main(self):
prev_queued_messages = 0

while not self._consumer_main_thread.stopped():
for topic in self._input_topic_assignments.keys():
while not self._input_topics:
self.logger.log('No input topics, waiting...', level='debug')
time.sleep(Link.NO_INPUT_TOPIC_SLEEP_SECONDS)

self._set_input_topic_assignments()
current_input_topic_assignments = dict(self._input_topic_assignments)

for topic in current_input_topic_assignments.keys():
with self._input_topics_lock:
if self._changed_input_topics:
self._changed_input_topics = False
break

# Buffer for the current topic
message_buffer = []

Expand All @@ -512,15 +538,14 @@ def _kafka_consumer_main(self):

try:
start_time = utils.get_timestamp_ms()
assigned_time = self._input_topic_assignments[topic]
assigned_time = current_input_topic_assignments[topic]
while assigned_time == -1 or Link.in_time(start_time, assigned_time):
# Subscribe to the topics again if input topics have changed
self._input_topics_lock.acquire()
if self._changed_input_topics:
self._changed_input_topics = False
self._input_topics_lock.release()
break
self._input_topics_lock.release()
with self._input_topics_lock:
if self._changed_input_topics:
# _changed_input_topics is set to False in the
# outer loop so both loops are broken
break

message = consumer.poll(5)

Expand Down Expand Up @@ -663,21 +688,19 @@ def _thread_target(self, target, args=None, kwargs=None):
def add_input_topic(self, input_topic):
if input_topic not in self._input_topics:
self._input_topics.append(input_topic)
self._input_topics_lock.acquire()
if self._input_mode == 'exp':
self._set_input_topic_exp_assignments()
self._changed_input_topics = True
self._input_topics_lock.release()
with self._input_topics_lock:
if self._input_mode == 'exp':
self._set_input_topic_assignments()
self._changed_input_topics = True
self.logger.log(f'added input {input_topic}')

def remove_input_topic(self, input_topic):
if input_topic in self._input_topics:
self._input_topics.remove(input_topic)
self._input_topics_lock.acquire()
if self._input_mode == 'exp':
self._set_input_topic_exp_assignments()
self._changed_input_topics = True
self._input_topics_lock.release()
with self._input_topics_lock:
if self._input_mode == 'exp':
self._set_input_topic_assignments()
self._changed_input_topics = True
self.logger.log(f'removed input {input_topic}')

def start(self):
Expand Down Expand Up @@ -727,7 +750,7 @@ def _launch_threads(self):
if self._is_custom_input:
input_target = self.custom_input
elif self._is_multiple_kafka_input and self._input_mode == 'exp':
self._set_input_topic_exp_assignments()
self._set_input_topic_assignments()
consumer_kwargs = {'target': input_target}
self._consumer_main_thread = Thread(target=self._thread_target, kwargs=consumer_kwargs)
self._consumer_main_thread.start()
Expand Down Expand Up @@ -869,8 +892,16 @@ def _set_consumer_timeout(self, consumer_timeout):
self._consumer_timeout = consumer_timeout
self._consumer_timeout = self._consumer_timeout * 1000

def _set_input_topic_exp_assignments(self):
def _set_input_topic_assignments(self):
if self._input_mode == 'parity':
self._input_topic_assignments = {-1: -1}
return

self._input_topic_assignments = {}

if len(self._input_topics[0]) == 1:
self._input_topic_assignments[self._input_topics[0]] = -1

window_size = 900 # in seconds, 15 minutes
topics_no = len(self._input_topics)
self.logger.log('input topics time assingments:')
Expand Down
8 changes: 4 additions & 4 deletions test/kafka/middle_link_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ def dummy_log(message):
logging.info(f'MiddleLinkAsync -> {message}')

def remote_method(self, context, message):
logging.info(f'RPC invocation of remote_method(message): {message}.\Context: {context}')
logging.info(f'RPC invocation of remote_method(message): {message}. Context: {context}')

def remote_add_input_topic(self, _, topic):
def remote_add_input_topic(self, context, topic):
logging.info(f'RPC invocation of remote_add_input_topic(): {topic}. Context: {context}')
self.add_input_topic(topic)

def setup(self):
Expand All @@ -35,8 +36,7 @@ def setup(self):

def transform(self, electron):
logging.debug(f'{self.__class__.__name__} -> transform()')
logging.debug(
f'{self.__class__.__name__} -> received key: {electron.key}, value: {electron.value}')
logging.debug(f'{self.__class__.__name__} -> received key: {electron.key}, value: {electron.value}')
electron.key = str(electron.key) + '_transformed_async'
electron.value = str(electron.value) + '_transformed_async'
logging.debug(f'{self.__class__.__name__} -> previous topic: {electron.previous_topic}')
Expand Down
18 changes: 5 additions & 13 deletions test/kafka/middle_link_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ def setup(self):

def transform(self, electron):
logging.debug(f'{self.__class__.__name__} -> transform()')
logging.debug(
f'{self.__class__.__name__} -> received key: {electron.key}, value: {electron.value}')
logging.debug(f'{self.__class__.__name__} -> received key: {electron.key}, value: {electron.value}')
electron.key = str(electron.key) + '_transformed_sync'
electron.value = str(electron.value) + '_transformed_sync'
logging.debug(f'{self.__class__.__name__} -> previous topic: {electron.previous_topic}')
Expand All @@ -56,21 +55,14 @@ def transform(self, electron):
elif option == 5:
return electron, self.instance_callback_args, ['instance_arg_val1', 'instance_arg_val2']
elif option == 6:
return electron, self.instance_callback_args, {
'arg1': 'instance_kwarg_val1',
'arg2': 'instance_kwarg_val2'
}
return electron, self.instance_callback_args, {'arg1': 'instance_kwarg_val1', 'arg2': 'instance_kwarg_val2'}
elif option == 7:
logging.debug(
f'{self.__class__.__name__} -> RPC invocation of remote_method() (MiddleLinkAsync links)'
)
logging.debug(f'{self.__class__.__name__} -> RPC invocation of remote_method() (MiddleLinkAsync links)')
self.rpc_call('MiddleLinkAsync', 'remote_method', 'Hi from MiddleLinkSync')
elif option == 8:
logging.debug(
f'{self.__class__.__name__} -> RPC invocation of remote_add_input_topic() (MiddleLinkAsync links)'
)
self.rpc_call('MiddleLinkAsync', 'remote_add_input_topic',
f'new_topic_{random.randint(0, 1000)}')
f'{self.__class__.__name__} -> RPC invocation of remote_add_input_topic() (MiddleLinkAsync links)')
self.rpc_call('MiddleLinkAsync', 'remote_add_input_topic', f'new_topic_{random.randint(0, 1000)}')


if __name__ == "__main__":
Expand Down

0 comments on commit 9290003

Please sign in to comment.