-
Notifications
You must be signed in to change notification settings - Fork 460
/
MultiLangRecordProcessor.java
276 lines (244 loc) · 10.8 KB
/
MultiLangRecordProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/*
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.multilang;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* A record processor that manages creating a child process that implements the multi language protocol and connecting
* that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on
* that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are
* called.
*/
public class MultiLangRecordProcessor implements IRecordProcessor {

    private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class);

    /** Exit status passed to {@link System#exit(int)} by {@link #exit()}. */
    private static final int EXIT_VALUE = 1;

    // Collaborators supplied at construction time; they are never reassigned.
    private final ProcessBuilder processBuilder;
    private final ExecutorService executorService;
    private final ObjectMapper objectMapper;
    private final MessageWriter messageWriter;
    private final MessageReader messageReader;
    private final DrainChildSTDERRTask readSTDERRTask;

    // Assigned by initialize(); each stays null until the corresponding step of initialize() has run.
    private String shardId;
    private Process process;
    private Future<?> stderrReadTask;
    private MultiLangProtocol protocol;

    /** Flipped to {@link ProcessState#SHUTDOWN} once the child process shutdown sequence has completed. */
    private ProcessState state;

    /**
     * Used to tell whether the processor has been shutdown already.
     */
    private enum ProcessState {
        ACTIVE, SHUTDOWN
    }

    /**
     * Constructor. Delegates to the full constructor with a new {@link MessageWriter}, {@link MessageReader} and
     * {@link DrainChildSTDERRTask}.
     *
     * @param processBuilder Provides process builder functionality.
     * @param executorService An executor
     * @param objectMapper An object mapper.
     */
    MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
            ObjectMapper objectMapper) {
        this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
                new DrainChildSTDERRTask());
    }

    /**
     * Note: This constructor has package level access solely for testing purposes.
     *
     * @param processBuilder Provides the child process for this record processor
     * @param executorService The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
     * @param objectMapper Object mapper
     * @param messageWriter Message writer to write to child process's stdin
     * @param messageReader Message reader to read from child process's stdout
     * @param readSTDERRTask Error reader to read from child process's stderr
     */
    MultiLangRecordProcessor(ProcessBuilder processBuilder,
            ExecutorService executorService,
            ObjectMapper objectMapper,
            MessageWriter messageWriter,
            MessageReader messageReader,
            DrainChildSTDERRTask readSTDERRTask) {
        this.executorService = executorService;
        this.processBuilder = processBuilder;
        this.objectMapper = objectMapper;
        this.messageWriter = messageWriter;
        this.messageReader = messageReader;
        this.readSTDERRTask = readSTDERRTask;
        this.state = ProcessState.ACTIVE;
    }

    /**
     * Starts the child process, wires its STDIN/STDOUT/STDERR to the message writer, message reader and STDERR
     * drainer, and runs the protocol's initialize step. Any failure is routed to
     * {@link #stopProcessing(String, Throwable)} instead of being thrown to the caller.
     *
     * @param shardIdToProcess The shard this processor is responsible for.
     */
    @Override
    public void initialize(String shardIdToProcess) {
        try {
            this.shardId = shardIdToProcess;
            try {
                this.process = startProcess();
            } catch (IOException e) {
                /*
                 * The process builder has thrown an exception while starting the child process so we would like to
                 * shut down
                 */
                stopProcessing("Failed to start client executable", e);
                return;
            }
            // Initialize all of our utility objects that will handle interacting with the process over
            // STDIN/STDOUT/STDERR
            messageWriter.initialize(process.getOutputStream(), shardId, objectMapper, executorService);
            messageReader.initialize(process.getInputStream(), shardId, objectMapper, executorService);
            readSTDERRTask.initialize(process.getErrorStream(), shardId, "Reading STDERR for " + shardId);

            // Submit the error reader for execution
            stderrReadTask = executorService.submit(readSTDERRTask);

            protocol = new MultiLangProtocol(messageReader, messageWriter, shardId);
            if (!protocol.initialize()) {
                throw new RuntimeException("Failed to initialize child process");
            }
        } catch (Throwable t) {
            stopProcessing("Encountered an error while trying to initialize record processor", t);
        }
    }

    /**
     * Hands the records to the protocol. A {@code false} result or any thrown error is routed to
     * {@link #stopProcessing(String, Throwable)} instead of being thrown to the caller.
     *
     * @param records The records to pass to the protocol.
     * @param checkpointer The checkpointer to pass to the protocol.
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        try {
            if (!protocol.processRecords(records, checkpointer)) {
                throw new RuntimeException("Child process failed to process records");
            }
        } catch (Throwable t) {
            stopProcessing("Encountered an error while trying to process records", t);
        }
    }

    /**
     * Runs the protocol's shutdown step and then the child process shutdown sequence. If this processor has already
     * been shut down, only a warning is logged. Any failure is routed to
     * {@link #stopProcessing(String, Throwable)} instead of being thrown to the caller.
     *
     * @param checkpointer The checkpointer to pass to the protocol.
     * @param reason The shutdown reason to pass to the protocol.
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        try {
            if (state == ProcessState.ACTIVE) {
                if (!protocol.shutdown(checkpointer, reason)) {
                    throw new RuntimeException("Child process failed to shutdown");
                }
                childProcessShutdownSequence();
            } else {
                LOG.warn("Shutdown was called but this processor is already shutdown. Not doing anything.");
            }
        } catch (Throwable t) {
            if (state == ProcessState.ACTIVE) {
                stopProcessing("Encountered an error while trying to shutdown child process", t);
            } else {
                stopProcessing("Encountered an error during shutdown,"
                        + " but it appears the processor has already been shutdown", t);
            }
        }
    }

    /**
     * Performs the necessary shutdown actions for the child process, e.g. stopping all the handlers after they have
     * drained their streams. Attempts to wait for child process to completely finish before returning.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining cleanup steps are still attempted and the
     * thread's interrupt status is restored before this method returns.
     */
    private void childProcessShutdownSequence() {
        if (process == null) {
            // process is only assigned once startProcess() has succeeded, so there is no child process and there
            // are no streams to drain or close. Without this guard the steps below fail on a null reference.
            state = ProcessState.SHUTDOWN;
            return;
        }

        boolean interrupted = false;
        try {
            try {
                /*
                 * Close output stream to the child process. The child process should be reading off its stdin until
                 * it receives EOF, closing the output stream should signal this and allow the child process to
                 * terminate. We expect it to terminate immediately, but there is the possibility that the child
                 * process then begins to write to its STDOUT and STDERR.
                 */
                if (messageWriter.isOpen()) {
                    messageWriter.close();
                }
            } catch (IOException e) {
                LOG.error("Encountered exception while trying to close output stream.", e);
            }

            // We should drain the STDOUT and STDERR of the child process. If we don't, the child process might
            // remain blocked writing to a full pipe buffer.
            // Note: |= on booleans does not short-circuit, so both waits are always performed.
            interrupted |= safelyWaitOnFuture(messageReader.drainSTDOUT(), "draining STDOUT");
            interrupted |= safelyWaitOnFuture(stderrReadTask, "draining STDERR");

            safelyCloseInputStream(process.getErrorStream(), "STDERR");
            safelyCloseInputStream(process.getInputStream(), "STDOUT");

            /*
             * By this point the threads handling reading off input streams are done, we do one last thing just to
             * make sure we don't leave the child process running. The process is expected to have exited by now, but
             * we still make sure that it exits before we finish.
             */
            try {
                LOG.info("Child process exited with value: " + process.waitFor());
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.error("Interrupted before process finished exiting. Attempting to kill process.");
                process.destroy();
            }
            state = ProcessState.SHUTDOWN;
        } finally {
            if (interrupted) {
                // The interruption was handled locally so cleanup could finish; re-assert it so code further up
                // the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the given stream, logging (rather than propagating) any {@link IOException}.
     *
     * @param inputStream The stream to close.
     * @param name Name of the stream, used only in the log message.
     */
    private void safelyCloseInputStream(InputStream inputStream, String name) {
        try {
            inputStream.close();
        } catch (IOException e) {
            LOG.error("Encountered exception while trying to close " + name + " stream.", e);
        }
    }

    /**
     * Convenience method used by {@link #childProcessShutdownSequence()} to drain the STDOUT and STDERR of the child
     * process. Failures are logged, not thrown.
     *
     * @param future A future to wait on.
     * @param whatThisFutureIsDoing What that future is doing while we wait.
     * @return {@code true} if the wait ended because the calling thread was interrupted; the caller is responsible
     *         for restoring the interrupt status.
     */
    private boolean safelyWaitOnFuture(Future<?> future, String whatThisFutureIsDoing) {
        try {
            future.get();
            return false;
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Encountered error while " + whatThisFutureIsDoing + " for shard " + shardId, e);
            return e instanceof InterruptedException;
        }
    }

    /**
     * Convenience method for logging and safely shutting down so that we don't throw an exception up to the KCL on
     * accident. Always finishes by calling {@link #exit()}.
     *
     * @param message The reason we are stopping processing.
     * @param reason An exception that caused us to want to stop processing.
     */
    private void stopProcessing(String message, Throwable reason) {
        try {
            LOG.error(message, reason);
            if (state != ProcessState.SHUTDOWN) {
                childProcessShutdownSequence();
            }
        } catch (Throwable t) {
            LOG.error("Encountered error while trying to shutdown", t);
        }
        exit();
    }

    /**
     * Terminates the JVM via {@link System#exit(int)} with {@link #EXIT_VALUE}. We provide a package level method
     * for unit testing this call to exit.
     */
    void exit() {
        System.exit(EXIT_VALUE);
    }

    /**
     * The {@link ProcessBuilder} class is final so not easily mocked. We wrap the only interaction we have with it in
     * this package level method to permit unit testing.
     *
     * @return The process started by processBuilder
     * @throws IOException If the process can't be started.
     */
    Process startProcess() throws IOException {
        return this.processBuilder.start();
    }
}