Skip to content

Commit

Permalink
Update the sourceCatalogId field when the schema is updated (#12505)
Browse files Browse the repository at this point in the history
* Set SourceCatalogId during connectionUpdate operation

* Return catalogId when get a connection

* Fix db operation of standardSync.sourceCatalogId

- value is not set correctly during update operation
- value is not read

* UI modification to set the sourceCatalogId

* remove sourceCatalogId from diff computation

Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
  • Loading branch information
malikdiarra and alafanechere committed May 4, 2022
1 parent 43470a2 commit dda50e1
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 35 deletions.
12 changes: 12 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3218,6 +3218,9 @@ components:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
$ref: "#/components/schemas/ResourceRequirements"
sourceCatalogId:
type: string
format: uuid
WebBackendConnectionUpdate:
type: object
required:
Expand Down Expand Up @@ -3258,6 +3261,9 @@ components:
type: array
items:
$ref: "#/components/schemas/WebBackendOperationCreateOrUpdate"
sourceCatalogId:
type: string
format: uuid
ConnectionRead:
type: object
required:
Expand Down Expand Up @@ -3298,6 +3304,9 @@ components:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
$ref: "#/components/schemas/ResourceRequirements"
sourceCatalogId:
type: string
format: uuid
ConnectionSearch:
type: object
properties:
Expand Down Expand Up @@ -4363,6 +4372,9 @@ components:
type: boolean
resourceRequirements:
$ref: "#/components/schemas/ResourceRequirements"
catalogId:
type: string
format: uuid
WebBackendConnectionReadList:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,7 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
.set(CONNECTION.MANUAL, standardSync.getManual())
.set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
.set(CONNECTION.UPDATED_AT, timestamp)
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
.where(CONNECTION.ID.eq(standardSync.getConnectionId()))
.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
.withSchedule(Jsons.deserialize(record.get(CONNECTION.SCHEDULE).data(), Schedule.class))
.withManual(record.get(CONNECTION.MANUAL))
.withOperationIds(connectionOperationId)
.withResourceRequirements(Jsons.deserialize(record.get(CONNECTION.RESOURCE_REQUIREMENTS).data(), ResourceRequirements.class));
.withResourceRequirements(Jsons.deserialize(record.get(CONNECTION.RESOURCE_REQUIREMENTS).data(), ResourceRequirements.class))
.withSourceCatalogId(record.get(CONNECTION.SOURCE_CATALOG_ID));
}

public static StandardWorkspace buildStandardWorkspace(final Record record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public static io.airbyte.config.StandardSync connectionUpdateToInternal(final Co
.withPrefix(update.getPrefix())
.withOperationIds(update.getOperationIds())
.withCatalog(CatalogConverter.toProtocol(update.getSyncCatalog()))
.withStatus(toPersistenceStatus(update.getStatus()));
.withStatus(toPersistenceStatus(update.getStatus()))
.withSourceCatalogId(update.getSourceCatalogId());

if (update.getName() != null) {
newConnection.withName(update.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR
final Predicate<JobRead> hasRunningJob = (JobRead job) -> !TERMINAL_STATUSES.contains(job.getStatus());
WebBackendConnectionRead.setIsSyncing(syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).anyMatch(hasRunningJob));
setLatestSyncJobProperties(WebBackendConnectionRead, syncJobReadList);
WebBackendConnectionRead.setCatalogId(connectionRead.getSourceCatalogId());
return WebBackendConnectionRead;
}

Expand Down Expand Up @@ -206,6 +207,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
final AirbyteCatalog discovered = discoverSchema.getCatalog();
final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered);

connection.setSourceCatalogId(discoverSchema.getCatalogId());
connection.setSyncCatalog(combined);
}

Expand Down Expand Up @@ -399,6 +401,7 @@ protected static ConnectionUpdate toConnectionUpdate(final WebBackendConnectionU
connectionUpdate.schedule(webBackendConnectionUpdate.getSchedule());
connectionUpdate.status(webBackendConnectionUpdate.getStatus());
connectionUpdate.resourceRequirements(webBackendConnectionUpdate.getResourceRequirements());
connectionUpdate.sourceCatalogId(webBackendConnectionUpdate.getSourceCatalogId());

return connectionUpdate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ void testUpdateConnection(final boolean useNewScheduler) throws JsonValidationEx
catalog.getStreams().get(0).getStream().setName("azkaban_users");
catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users");

final UUID newSourceCatalogId = UUID.randomUUID();

final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
.namespaceDefinition(Enums.convertTo(standardSync.getNamespaceDefinition(), NamespaceDefinitionType.class))
.namespaceFormat(standardSync.getNamespaceFormat())
Expand All @@ -335,7 +337,8 @@ void testUpdateConnection(final boolean useNewScheduler) throws JsonValidationEx
.cpuLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuLimit())
.cpuRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuRequest())
.memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit())
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()));
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()))
.sourceCatalogId(newSourceCatalogId);

final ConfiguredAirbyteCatalog configuredCatalog = ConnectionHelpers.generateBasicConfiguredAirbyteCatalog();
configuredCatalog.getStreams().get(0).getStream().withName("azkaban_users");
Expand All @@ -352,7 +355,8 @@ void testUpdateConnection(final boolean useNewScheduler) throws JsonValidationEx
.withStatus(StandardSync.Status.INACTIVE)
.withCatalog(configuredCatalog)
.withManual(true)
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS);
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS)
.withSourceCatalogId(newSourceCatalogId);

when(configRepository.getStandardSync(standardSync.getConnectionId()))
.thenReturn(standardSync)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public void testForConnectionCreateCompleteness() {
public void testForConnectionUpdateCompleteness() {
final Set<String> handledMethods =
Set.of("schedule", "connectionId", "syncCatalog", "namespaceDefinition", "namespaceFormat", "prefix", "status", "operationIds",
"resourceRequirements", "name");
"resourceRequirements", "name", "sourceCatalogId");

final Set<String> methods = Arrays.stream(ConnectionUpdate.class.getMethods())
.filter(method -> method.getReturnType() == ConnectionUpdate.class)
Expand All @@ -506,7 +506,8 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expected.getSyncCatalog());
.syncCatalog(expected.getSyncCatalog())
.sourceCatalogId(expected.getCatalogId());

when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
Expand Down
1 change: 1 addition & 0 deletions airbyte-webapp/src/core/domain/connection/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ export interface Connection {
source: Source;
destination: Destination;
operations: Operation[];
catalogId: string;
}
1 change: 1 addition & 0 deletions airbyte-webapp/src/hooks/services/useConnectionHook.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type UpdateConnection = {
schedule?: ScheduleProperties | null;
operations?: Operation[];
withRefreshedCatalog?: boolean;
sourceCatalogId?: string;
};

export type ListConnection = { connections: Connection[] };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const ReplicationView: React.FC<ReplicationViewProps> = ({ onAfterSaveSch
connectionId,
status: initialConnection.status || "",
withRefreshedCatalog: activeUpdatingSchemaMode,
sourceCatalogId: connection?.catalogId,
});

setSaved(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static StandardSync updateConnectionObject(final WorkspaceHelper workspac
.withPrefix(update.getPrefix())
.withOperationIds(update.getOperationIds())
.withCatalog(update.getCatalog())
.withStatus(update.getStatus());
.withStatus(update.getStatus())
.withSourceCatalogId(update.getSourceCatalogId());

// update name
if (update.getName() != null) {
Expand Down

0 comments on commit dda50e1

Please sign in to comment.