Skip to content

Conversation

@buraksezer
Copy link
Contributor

Hi team!

This is my initial PR for #201. I have several questions before completing the task.

1:
I need to add config.set_property("hazelcast.serialization.input.returns.bytearray", True) to serialize/deserialize messages. I set this property before running tests and sample programs. How do I prevent this?

2:
Test suite occasionally fails. See this error messages. I need to fix that issue before adding more tests. Because the missing tests are complicated and I don't understand the what the actual problem is.

3:
How to add a publisher_address? The Go implementation doesn't use it.

4:
How to handle publish_time? It's Unix epoch now. We may need to use native Python dates here.

I extracted tests from the Go implementation. Currently some of the tests are missing. I'm going to add the following tests after fixing 2.

  • TestReliableTopicProxy_ClientNotActiveError
  • TestReliableTopicProxy_DistributedObjectDestroyedError
  • TestReliableTopicProxy_DistributedObjectDestroyed
  • TestReliableTopicProxy_Stale

I'm still iterating over the code. You can use examples/reliable-topic/reliable_topic_example.py for quick experments. Could you please share your ideas?

@devOpsHazelcast
Copy link
Contributor

Can one of the admins verify this patch?

1 similar comment
@devOpsHazelcast
Copy link
Contributor

Can one of the admins verify this patch?

@mdumandag
Copy link
Contributor

mdumandag commented May 14, 2020

Hi @buraksezer , thank you so much for your efforts. It is great to see a community activity on the python client. I will be reviewing the PR in detail tomorrow, but I think it looks pretty good. For now, let me answer your questions.

  1. Our ObjectDataInput#read_byte_array method returns list of byte-sized integers if the option you mentioned is not present. Honestly, I feel like the implementation should always return bytearray but, for backward compatibility reasons we cannot change it. The problem you have faced is in this line. https://github.com/hazelcast/hazelcast-python-client/blob/master/hazelcast/serialization/input.py#L150 . We are passing a list instead of bytearray. There is a missing check here. We should cast buff back into bytearray if self._respect_bytearrays is false. We will change the implementation of read_byte_array in 4.0 release, but for now we can use this.

  2. I ran the test suite more than 10 times but couldn't reproduce the problem. Also, from the logs, I cannot tell anything useful. It seems like the member was not responding but I don't know why. I will look into it in more detail tomorrow.

  3. You can simply pass None. It is there because the Java member side reliable topic proxy sends this data as a part of the message but clients does not. In the Java side, both member and the client use the same class so this option has to be present but can be None.

  4. I am fine with returning Unix timestamp. It is easy to convert it to datetime object but I would like to leave this to user.

Copy link
Contributor

@mdumandag mdumandag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, the PR looks good. I left some reviews. I can check again as you iterate over the PR.


def publish(self, message):
raise NotImplementedError
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a blocking API. In general the API provided in the 3.x versions are non-blocking apart from the proxy creations and listener registrations. So, I think publish should be non-blocking too.

check_not_negative(start_sequence, "sequence can't be smaller than 0")
check_true(max_count >= min_count, "max count should be greater or equal to min count")
check_true(min_count <= self.capacity().result(), "min count should be smaller or equal to capacity")
#check_true(min_count <= self.capacity().result(), "min count should be smaller or equal to capacity")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct validation for this line is:

check_true(max_count <= self.capacity().result(), "max count should be smaller or equal to capacity")

Copy link
Contributor Author

@buraksezer buraksezer Aug 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asimarslan, @mdumandag Sorry for this extreme delay. I was overloaded by my day job and the other open source development stuff. Now I'm back to the Hazelcast community.

I changed the validation line and added some logic to call read_many in _MessageListener._next function. Without this modification, read_many call in the ReliableTopic implementation throws the exception:

HazelcastClient.LifecycleService: INFO: [dev] [hz.client_1] (20200531 - 2df48a6) HazelcastClient is CONNECTED
HazelcastClient: INFO: [dev] [hz.client_1] Client started.
HazelcastClient.AsyncoreReactor: ERROR: [dev] [hz.client_1] Error in Reactor Thread
Traceback (most recent call last):
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/reactor.py", line 46, in _loop
    self._check_timers()
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/reactor.py", line 66, in _check_timers
    if timer.check_timer(now):
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/reactor.py", line 277, in check_timer
    self.timer_ended_cb()
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/proxy/reliable_topic.py", line 211, in _read_many
    future = self._proxy.ringbuffer.read_many(self._sequence, 1, self._proxy.config.read_batch_size)
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/proxy/ringbuffer.py", line 160, in read_many
    check_true(max_count <= self.capacity().result(), "max count should be smaller or equal to capacity")
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/future.py", line 58, in result
    self._reactor_check()
  File "/home/buraksezer/Projects/python/hazelcast-python-client/hazelcast/future.py", line 70, in _reactor_check
    "Synchronous result for incomplete operation must not be called from Reactor thread. "
RuntimeError: Synchronous result for incomplete operation must not be called from Reactor thread. Use add_done_callback instead.

It seems that the corrected validation (max, instead of min) breaks some tests in ringbuffer implementation. I'm not sure that how add_done_callback affects the overall health of the RingBuffer implementation and its current users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem at all, we are happy to see you back.

I checked the line and the error. I also played with the version that has no capacity check but it seems there is no check on the server-side. So, when you have a ring buffer with 10 elements capacity, if you make a read_many request with min_count larger than that, the server never sends back a response (waits to get at least min_count items even if it is higher than its capacity).

So, I guess we have to have some kind of capacity check to fail on bad user requests. I checked your implementation but it seems that even if the capacity check fails, the future returned to the user is not resolved with that failure. I tweaked it a little bit to do that. What do you think about it? I did some tests and it seems it is working.

        check_not_negative(start_sequence, "sequence can't be smaller than 0")
        check_true(max_count >= min_count, "max count should be greater or equal to min count")
        check_true(max_count < MAX_BATCH_SIZE, "max count can't be greater than %d" % MAX_BATCH_SIZE)

        future = Future()

        def set_result(result):
            try:
                future.set_result(result.result())
            except:
                future.set_exception(sys.exc_info()[1], sys.exc_info()[2])

        def check_capacity(capacity):
            try:
                capacity = capacity.result()
                check_true(min_count <= capacity, "min count: %d should be smaller or equal to capacity: %d"
                           % (min_count, capacity))
                f = self._encode_invoke(ringbuffer_read_many_codec, response_handler=self._read_many_response_handler,
                                        start_sequence=start_sequence, min_count=min_count,
                                        max_count=max_count, filter=None)
                f.add_done_callback(set_result)
            except:
                future.set_exception(sys.exc_info()[1], sys.exc_info()[2])

        self.capacity().add_done_callback(check_capacity)

        return future

@asimarslan
Copy link
Contributor

@mdumandag
I think this PR should be sent to 3.12.z (too)
master branch is for 4.x client development.
The contents are same for now but from a PM point of view we need it on 3.12.z branch.

Copy link
Contributor

@mdumandag mdumandag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fixes @buraksezer . I left couple of small comments for your fixes.

def _handle_stale_sequence_error(self):
def on_response(res):
head_seq = res.result()
if self._listener.is_loss_tolerant:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should call the function. if self._listener.is_loss_tolerant: should be if self._listener.is_loss_tolerant():


def tearDown(self):
if self.registration_id is not None:
self.reliable_topic.remove_listener(self.registration_id)
Copy link
Contributor

@mdumandag mdumandag Jun 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you shutdown self.client after this line (not in the same block) ?

event = collector.events[0]
self.assertEqual(event.message, "aa")

self.assertTrueEventually(assert_event, 5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can shutdown the client_two by putting this line in try block and performing shutdown in the finally block.


def setUp(self):
config = ClientConfig()
config.set_property("hazelcast.serialization.input.returns.bytearray", True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned before, you can remove this line after performing the small fix I mentioned here #206 (comment) (item 1)

@yuce
Copy link
Contributor

yuce commented Feb 3, 2021

Hi @buraksezer, I hope everything's going great with you.

Thanks for your contribution! Unfortunately this PR fell out of sync with our main branch before the reviews were addressed. Would it be possible for you to resolve the conflicts and address the reviews?

@buraksezer
Copy link
Contributor Author

Hi @yuce

I'm so sorry about that but I have no free time to complete the work.

@yuce
Copy link
Contributor

yuce commented Feb 8, 2021

@buraksezer No problem, thanks for letting us now. Good luck!

@mdumandag
Copy link
Contributor

Closing this in favor of #395. @buraksezer thanks again for your efforts. We have used your commits as a base for the implementation.

@mdumandag mdumandag closed this Apr 26, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants