Skip to content

Commit

Permalink
Hotfix: Use new JdbcStressTest class (#1930)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Feb 2, 2021
1 parent 3316d38 commit 1fd5bd2
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceStressTest;
import io.airbyte.integrations.source.jdbc.test.JdbcStressTest;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.Optional;
import java.util.Set;
Expand All @@ -51,7 +51,7 @@
* in source-postgres.
*/
@Disabled
class DefaultJdbcSourceStressTest extends JdbcSourceStressTest {
class DefaultJdbcStressTest extends JdbcStressTest {

private static PostgreSQLContainer<?> PSQL_DB;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
Expand Down Expand Up @@ -139,13 +140,31 @@ public void setup() throws Exception {

}

// todo (cgardens) - restructure these tests so that testFullRefresh() and testIncremental() can be
// separate tests. current constrained by only wanting to setup the fixture in the database once,
// but it is not trivial to move them to @BeforeAll because it is static and we are doing
// inheritance. Not impossible, just needs to be done thoughtfully and for all JdbcSources.
@Test
public void stressTest() throws Exception {
final Stream<AirbyteMessage> read = source.read(config, getConfiguredCatalog(), Jsons.jsonNode(Collections.emptyMap()));
testFullRefresh();
testIncremental();
}

private void testFullRefresh() throws Exception {
runTest(getConfiguredCatalogFullRefresh(), "full_refresh");
}

private void testIncremental() throws Exception {
runTest(getConfiguredCatalogIncremental(), "incremental");
}

private void runTest(ConfiguredAirbyteCatalog configuredCatalog, String testName) throws Exception {
LOGGER.info("running stress test for: " + testName);
final Stream<AirbyteMessage> read = source.read(config, configuredCatalog, Jsons.jsonNode(Collections.emptyMap()));
final long actualCount = read
.filter(m -> m.getType() == Type.RECORD)
.peek(m -> {
if (m.getRecord().getData().get("id").asLong() % 10000 == 0) {
if (m.getRecord().getData().get("id").asLong() % 100000 == 0) {
LOGGER.info("reading batch: " + m.getRecord().getData().get("id").asLong() / 1000);
}
})
Expand All @@ -155,8 +174,8 @@ public void stressTest() throws Exception {
final long expectedRoundedRecordsCount = TOTAL_RECORDS - TOTAL_RECORDS % 1000;
LOGGER.info("expected records count: " + TOTAL_RECORDS);
LOGGER.info("actual records count: " + actualCount);
assertEquals(expectedRoundedRecordsCount, actualCount);
assertEquals(expectedRoundedRecordsCount, bitSet.cardinality());
assertEquals(expectedRoundedRecordsCount, actualCount, "testing: " + testName);
assertEquals(expectedRoundedRecordsCount, bitSet.cardinality(), "testing: " + testName);
}

// each is roughly 106 bytes.
Expand All @@ -171,10 +190,17 @@ private void assertExpectedMessage(AirbyteMessage actualMessage) {
assertEquals(expectedMessage, actualMessage);
}

private static ConfiguredAirbyteCatalog getConfiguredCatalog() {
private static ConfiguredAirbyteCatalog getConfiguredCatalogFullRefresh() {
return CatalogHelpers.toDefaultConfiguredCatalog(getCatalog());
}

private static ConfiguredAirbyteCatalog getConfiguredCatalogIncremental() {
return new ConfiguredAirbyteCatalog()
.withStreams(Collections.singletonList(new ConfiguredAirbyteStream().withStream(getCatalog().getStreams().get(0))
.withCursorField(Collections.singletonList("id"))
.withSyncMode(SyncMode.INCREMENTAL)));
}

private static AirbyteCatalog getCatalog() {
return new AirbyteCatalog().withStreams(Lists.newArrayList(CatalogHelpers.createAirbyteStream(
streamName,
Expand Down

0 comments on commit 1fd5bd2

Please sign in to comment.