Skip to content

Commit

Permalink
mysql-cdc:increase duration for records from debezium from 5 seconds …
Browse files Browse the repository at this point in the history
…to 5 minutes (#3757)

* mysql-cdc:increase duration for records from debezium from 5 seconds to 5 minutes

* upgrade docker version
  • Loading branch information
subodh1810 committed Jun 1, 2021
1 parent 1fed4b0 commit 9179a65
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.3.1",
"dockerImageTag": "0.3.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.version=0.3.2

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

private static final TimeUnit SLEEP_TIME_UNIT = TimeUnit.SECONDS;
/**
* This is not private and final because we need to override in tests otherwise each test would
* continue to run for 5 minutes
*/
static TimeUnit sleepTimeUnit = TimeUnit.MINUTES;
private static final int SLEEP_TIME_AMOUNT = 5;

private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
Expand All @@ -79,7 +83,7 @@ protected ChangeEvent<String, String> computeNext() {
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;
try {
next = queue.poll(SLEEP_TIME_AMOUNT, SLEEP_TIME_UNIT);
next = queue.poll(SLEEP_TIME_AMOUNT, sleepTimeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.jooq.SQLDialect;
import org.testcontainers.containers.MySQLContainer;

Expand Down Expand Up @@ -108,6 +109,7 @@ protected List<String> getRegexTests() {

@Override
protected void setup(TestDestinationEnv testEnv) {
DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS;
container = new MySQLContainer<>("mysql:8.0");
container.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
Expand Down Expand Up @@ -131,6 +132,7 @@ public void setup() {
}

private void init() {
DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS;
container = new MySQLContainer<>("mysql:8.0");
container.start();
source = new MySqlSource();
Expand Down

0 comments on commit 9179a65

Please sign in to comment.