Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: grpc timeout segment data loss #116

Merged
merged 1 commit into from Jan 29, 2021
Merged

Conversation

tom-pytel
Copy link
Contributor

@tom-pytel tom-pytel commented Jan 27, 2021

Please review this thoroughly because all I wanted to do was resend segments which failed to send due to a grpc timeout. But unexpectedly the behavior of grpc timeout changed, it doesn't time out anymore, the heartbeat seems to keep it alive now. I don't understand why this behavior changed so that is why I am requesting a thorough looking at this.

@wu-sheng
Copy link
Member

wu-sheng commented Jan 27, 2021

From my understanding, the timeout should make the segments abandoned with keeping the agent memory cost as less as possible.
This is not specific for python agent, it is recommended, implemented, and kept in all agents. The users will never know how to determine which trace segments should be kept, and others are not in the runtime.
I am not checking the codes, but it should be in this way. @kezhenxu94

@tom-pytel
Copy link
Contributor Author

tom-pytel commented Jan 27, 2021

From my understanding, the timeout should make the segments abandoned with keeping the agent memory as much as possible.
This is not specific for the agent, it is recommended, implemented, and kept in all agents. The users will never know how to determine which trace segments should be kept, and others are not in the runtime.
I am not checking the codes, but it should be in this way. @kezhenxu94

Well there is already a mechanism to reject segments if the queue is full, in agent/__init__.py:archive() the queue is explicitly checked and segment is rejected if full. All I am trying to do is if queue is not full to not lose the segments that have already made it into the queue.

@wu-sheng
Copy link
Member

OK, this is another case, but also a tricky case in the product environment.
The queue could be full or closer to be full because of this mechanism, then new segments will be injected.
Which segments are more important to keep?😃

@tom-pytel
Copy link
Contributor Author

tom-pytel commented Jan 27, 2021

OK, this is another case, but also a tricky case in the product environment.
The queue could be full or closer to be full because of this mechanism, then new segments will be injected.
Which segments are more important to keep?

If you want the new segments to take precedence then that archive() call could pop the oldest segment off the queue if it is full to make space for the new one, something like the following but with all the proper error checking:

def archive(segment: 'Segment'):
    if __queue.full():
        __queue.get()
        logger.warning('the queue is full, the oldest segment was abandoned')

    __queue.put(segment)

The benefit here being that if you are not running the queue full then you will not lose segments at all.

@wu-sheng
Copy link
Member

I know we could dequeue, but still, highly recommend you to take a look at Satellite project. It provides a much bettet way to resolve this case by leveraging file system.

@wu-sheng
Copy link
Member

@tom-pytel
Copy link
Contributor Author

I know we could dequeue, but still, highly recommend you to take a look at Satellite project. It provides a much bettet way to resolve this case by leveraging file system.

Well, we will probably eventually be looking at that but for now it is overkill for the small problem I am tasked with which is just not to drop sporadic segment data on silly timeouts. If you don't want this fix in your codebase its no problem I will just wind up doing it in our own fork then. So do you want me to add the dequeueing and keep this or just do on our own side?

Apart from that, like I said, I still don't understand why the grpc connection stopped timing out. The old code would time out constantly regardless of the heartbeat and this code works more like I expect was the intention which is that it does not actually time out as long as the server is up. Would be good to have an explanation for this since even if I don't add this fix to your codebase you could limit data losses just by fixing the constant timeout problem.

@wu-sheng
Copy link
Member

I know we could dequeue, but still, highly recommend you to take a look at Satellite project. It provides a much bettet way to resolve this case by leveraging file system.

Well, we will probably eventually be looking at that but for now it is overkill for the small problem I am tasked with which is just not to drop sporadic segment data on silly timeouts. If you don't want this fix in your codebase its no problem I will just wind up doing it in our own fork then. So do you want me to add the dequeueing and keep this or just do on our own side?

Apart from that, like I said, I still don't understand why the grpc connection stopped timing out. The old code would time out constantly regardless of the heartbeat and this code works more like I expect was the intention which is that it does not actually time out as long as the server is up. Would be good to have an explanation for this since even if I don't add this fix to your codebase you could limit data losses just by fixing the constant timeout problem.

I will leave the decision to @kezhenxu94 and you, once the memory cost is limited as much as the queue side, I am good for either re-push segment into the queue if there is available slot, or expire the oldest data in the queue.
I agree that gRPC connection should be back to available if the OAP is back, also, please check the case that, there is a case, the connection may not restore based on gRPC reconnection mechanism, according to Java

If you set the backend address by a domain name, which should be resolved by a DNS server, rather than local HOST file, then rebooting OAP(or a new pod in k8s), actually IP changed, but same domain name. At this case, the client side is required to re-resolve the domain addresses.

@tom-pytel
Copy link
Contributor Author

I will leave the decision to @kezhenxu94 and you, once the memory cost is limited as much as the queue side, I am good for either re-push segment into the queue if there is available slot, or expire the oldest data in the queue.
I agree that gRPC connection should be back to available if the OAP is back, also, please check the case that, there is a case, the connection may not restore based on gRPC reconnection mechanism, according to Java

If you set the backend address by a domain name, which should be resolved by a DNS server, rather than local HOST file, then rebooting OAP(or a new pod in k8s), actually IP changed, but same domain name. At this case, the client side is required to re-resolve the domain addresses.

Ok, well I am also curious for @kezhenxu94's opinion. Specifically I haven't looked at how this grpc works under the hood so maybe he can tell me if this new code isn't doing something incredibly stupid like creating a new connection for each segment...

@kezhenxu94
Copy link
Member

@tom-pytel I'm +1 to put the segments back to the queue when the it is not full and abandon the oldest segments if it's full.

As for the timeout issue, I had a quick glance of the codes and guess (don't have too much time ATM to look thoroughly and test myself) the reason may be the changes from self.traces_reporter.report(generator()) to self.traces_reporter.report(iter([s])), the former generator() was blocking the self.traces_reporter.report and may cause timeout, while the latter one doesn't.

@kezhenxu94
Copy link
Member

On the other hand, it's not that different to abandon the oldest segments or the latest segments when the queue is full, maybe abandon the latest segments so that we don't need to get and put over and over again when there are too many traffics?

@kezhenxu94
Copy link
Member

As for the timeout issue, I had a quick glance of the codes and guess (don't have too much time ATM to look thoroughly and test myself) the reason may be the changes from self.traces_reporter.report(generator()) to self.traces_reporter.report(iter([s])), the former generator() was blocking the self.traces_reporter.report and may cause timeout, while the latter one doesn't.

OK just did a quick experiment, seems to be the case, I just changed a shorter timeout intentionally and the codes before this PR throws timeout exception constantly, while those after this PR don't.

diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 734d3bb..81de589 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -55,4 +55,4 @@ class GrpcTraceSegmentReportService(TraceSegmentReportService):
         self.report_stub = TraceSegmentReportServiceStub(channel)
 
     def report(self, generator):
-        self.report_stub.collect(generator)
+        self.report_stub.collect(generator, timeout=3)
skywalking [Thread-21] [DEBUG] grpc channel connectivity changed, [ChannelConnectivity.READY -> ChannelConnectivity.READY]
Traceback (most recent call last):
  File "/Users/kezhenxu94/workspace/skywalking-python/venv/lib/python3.8/site-packages/apache_skywalking-0.5.0-py3.8.egg/skywalking/agent/protocol/grpc.py", line 122, in report
    self.traces_reporter.report(generator())
  File "/Users/kezhenxu94/workspace/skywalking-python/venv/lib/python3.8/site-packages/apache_skywalking-0.5.0-py3.8.egg/skywalking/client/grpc.py", line 58, in report
    self.report_stub.collect(generator, timeout=3)
  File "/Users/kezhenxu94/workspace/skywalking-python/venv/lib/python3.8/site-packages/grpc/_channel.py", line 1011, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/kezhenxu94/workspace/skywalking-python/venv/lib/python3.8/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "{"created":"@1611829792.286230000","description":"Error received from peer ipv4:127.0.0.1:11800","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>

@tom-pytel
Copy link
Contributor Author

On the other hand, it's not that different to abandon the oldest segments or the latest segments when the queue is full, maybe abandon the latest segments so that we don't need to get and put over and over again when there are too many traffics?

This can be changed very easily in code so you could leave it as it is and change if needed at some point. It could even be configurable at runtime if you want.

@tom-pytel
Copy link
Contributor Author

Now for some thoughts. I changed from a continuous send to send one segment at a time because with a continuous send I could not be guaranteed to know where the error happened and would not know which segment(s) to resend. For example does the grpc go one by one pulling then sending to stream or does it build up blocks of segments before trying to stream them? How efficient is it to send one segment at a time on a client stream function? What is the overhead of setting up a stream per-segment? Do you have a single unitary segment send function defined in your protocol which could do this more efficiently or is it only streams?

@kezhenxu94
Copy link
Member

I could not be guaranteed to know where the error happened and would not know which segment(s) to resend.

The streaming itself returns a generator of responses so that may be a clue (not sure).

Do you have a single unitary segment send function defined in your protocol which could do this more efficiently or is it only streams?

I think we only provide stream for this case (segment).

@tom-pytel
Copy link
Contributor Author

I could not be guaranteed to know where the error happened and would not know which segment(s) to resend.

The streaming itself returns a generator of responses so that may be a clue (not sure).

Ok, well then don't merge until I dig a little deeper to see if it can be made more efficient.

Do you have a single unitary segment send function defined in your protocol which could do this more efficiently or is it only streams?

I think we only provide stream for this case (segment).

Any thoughts about introducing one in the future?

@tom-pytel
Copy link
Contributor Author

Ok, so I was uncomfortable with the idea of sending segments one by one using a streaming call so did a different way. There are now two mechanisms to prevent data loss here.

  • Added a queue.get() timeout so that report() will not block forever also setting a collect() timeout higher than this to make sure report() returns before a collect() timeout can possibly happen.
  • Even if a timeout does happen somehow, the last segment processed is pushed back onto the queue for another attempt when the connection is reestablished (I checked that it does seem to be processed one segment at a time and not in blocks).

@kezhenxu94 kezhenxu94 added the enhancement New feature or request label Jan 29, 2021
@kezhenxu94 kezhenxu94 added this to the 0.6.0 milestone Jan 29, 2021
@kezhenxu94 kezhenxu94 merged commit 6ed3043 into apache:master Jan 29, 2021
@kezhenxu94
Copy link
Member

Ok, so I was uncomfortable with the idea of sending segments one by one using a streaming call so did a different way. There are now two mechanisms to prevent data loss here.

  • Added a queue.get() timeout so that report() will not block forever also setting a collect() timeout higher than this to make sure report() returns before a collect() timeout can possibly happen.
  • Even if a timeout does happen somehow, the last segment processed is pushed back onto the queue for another attempt when the connection is reestablished (I checked that it does seem to be processed one segment at a time and not in blocks).

This looks better than the previous method. Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
3 participants