Skip to content

Commit

Permalink
readability improvement in web backend catalog handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mfsiega-airbyte committed Dec 9, 2022
1 parent b47269d commit 8252783
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* constructs a full picture of all existing configured + all new / updated streams in the newest
* catalog.
*/
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(), refreshedCatalog.get().getCatalog());
syncCatalog = updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(),
refreshedCatalog.get().getCatalog());
/*
* Diffing the catalog used to make the configured catalog gives us the clearest diff between the
* schema when the configured catalog was made and now. In the case where we do not have the
Expand All @@ -360,7 +361,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
connection.setStatus(refreshedCatalog.get().getConnectionStatus());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(), catalogUsedToMakeConfiguredCatalog.get());
syncCatalog = updateSchemaWithOriginalDiscoveredCatalog(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
// diff not relevant if there was no refresh.
diff = null;
} else {
Expand All @@ -374,6 +375,11 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff);
}

private AirbyteCatalog updateSchemaWithOriginalDiscoveredCatalog(AirbyteCatalog configuredCatalog, AirbyteCatalog originalDiscoveredCatalog) {
// We pass the original discovered catalog in as the "new" discovered catalog.
return updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, originalDiscoveredCatalog, originalDiscoveredCatalog);
}

private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody()
Expand All @@ -397,9 +403,9 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
* catalog
*/
@VisibleForTesting
protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog originalConfigured,
AirbyteCatalog originalDiscovered,
final AirbyteCatalog discovered) {
protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
AirbyteCatalog originalDiscovered,
final AirbyteCatalog discovered) {
/*
* We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so
* we just define a quick-and-dirty record class.
Expand Down Expand Up @@ -529,8 +535,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
.getConnectionAirbyteCatalog(connectionId);
if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// Update the Catalog returned to include all streams, including disabled ones
final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get(),
catalogUsedToMakeConfiguredCatalog.get());
final AirbyteCatalog syncCatalog =
updateSchemaWithRefreshedDiscoveredCatalog(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get(),
catalogUsedToMakeConfiguredCatalog.get());
updatedConnectionRead.setSyncCatalog(syncCatalog);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,8 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
when(connectionsHandler.getConnectionAirbyteCatalog(connectionRead.getConnectionId())).thenReturn(Optional.ofNullable(fullAirbyteCatalog));

final AirbyteCatalog expectedCatalogReturned =
WebBackendConnectionsHandler.updateSchemaWithDiscovery(expected.getSyncCatalog(), expected.getSyncCatalog(), fullAirbyteCatalog);
WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(expected.getSyncCatalog(), expected.getSyncCatalog(),
fullAirbyteCatalog);
final WebBackendConnectionRead connectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedCatalogReturned, connectionRead.getSyncCatalog());
Expand Down Expand Up @@ -1076,7 +1077,7 @@ void testUpdateSchemaWithDiscoveryFromEmpty() {
.aliasName(STREAM1)
.setSelected(false);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered);
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -1126,7 +1127,7 @@ void testUpdateSchemaWithDiscoveryResetStream() {
.aliasName(STREAM1)
.setSelected(false);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered);
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -1203,7 +1204,7 @@ void testUpdateSchemaWithDiscoveryMergeNewStream() {
.setSelected(false);
expected.getStreams().add(expectedNewStream);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered);
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -1251,7 +1252,7 @@ void testUpdateSchemaWithNamespacedStreams() {
.aliasName(STREAM1)
.setSelected(false);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered);
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered);

assertEquals(expected, actual);
}
Expand Down

0 comments on commit 8252783

Please sign in to comment.