forked from debezium/debezium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BaseSourceTask.java
328 lines (277 loc) · 11.1 KB
/
BaseSourceTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.common;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.SingleThreadAccess;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
/**
* Base class for Debezium's CDC {@link SourceTask} implementations. Provides functionality common to all connectors,
* such as validation of the configuration.
*
* @author Gunnar Morling
*/
public abstract class BaseSourceTask<P extends Partition, O extends OffsetContext> extends SourceTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);

    // Bounds for the exponential back-off used when logging polling statistics.
    private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5));
    private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));

    /**
     * Coarse-grained lifecycle state of this task. (Nested enums are implicitly static.)
     */
    protected enum State {
        RUNNING,
        STOPPED
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);

    /**
     * Used to ensure that start(), stop() and commitRecord() calls are serialized.
     */
    private final ReentrantLock stateLock = new ReentrantLock();

    // Non-null only while the task is in a restart back-off period after a retriable error.
    private volatile ElapsedTimeStrategy restartDelay;

    /**
     * Raw connector properties, kept here so they can be passed again in case of a restart.
     */
    private volatile Map<String, String> props;

    /**
     * The change event source coordinator for those connectors adhering to the new
     * framework structure, {@code null} for legacy-style connectors.
     */
    private ChangeEventSourceCoordinator<P, O> coordinator;

    /**
     * The latest offset that has been acknowledged by the Kafka producer. Will be
     * acknowledged with the source database in {@link BaseSourceTask#commit()}
     * (which may be a no-op depending on the connector).
     */
    private volatile Map<String, ?> lastOffset;

    // How long to back off before restarting after a RetriableException; read from the config in start().
    private Duration retriableRestartWait;

    private final ElapsedTimeStrategy pollOutputDelay;
    private final Clock clock = Clock.system();

    @SingleThreadAccess("polling thread")
    private Instant previousOutputInstant;

    @SingleThreadAccess("polling thread")
    private int previousOutputBatchSize;

    protected BaseSourceTask() {
        // Use exponential delay to log the progress frequently at first, but then quickly tapering off to once an hour...
        pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);

        // Initialize our poll output delay logic ...
        pollOutputDelay.hasElapsed();
        previousOutputInstant = clock.currentTimeAsInstant();
    }

    /**
     * Validates the configuration, logs it (with passwords masked) and delegates to
     * {@link #start(Configuration)}. Idempotent: a second call while already running is a no-op.
     *
     * @throws ConnectException if the Connect context is missing or the configuration is invalid
     */
    @Override
    public final void start(Map<String, String> props) {
        if (context == null) {
            throw new ConnectException("Unexpected null context");
        }

        stateLock.lock();

        try {
            if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }

            this.props = props;
            Configuration config = Configuration.from(props);
            retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
            // need to reset the delay or you only get one delayed restart
            restartDelay = null;
            if (!config.validateAndRecord(getAllConfigurationFields(), LOGGER::error)) {
                throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
            }

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Starting {} with configuration:", getClass().getSimpleName());
                config.withMaskedPasswords().forEach((propName, propValue) -> {
                    LOGGER.info("   {} = {}", propName, propValue);
                });
            }

            this.coordinator = start(config);
        }
        finally {
            stateLock.unlock();
        }
    }

    /**
     * Called once when starting this source task.
     *
     * @param config
     *            the task configuration; implementations should wrap it in a dedicated implementation of
     *            {@link CommonConnectorConfig} and work with typed access to configuration properties that way
     */
    protected abstract ChangeEventSourceCoordinator<P, O> start(Configuration config);

    /**
     * Polls the connector for the next batch of records, restarting it first if it was
     * stopped after a retriable error and the back-off period has elapsed. A
     * {@link RetriableException} from {@link #doPoll()} stops the task with the restart
     * flag set and is rethrown so Connect retries.
     */
    @Override
    public final List<SourceRecord> poll() throws InterruptedException {
        boolean started = startIfNeededAndPossible();

        // in backoff period after a retriable exception
        if (!started) {
            // WorkerSourceTask calls us immediately after we return the empty list.
            // This turns into a throttling so we need to make a pause before we return
            // the control back.
            Metronome.parker(Duration.of(2, ChronoUnit.SECONDS), Clock.SYSTEM).pause();
            return Collections.emptyList();
        }

        try {
            final List<SourceRecord> records = doPoll();
            logStatistics(records);
            return records;
        }
        catch (RetriableException e) {
            stop(true);
            throw e;
        }
    }

    /**
     * Records the last offset of a non-empty batch and periodically (per
     * {@code pollOutputDelay}) logs how many records were produced since the last report.
     */
    void logStatistics(final List<SourceRecord> records) {
        if (records == null || !LOGGER.isInfoEnabled()) {
            return;
        }

        int batchSize = records.size();

        if (batchSize > 0) {
            SourceRecord lastRecord = records.get(batchSize - 1);
            lastOffset = lastRecord.sourceOffset();
            previousOutputBatchSize += batchSize;
            if (pollOutputDelay.hasElapsed()) {
                // We want to record the status ...
                final Instant currentTime = clock.currentTime();
                LOGGER.info("{} records sent during previous {}, last recorded offset: {}", previousOutputBatchSize,
                        Strings.duration(Duration.between(previousOutputInstant, currentTime).toMillis()), lastOffset);

                previousOutputInstant = currentTime;
                previousOutputBatchSize = 0;
            }
        }
    }

    /**
     * Returns the next batch of source records, if any are available.
     */
    protected abstract List<SourceRecord> doPoll() throws InterruptedException;

    /**
     * Starts this connector in case it has been stopped after a retriable error,
     * and the backoff period has passed.
     *
     * @return {@code true} if the task is (now) running, {@code false} while still in back-off
     */
    private boolean startIfNeededAndPossible() {
        stateLock.lock();

        try {
            if (state.get() == State.RUNNING) {
                return true;
            }
            else if (restartDelay != null && restartDelay.hasElapsed()) {
                start(props);
                return true;
            }
            else {
                LOGGER.info("Awaiting end of restart backoff period after a retriable error");
                return false;
            }
        }
        finally {
            stateLock.unlock();
        }
    }

    @Override
    public final void stop() {
        stop(false);
    }

    /**
     * Stops the coordinator and the connector; when {@code restart} is {@code true}
     * (after a retriable error), arms the restart back-off so a later
     * {@link #poll()} can bring the task back up. Idempotent when already stopped.
     *
     * @param restart whether this stop should be followed by a delayed automatic restart
     */
    private void stop(boolean restart) {
        stateLock.lock();

        try {
            if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
                LOGGER.info("Connector has already been stopped");
                return;
            }

            if (restart) {
                LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", retriableRestartWait.getSeconds());
            }
            else {
                LOGGER.info("Stopping connector");
            }

            try {
                if (coordinator != null) {
                    coordinator.stop();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status (Thread.interrupted() would clear it) so callers
                // up the stack can still observe the interruption, and chain the cause.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while stopping coordinator", e);
                throw new ConnectException("Interrupted while stopping coordinator, failing the task", e);
            }

            doStop();

            // Arm the restart back-off only once per stop; reset again in start().
            if (restart && restartDelay == null) {
                restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis());
                restartDelay.hasElapsed();
            }
        }
        finally {
            stateLock.unlock();
        }
    }

    protected abstract void doStop();

    /**
     * Remembers the offset of the record most recently acknowledged by the Kafka producer,
     * to be committed to the source database in {@link #commit()}.
     */
    @Override
    public void commitRecord(SourceRecord record) throws InterruptedException {
        Map<String, ?> currentOffset = record.sourceOffset();

        if (currentOffset != null) {
            this.lastOffset = currentOffset;
        }
    }

    /**
     * Commits the last acknowledged offset with the source database. Uses {@code tryLock()}
     * so a commit racing a concurrent stop/restart is skipped (with a warning) rather than
     * blocking the shutdown.
     */
    @Override
    public void commit() throws InterruptedException {
        boolean locked = stateLock.tryLock();

        if (locked) {
            try {
                if (coordinator != null && lastOffset != null) {
                    coordinator.commitOffset(lastOffset);
                }
            }
            finally {
                stateLock.unlock();
            }
        }
        else {
            LOGGER.warn("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
        }
    }

    /**
     * Returns all configuration {@link Field} supported by this source task.
     */
    protected abstract Iterable<Field> getAllConfigurationFields();

    /**
     * Loads the connector's persistent offsets (if present) via the given loader.
     */
    protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
        Set<P> partitions = provider.getPartitions();
        OffsetReader<P, O, OffsetContext.Loader<O>> reader = new OffsetReader<>(
                context.offsetStorageReader(), loader);
        Map<P, O> offsets = reader.offsets(partitions);

        boolean found = false;
        for (P partition : partitions) {
            O offset = offsets.get(partition);

            if (offset != null) {
                found = true;
                LOGGER.info("Found previous partition offset {}: {}", partition, offset.getOffset());
            }
        }

        if (!found) {
            LOGGER.info("No previous offsets found");
        }

        return Offsets.of(offsets);
    }
}