-
Notifications
You must be signed in to change notification settings - Fork 219
/
DorisStreamLoad.java
296 lines (272 loc) · 12.3 KB
/
DorisStreamLoad.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.writer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import static org.apache.doris.flink.sink.LoadStatus.FAIL;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
/**
 * Loads buffered record data into Doris through the HTTP Stream Load API.
 *
 * <p>Records are written into an internal {@link RecordStream} and streamed to Doris as the
 * body of a two-phase-commit (2PC) stream load request. One instance is bound to a single
 * table ({@code db.table}) and keeps at most one load request in flight at a time.
 *
 * <p>Not thread-safe: callers are expected to drive {@link #startLoad(long)},
 * {@link #writeRecord(byte[])} and {@link #stopLoad()} from a single writer thread.
 **/
public class DorisStreamLoad implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
    // ObjectMapper is thread-safe once configured; share one instance for all parsing.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
    private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
    // Existing-job status reported by Doris when a label's load already completed.
    private static final String JOB_EXIST_FINISHED = "FINISHED";

    private final String labelSuffix;
    // Delimiter inserted between consecutive records in the load body.
    private final byte[] lineDelimiter;
    private String loadUrlStr;
    private String hostPort;
    private final String abortUrlStr;
    private final String user;
    private final String passwd;
    private final String db;
    private final String table;
    private final Properties streamLoadProp;
    private final RecordStream recordStream;
    // Response of the currently in-flight stream load, if any.
    private Future<CloseableHttpResponse> pendingLoadFuture;
    private final CloseableHttpClient httpClient;
    private final ExecutorService executorService;
    // True until the first record of the current batch is written; used to decide
    // whether a line delimiter must be emitted before the next record.
    private boolean loadBatchFirstRecord;

    /**
     * Creates a stream loader bound to one Doris table.
     *
     * @param hostPort host:port used to build the stream-load and abort URLs
     * @param dorisOptions connection options; the table identifier must be {@code "db.table"}
     * @param executionOptions buffer sizing and extra stream-load properties
     * @param labelSuffix prefix used to derive per-checkpoint load labels
     * @param httpClient client used for all HTTP calls; closed by {@link #close()}
     */
    public DorisStreamLoad(String hostPort,
                           DorisOptions dorisOptions,
                           DorisExecutionOptions executionOptions,
                           String labelSuffix,
                           CloseableHttpClient httpClient) {
        this.hostPort = hostPort;
        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
        this.db = tableInfo[0];
        this.table = tableInfo[1];
        this.user = dorisOptions.getUsername();
        this.passwd = dorisOptions.getPassword();
        this.labelSuffix = labelSuffix;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
        this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
        this.streamLoadProp = executionOptions.getStreamLoadProp();
        this.httpClient = httpClient;
        // Single worker thread: at most one in-flight stream load per writer instance.
        this.executorService = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload"));
        this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount());
        // Use an explicit charset: the no-arg getBytes() depends on the platform default
        // and would make the delimiter bytes vary across JVMs/locales.
        lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)
                .getBytes(StandardCharsets.UTF_8);
        loadBatchFirstRecord = true;
    }

    public String getDb() {
        return db;
    }

    public String getHostPort() {
        return hostPort;
    }

    /**
     * Points this loader at a different frontend/backend and rebuilds the load URL.
     * Note: the abort URL intentionally keeps its original host (matches existing behavior).
     */
    public void setHostPort(String hostPort) {
        this.hostPort = hostPort;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table);
    }

    public Future<CloseableHttpResponse> getPendingLoadFuture() {
        return pendingLoadFuture;
    }

    /**
     * Tries to discard pending transactions whose labels begin with {@code labelSuffix},
     * starting at checkpoint id {@code chkID} and walking forward until a label is found
     * that has no pre-existing load (that probe transaction is then aborted as well).
     *
     * @param labelSuffix label prefix of the transactions to abort
     * @param chkID first checkpoint id to probe
     * @throws Exception if a probed label belongs to a FINISHED job (requires a label-prefix
     *         change or a restore from the latest savepoint), or on any HTTP/parse failure
     */
    public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
        long startChkID = chkID;
        LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
        while (true) {
            try {
                // Probe the label by issuing an empty 2PC load; the response tells us
                // whether the label is free or which txn currently owns it.
                String label = labelSuffix + "_" + startChkID;
                HttpPutBuilder builder = new HttpPutBuilder();
                builder.setUrl(loadUrlStr)
                        .baseAuth(user, passwd)
                        .addCommonHeader()
                        .enable2PC()
                        .setLabel(label)
                        .setEmptyEntity()
                        .addProperties(streamLoadProp);
                RespContent respContent = handlePreCommitResponse(httpClient.execute(builder.build()));
                // 2PC must be enabled server-side for the abort protocol to work.
                Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit()));
                if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
                    // Label already exists and its job finished: data is committed and
                    // cannot be aborted; the user must intervene.
                    if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
                        throw new DorisException("Load status is " + LABEL_ALREADY_EXIST + " and load job finished, " +
                                "change you label prefix or restore from latest savepoint!");
                    }
                    // Job not finished: extract its txn id from the message and abort it.
                    Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
                    if (matcher.find()) {
                        Preconditions.checkState(label.equals(matcher.group(1)));
                        long txnId = Long.parseLong(matcher.group(2));
                        LOG.info("abort {} for exist label {}", txnId, label);
                        abortTransaction(txnId);
                    } else {
                        LOG.error("response: {}", respContent.toString());
                        throw new DorisException("Load Status is " + LABEL_ALREADY_EXIST + ", but no txnID associated with it!");
                    }
                } else {
                    // Label was free: the probe opened a fresh txn; abort it and stop.
                    LOG.info("abort {} for check label {}.", respContent.getTxnId(), label);
                    abortTransaction(respContent.getTxnId());
                    break;
                }
                startChkID++;
            } catch (Exception e) {
                LOG.warn("failed to stream load data", e);
                throw e;
            }
        }
        LOG.info("abort for labelSuffix {} finished", labelSuffix);
    }

    /**
     * Writes one record into the current load batch, preceded by the configured line
     * delimiter for every record after the first.
     *
     * @param record serialized record bytes
     * @throws IOException if the underlying stream rejects the write
     */
    public void writeRecord(byte[] record) throws IOException {
        if (loadBatchFirstRecord) {
            loadBatchFirstRecord = false;
        } else {
            recordStream.write(lineDelimiter);
        }
        recordStream.write(record);
    }

    @VisibleForTesting
    public RecordStream getRecordStream() {
        return recordStream;
    }

    /**
     * Parses a stream-load (pre-commit) HTTP response into a {@link RespContent}.
     *
     * @throws StreamLoadException if the HTTP status is not 200 or the body is missing
     */
    public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception {
        final int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode == 200 && response.getEntity() != null) {
            String loadResult = EntityUtils.toString(response.getEntity());
            LOG.info("load Result {}", loadResult);
            return OBJECT_MAPPER.readValue(loadResult, RespContent.class);
        }
        throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
    }

    /**
     * Finishes the current batch: closes the record stream and waits for the in-flight
     * load request to complete.
     *
     * @return parsed pre-commit response of the finished load
     * @throws DorisRuntimeException wrapping any failure while waiting for or parsing
     *         the response
     */
    public RespContent stopLoad() throws IOException {
        recordStream.endInput();
        LOG.info("stream load stopped.");
        Preconditions.checkState(pendingLoadFuture != null);
        try {
            return handlePreCommitResponse(pendingLoadFuture.get());
        } catch (Exception e) {
            throw new DorisRuntimeException(e);
        }
    }

    /**
     * Starts writing data for a new checkpoint: resets the batch state and submits an
     * asynchronous 2PC stream-load request whose body is fed by the record stream.
     *
     * @param chkID checkpoint id used to build the load label
     * @throws IOException if the load request cannot be set up
     */
    public void startLoad(long chkID) throws IOException {
        loadBatchFirstRecord = true;
        String label = labelSuffix + "_" + chkID;
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        recordStream.startInput();
        LOG.info("stream load started for {}", label);
        try {
            InputStreamEntity entity = new InputStreamEntity(recordStream);
            putBuilder.setUrl(loadUrlStr)
                    .baseAuth(user, passwd)
                    .addCommonHeader()
                    .enable2PC()
                    .setLabel(label)
                    .setEntity(entity)
                    .addProperties(streamLoadProp);
            // Execute on the single-thread executor so the HTTP call streams the body
            // concurrently with the writer thread filling the record stream.
            pendingLoadFuture = executorService.submit(() -> {
                LOG.info("start execute load");
                return httpClient.execute(putBuilder.build());
            });
        } catch (Exception e) {
            String err = "failed to stream load data with label: " + label;
            LOG.warn(err, e);
            throw e;
        }
    }

    /**
     * Aborts the given transaction via the 2PC endpoint.
     *
     * @param txnID transaction to abort
     * @throws DorisRuntimeException on a non-200 response or missing body
     * @throws DorisException if the transaction was already committed
     */
    private void abortTransaction(long txnID) throws Exception {
        HttpPutBuilder builder = new HttpPutBuilder();
        builder.setUrl(abortUrlStr)
                .baseAuth(user, passwd)
                .addCommonHeader()
                .addTxnId(txnID)
                .setEmptyEntity()
                .abort();
        // Close the response in all paths to release the pooled connection
        // (the original leaked it, especially on the non-200 throw path).
        try (CloseableHttpResponse response = httpClient.execute(builder.build())) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200 || response.getEntity() == null) {
                LOG.warn("abort transaction response: " + response.getStatusLine().toString());
                throw new DorisRuntimeException("Fail to abort transaction " + txnID + " with url " + abortUrlStr);
            }
            String loadResult = EntityUtils.toString(response.getEntity());
            // Reuse the shared mapper instead of constructing a new one per call.
            Map<String, String> res = OBJECT_MAPPER.readValue(loadResult,
                    new TypeReference<HashMap<String, String>>() {});
            if (FAIL.equals(res.get("status"))) {
                if (ResponseUtil.isCommitted(res.get("msg"))) {
                    throw new DorisException("try abort committed transaction, " +
                            "do you recover from old savepoint?");
                }
                LOG.warn("Fail to abort transaction. error: {}", res.get("msg"));
            }
        }
    }

    /**
     * Releases the HTTP client and the upload executor.
     *
     * @throws IOException if closing the HTTP client fails (the executor is still
     *         shut down in that case)
     */
    public void close() throws IOException {
        try {
            if (null != httpClient) {
                try {
                    httpClient.close();
                } catch (IOException e) {
                    throw new IOException("Closing httpClient failed.", e);
                }
            }
        } finally {
            // Shut the executor down even when closing the client throws,
            // otherwise the upload thread would leak.
            if (null != executorService) {
                executorService.shutdownNow();
            }
        }
    }
}