Skip to content

Commit

Permalink
Add test case for new fields appearing in data (#15372)
Browse files Browse the repository at this point in the history
* add test case for new field(s) appearing in data

* rework test to verify that sync at least not failed if new fields are present
  • Loading branch information
yurii-bidiuk committed Aug 10, 2022
1 parent 6e1a76f commit c724630
Showing 1 changed file with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,57 @@ public void testEntrypointEnvVar() throws Exception {
assertFalse(entrypoint.isBlank());
}

/**
* Verify that destination doesn't fail if new fields arrive in the data after initial schema
* discovery and sync.
*
* @throws Exception
*/
@Test
public void testSyncNotFailsWithNewFields() throws Exception {
if (!implementsOverwrite()) {
LOGGER.info("Destination's spec.json does not support overwrite sync mode.");
return;
}

final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false);
final var stream = catalog.getStreams().get(0);

// Run second sync with new fields on the message
final List<AirbyteMessage> secondSyncMessagesWithNewFields = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(stream.getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("currency", "USD")
.put("date", "2020-03-31T00:00:00Z")
.put("newFieldString", "Value for new field")
.put("newFieldNumber", 3)
.put("HKD", 10.1)
.put("NZD", 700.1)
.build()))),
new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));

// Run sync and verify that all message were written without failing
runSyncAndVerifyStateOutput(config, secondSyncMessagesWithNewFields, configuredCatalog, false);
var destinationOutput = retrieveRecords(testEnv, stream.getName(), getDefaultSchema(config), stream.getJsonSchema());
// Remove state message
secondSyncMessagesWithNewFields.removeIf(airbyteMessage -> airbyteMessage.getType().equals(Type.STATE));
assertEquals(secondSyncMessagesWithNewFields.size(), destinationOutput.size());
}

/**
* Whether the destination should be tested against different namespaces.
*/
Expand Down

0 comments on commit c724630

Please sign in to comment.