Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 fix-mysql-cdc: poll for 5 minutes only when we have not received a single record #3789

Merged
merged 3 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,13 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

/**
* This is not private and final because we need to override in tests otherwise each test would
* continue to run for 5 minutes
*/
static TimeUnit sleepTimeUnit = TimeUnit.MINUTES;
private static final int SLEEP_TIME_AMOUNT = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would 100% trip me up because I would first go: "what is the time unit for this" and then realize it's actually being used as two different constants. Could we instead make it two different constants e.g:

private static final int FIRST_RECORD_WAIT_TIME_MINUTES = 5;
private static final int SUBESQUENT_RECORD_WAIT_TIME_SECONDS = 5;


private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final Optional<TargetFilePosition> targetFilePosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;

public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
Optional<TargetFilePosition> targetFilePosition,
Expand All @@ -73,6 +69,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
this.targetFilePosition = targetFilePosition;
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
}

@Override
Expand All @@ -83,7 +80,8 @@ protected ChangeEvent<String, String> computeNext() {
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;
try {
next = queue.poll(SLEEP_TIME_AMOUNT, sleepTimeUnit);
TimeUnit timeUnit = receivedFirstRecord ? TimeUnit.SECONDS : TimeUnit.MINUTES;
next = queue.poll(SLEEP_TIME_AMOUNT, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -100,7 +98,7 @@ protected ChangeEvent<String, String> computeNext() {
if (shouldSignalClose(next)) {
requestClose();
}

receivedFirstRecord = true;
return next;
}
return endOfData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.jooq.SQLDialect;
import org.testcontainers.containers.MySQLContainer;

Expand Down Expand Up @@ -109,7 +108,6 @@ protected List<String> getRegexTests() {

@Override
protected void setup(TestDestinationEnv testEnv) {
DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS;
container = new MySQLContainer<>("mysql:8.0");
container.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
Expand Down Expand Up @@ -132,7 +131,6 @@ public void setup() {
}

private void init() {
DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS;
container = new MySQLContainer<>("mysql:8.0");
container.start();
source = new MySqlSource();
Expand Down