Skip to content

Commit

Permalink
Revert integration test back to the old behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Feb 2, 2023
1 parent 9bdcd69 commit c0eedfe
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,6 @@ public List<JsonNode> retrieveSourceRecords(final Database database, final Strin
return database.query(context -> context.fetch(String.format("SELECT * FROM %s;", table)))
.stream()
.map(Record::intoMap)
.map(rec -> {
// The protocol requires converting numbers to strings. source-postgres does that internally,
// but we're querying the DB directly, so we have to do it manually.
final Map<String, Object> stringifiedNumbers = new HashMap<>();
for (final String key : rec.keySet()) {
Object o = rec.get(key);
if (o instanceof Number) {
o = o.toString();
}
stringifiedNumbers.put(key, o);
}
return stringifiedNumbers;
})
.map(Jsons::jsonNode)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionState;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.DataType;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdWithWorkspaceId;
import io.airbyte.api.client.model.generated.DestinationDefinitionRead;
Expand Down Expand Up @@ -319,8 +320,8 @@ void testDiscoverSourceSchema() throws ApiException {
final AirbyteCatalog actual = testHarness.discoverSourceSchema(sourceId);

final Map<String, Map<String, String>> fields = ImmutableMap.of(
COLUMN_ID, ImmutableMap.of(REF, INTEGER_REFERENCE),
COLUMN_NAME, ImmutableMap.of(REF, STRING_REFERENCE));
COLUMN_ID, ImmutableMap.of(TYPE, DataType.NUMBER.getValue(), "airbyte_type", "integer"),
COLUMN_NAME, ImmutableMap.of(TYPE, DataType.STRING.getValue()));
final JsonNode jsonSchema = Jsons.jsonNode(ImmutableMap.builder()
.put(TYPE, "object")
.put("properties", fields)
Expand Down Expand Up @@ -573,8 +574,8 @@ void testIncrementalDedupeSync() throws Exception {
// add new records and run again.
final Database source = testHarness.getSourceDatabase();
final List<JsonNode> expectedRawRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, "sherif").build()));
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "7").put(COLUMN_NAME, "chris").build()));
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "sherif").build()));
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).put(COLUMN_NAME, "chris").build()));
source.query(ctx -> ctx.execute("UPDATE id_and_name SET id=6 WHERE name='sherif'"));
source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')"));
// retrieve latest snapshot of source records after modifications; the deduplicated table in
Expand Down Expand Up @@ -627,7 +628,7 @@ void testIncrementalSync() throws Exception {
final Database source = testHarness.getSourceDatabase();
// get contents of source before mutating records.
final List<JsonNode> expectedRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build()));
expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build()));
// add a new record
source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
// mutate a record that was already synced without updating its cursor value. if we are actually
Expand Down Expand Up @@ -925,7 +926,7 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except
final Database sourceDatabase = testHarness.getSourceDatabase();
// get contents of source before mutating records.
final List<JsonNode> expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME);
expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build()));
expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build()));
// add a new record
sourceDatabase.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
// mutate a record that was already synced without updating its cursor value. if we are actually
Expand Down Expand Up @@ -1226,9 +1227,9 @@ void testIncrementalSyncMultipleStreams() throws Exception {
testHarness.retrieveSourceRecords(source, STAGING_SCHEMA_NAME + "." + COOL_EMPLOYEES_TABLE_NAME);
final List<JsonNode> expectedRecordsAwesomePeople =
testHarness.retrieveSourceRecords(source, STAGING_SCHEMA_NAME + "." + AWESOME_PEOPLE_TABLE_NAME);
expectedRecordsIdAndName.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build()));
expectedRecordsCoolEmployees.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, GERALT).build()));
expectedRecordsAwesomePeople.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "3").put(COLUMN_NAME, GERALT).build()));
expectedRecordsIdAndName.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build()));
expectedRecordsCoolEmployees.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, GERALT).build()));
expectedRecordsAwesomePeople.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 3).put(COLUMN_NAME, GERALT).build()));
// add a new record to each table
source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
source.query(ctx -> ctx.execute("INSERT INTO staging.cool_employees(id, name) VALUES(6, 'geralt')"));
Expand Down Expand Up @@ -1463,8 +1464,8 @@ void testIncrementalDedupeSyncRemoveOneColumn() throws Exception {
source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'mike')"));
source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')"));
// The expected new raw records should only have the ID column.
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").build()));
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "7").build()));
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).build()));
expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).build()));
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,23 @@ void testIncrementalCdcSync(final TestInfo testInfo) throws Exception {
// new value and an updated_at time corresponding to this update query
source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2"));
expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, "geralt").build()),
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()),
beforeFirstUpdate,
Optional.empty()));
expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "2").put(COLUMN_NAME, "yennefer").build()),
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_NAME, "yennefer").build()),
beforeFirstUpdate,
Optional.empty()));

// do the same for the other table
source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')"));
source.query(ctx -> ctx.execute("UPDATE color_palette SET color='purple' WHERE id=2"));
expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "4").put(COLUMN_COLOR, "yellow").build()),
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()),
beforeFirstUpdate,
Optional.empty()));
expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "2").put(COLUMN_COLOR, "purple").build()),
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_COLOR, "purple").build()),
beforeFirstUpdate,
Optional.empty()));

Expand Down Expand Up @@ -431,13 +431,13 @@ void testPartialResetFromStreamSelection(final TestInfo testInfo) throws Excepti
final Instant beforeInsert = Instant.now();
source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "6").put(COLUMN_NAME, "geralt").build()),
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()),
beforeInsert,
Optional.empty()));

source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')"));
expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, "4").put(COLUMN_COLOR, "yellow").build()),
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()),
beforeInsert,
Optional.empty()));

Expand Down

0 comments on commit c0eedfe

Please sign in to comment.