/
RemoteStreamPusher.java
137 lines (122 loc) · 5.48 KB
/
RemoteStreamPusher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.transport.stream.impl;
import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.api.StreamResponseException;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream.StreamId;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorCode;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorResponse;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponseDecoder;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A naive implementation to push jobs out, which performs no retries of any kind, but reports
* errors on failure.
*
* @param <P> the payload type to be pushed out
*/
final class RemoteStreamPusher<P extends BufferWriter> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamPusher.class);
private final StreamResponseDecoder responseDecoder = new StreamResponseDecoder();
private final ThrottledLogger pushErrorLogger = new ThrottledLogger(LOG, Duration.ofSeconds(5));
private final ThrottledLogger pushWarnLogger = new ThrottledLogger(LOG, Duration.ofSeconds(5));
private final RemoteStreamMetrics metrics;
private final Transport transport;
private final Executor executor;
RemoteStreamPusher(
final Transport transport, final Executor executor, final RemoteStreamMetrics metrics) {
this.metrics = Objects.requireNonNull(metrics, "must specify remote stream metrics");
this.transport = Objects.requireNonNull(transport, "must provide a network transport");
this.executor = Objects.requireNonNull(executor, "must provide an asynchronous executor");
}
public void pushAsync(
final P payload, final RemoteStreamErrorHandler<P> errorHandler, final StreamId streamId) {
Objects.requireNonNull(payload, "must specify a payload");
Objects.requireNonNull(errorHandler, "must specify a error handler");
executor.execute(
() -> push(payload, instrumentingErrorHandler(errorHandler, streamId), streamId));
}
private RemoteStreamErrorHandler<P> instrumentingErrorHandler(
final RemoteStreamErrorHandler<P> errorHandler, final StreamId streamId) {
return (error, payload) -> {
if (error == null) {
return;
}
if (error instanceof final StreamResponseException e
&& (e.code() == ErrorCode.INVALID || e.code() == ErrorCode.MALFORMED)) {
pushErrorLogger.error(
"Failed to push (size = {}) to stream {}, request could not be parsed",
payload.getLength(),
streamId,
e);
} else {
pushWarnLogger.warn(
"Failed to push (size = {}) to stream {}", payload.getLength(), streamId, error);
}
metrics.pushFailed();
errorHandler.handleError(error, payload);
};
}
private void push(
final P payload, final RemoteStreamErrorHandler<P> errorHandler, final StreamId streamId) {
final var request = new PushStreamRequest().streamId(streamId.streamId()).payload(payload);
try {
transport
.send(request, streamId.receiver())
.whenCompleteAsync(
(response, error) -> onPush(payload, errorHandler, response, error), executor);
LOG.trace("Pushed {} to stream {}", payload, streamId);
} catch (final Exception e) {
errorHandler.handleError(e, payload);
}
}
private void onPush(
final P payload,
final RemoteStreamErrorHandler<P> errorHandler,
final byte[] responseBuffer,
final Throwable error) {
if (error != null) {
errorHandler.handleError(error, payload);
return;
}
responseDecoder
.decode(responseBuffer, new PushStreamResponse())
.mapLeft(ErrorResponse::asException)
.ifRightOrLeft(
ok -> metrics.pushSucceeded(), failure -> errorHandler.handleError(failure, payload));
}
/**
* A small abstraction over the network transport. This allows for better testability, and also
* removes the need for this class to know how communication occurs (e.g. which topic the message
* is sent over)
*/
interface Transport {
/**
* Sends the given request out to the given receiver. May throw errors, e.g. serialization
* errors.
*
* @param request the request to send
* @param receiver the expected target
* @return a future which is completed when the request has been acknowledged by the receiver,
* or an error occurred
* @throws Exception if an error occurs before the request is sent out, i.e. serialization error
*/
CompletableFuture<byte[]> send(final PushStreamRequest request, final MemberId receiver)
throws Exception;
}
}