Skip to content

Commit

Permalink
fix: grpc timeout segment data loss
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel committed Jan 27, 2021
1 parent 8e9ea9d commit 0bcf3a0
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 54 deletions.
2 changes: 1 addition & 1 deletion skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init():


def __fini():
__protocol.report(__queue, False)
__protocol.report(__queue, block = False, fini = True)
__queue.join()


Expand Down
113 changes: 60 additions & 53 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,58 +67,65 @@ def on_error(self):
self.channel.unsubscribe(self._cb)
self.channel.subscribe(self._cb, try_to_connect=True)

def report(self, queue: Queue, block: bool = True):
def generator():
while True:
try:
segment = queue.get(block=block) # type: Segment
except Empty:
return
def report(self, queue: Queue, block: bool = True, fini: bool = False):
while True:
try:
segment = queue.get(block=block) # type: Segment
except Empty:
return

queue.task_done()

logger.debug('reporting segment %s', segment)

s = SegmentObject(
traceId=str(segment.related_traces[0]),
traceSegmentId=str(segment.segment_id),
service=config.service_name,
serviceInstance=config.service_instance,
spans=[SpanObject(
spanId=span.sid,
parentSpanId=span.pid,
startTime=span.start_time,
endTime=span.end_time,
operationName=span.op,
peer=span.peer,
spanType=span.kind.name,
spanLayer=span.layer.name,
componentId=span.component.value,
isError=span.error_occurred,
logs=[Log(
time=int(log.timestamp * 1000),
data=[KeyStringValuePair(key=item.key, value=item.val) for item in log.items],
) for log in span.logs],
tags=[KeyStringValuePair(
key=str(tag.key),
value=str(tag.val),
) for tag in span.tags],
refs=[SegmentReference(
refType=0 if ref.ref_type == "CrossProcess" else 1,
traceId=ref.trace_id,
parentTraceSegmentId=ref.segment_id,
parentSpanId=ref.span_id,
parentService=ref.service,
parentServiceInstance=ref.service_instance,
parentEndpoint=ref.endpoint,
networkAddressUsedAtPeer=ref.client_address,
) for ref in span.refs if ref.trace_id],
) for span in segment.spans],
)

logger.debug('reporting segment %s', segment)

s = SegmentObject(
traceId=str(segment.related_traces[0]),
traceSegmentId=str(segment.segment_id),
service=config.service_name,
serviceInstance=config.service_instance,
spans=[SpanObject(
spanId=span.sid,
parentSpanId=span.pid,
startTime=span.start_time,
endTime=span.end_time,
operationName=span.op,
peer=span.peer,
spanType=span.kind.name,
spanLayer=span.layer.name,
componentId=span.component.value,
isError=span.error_occurred,
logs=[Log(
time=int(log.timestamp * 1000),
data=[KeyStringValuePair(key=item.key, value=item.val) for item in log.items],
) for log in span.logs],
tags=[KeyStringValuePair(
key=str(tag.key),
value=str(tag.val),
) for tag in span.tags],
refs=[SegmentReference(
refType=0 if ref.ref_type == "CrossProcess" else 1,
traceId=ref.trace_id,
parentTraceSegmentId=ref.segment_id,
parentSpanId=ref.span_id,
parentService=ref.service,
parentServiceInstance=ref.service_instance,
parentEndpoint=ref.endpoint,
networkAddressUsedAtPeer=ref.client_address,
) for ref in span.refs if ref.trace_id],
) for span in segment.spans],
)

yield s

queue.task_done()
try:
self.traces_reporter.report(iter([s]))

try:
self.traces_reporter.report(generator())
except grpc.RpcError:
self.on_error()
except grpc.RpcError:
self.on_error()
queue.put(segment) # put back segment that failed to send for another attempt

# If not finalizing the app, let grpc reconnect and leave this segment to be sent on the next update.
# Otherwise, if finalizing, there will not be a next update, so we have to wait for the reconnect and
# finish sending here. We retry up to 3 times, then give up.
if not fini or fini > 3:
return

fini += 1 # pyre-ignore, yes, True += 1, its Python...

0 comments on commit 0bcf3a0

Please sign in to comment.