diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index b4657055a9db1..7a12538826c07 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.3.2", + "dockerImageTag": "0.3.3", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 935aedbd3d7df..e43e0acc3b1c6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -66,7 +66,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.3.2 + dockerImageTag: 0.3.3 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg - sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77 diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 4e540d1ed476b..72763700d6b18 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.2 +LABEL io.airbyte.version=0.3.3 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java index 973b6ca6cff75..d7ec84d0bc1f6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java @@ -53,17 +53,14 @@ public class DebeziumRecordIterator extends AbstractIterator> queue; private final Optional targetFilePosition; private final Supplier publisherStatusSupplier; private final VoidCallable requestClose; + private boolean receivedFirstRecord; public DebeziumRecordIterator(LinkedBlockingQueue> queue, Optional targetFilePosition, @@ -73,6 +70,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue> q this.targetFilePosition = targetFilePosition; this.publisherStatusSupplier = publisherStatusSupplier; this.requestClose = requestClose; + this.receivedFirstRecord = false; } @Override @@ -83,7 +81,8 @@ protected ChangeEvent computeNext() { while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { final ChangeEvent next; try { - next = queue.poll(SLEEP_TIME_AMOUNT, sleepTimeUnit); + WaitTime waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME_SECONDS : FIRST_RECORD_WAIT_TIME_MINUTES; + next = queue.poll(waitTime.period, waitTime.timeUnit); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -100,7 +99,7 @@ protected ChangeEvent computeNext() { if (shouldSignalClose(next)) { requestClose(); } - + receivedFirstRecord = true; return next; } return endOfData(); @@ -146,4 +145,16 @@ enum SnapshotMetadata { LAST } + private static class WaitTime { + + public final int period; + public final TimeUnit timeUnit; + + public WaitTime(int period, TimeUnit timeUnit) { + this.period = period; + this.timeUnit = timeUnit; + } + + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java index 0ab2b6fae1ec2..d5bdee740f220 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java @@ -42,7 +42,6 @@ import io.airbyte.protocol.models.SyncMode; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import org.jooq.SQLDialect; import org.testcontainers.containers.MySQLContainer; @@ -109,7 +108,6 @@ protected List getRegexTests() { @Override protected void setup(TestDestinationEnv testEnv) { - DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS; container = new MySQLContainer<>("mysql:8.0"); container.start(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java index 4bd6576788ad7..28982bb462f1b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java @@ -68,7 +68,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import org.jooq.SQLDialect; @@ -132,7 +131,6 @@ public void setup() { } private void init() { - DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS; container = new MySQLContainer<>("mysql:8.0"); container.start(); source = new MySqlSource();