Skip to content

Commit

Permalink
Webackend Connection API routes should return Webbackend objects (#3806)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong committed Jun 2, 2021
1 parent a1035e0 commit 368a7ce
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 93 deletions.
14 changes: 7 additions & 7 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WbConnectionReadList"
$ref: "#/components/schemas/WebBackendConnectionReadList"
"404":
description: Workspace not found
"422":
Expand All @@ -1162,7 +1162,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WbConnectionRead"
$ref: "#/components/schemas/WebBackendConnectionRead"
"404":
description: Connection not found
"422":
Expand All @@ -1185,7 +1185,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionRead"
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInput"
/v1/web_backend/connections/update:
Expand All @@ -1206,7 +1206,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionRead"
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInput"
/v1/web_backend/sources/recreate:
Expand Down Expand Up @@ -2540,7 +2540,7 @@ components:
ConnectionStateObject:
type: object
# Web Backend
WbConnectionRead:
WebBackendConnectionRead:
type: object
required:
- connectionId
Expand Down Expand Up @@ -2587,15 +2587,15 @@ components:
$ref: "#/components/schemas/JobStatus"
isSyncing:
type: boolean
WbConnectionReadList:
WebBackendConnectionReadList:
type: object
required:
- connections
properties:
connections:
type: array
items:
$ref: "#/components/schemas/WbConnectionRead"
$ref: "#/components/schemas/WebBackendConnectionRead"
SyncMode:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
import io.airbyte.api.model.SourceReadList;
import io.airbyte.api.model.SourceRecreate;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.api.model.WbConnectionRead;
import io.airbyte.api.model.WbConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionCreate;
import io.airbyte.api.model.WebBackendConnectionRead;
import io.airbyte.api.model.WebBackendConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionRequestBody;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WorkspaceCreate;
Expand Down Expand Up @@ -506,7 +506,7 @@ public HealthCheckRead getHealthCheck() {
// WEB BACKEND

@Override
public WbConnectionReadList webBackendListConnectionsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
}

Expand All @@ -522,17 +522,17 @@ public SourceRead webBackendRecreateSource(@Valid SourceRecreate sourceRecreate)
}

@Override
public WbConnectionRead webBackendGetConnection(@Valid WebBackendConnectionRequestBody webBackendConnectionRequestBody) {
public WebBackendConnectionRead webBackendGetConnection(@Valid WebBackendConnectionRequestBody webBackendConnectionRequestBody) {
return execute(() -> webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody));
}

@Override
public ConnectionRead webBackendCreateConnection(WebBackendConnectionCreate webBackendConnectionCreate) {
public WebBackendConnectionRead webBackendCreateConnection(WebBackendConnectionCreate webBackendConnectionCreate) {
return execute(() -> webBackendConnectionsHandler.webBackendCreateConnection(webBackendConnectionCreate));
}

@Override
public ConnectionRead webBackendUpdateConnection(@Valid WebBackendConnectionUpdate webBackendConnectionUpdate) {
public WebBackendConnectionRead webBackendUpdateConnection(@Valid WebBackendConnectionUpdate webBackendConnectionUpdate) {
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.WbConnectionRead;
import io.airbyte.api.model.WbConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionCreate;
import io.airbyte.api.model.WebBackendConnectionRead;
import io.airbyte.api.model.WebBackendConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionRequestBody;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
Expand Down Expand Up @@ -97,27 +97,28 @@ public WebBackendConnectionsHandler(final ConnectionsHandler connectionsHandler,
this.operationsHandler = operationsHandler;
}

public WbConnectionReadList webBackendListConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody)
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {

final List<WbConnectionRead> reads = Lists.newArrayList();
final List<WebBackendConnectionRead> reads = Lists.newArrayList();
for (ConnectionRead connection : connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody).getConnections()) {
reads.add(buildWbConnectionRead(connection));
reads.add(buildWebBackendConnectionRead(connection));
}
return new WbConnectionReadList().connections(reads);
return new WebBackendConnectionReadList().connections(reads);
}

private WbConnectionRead buildWbConnectionRead(ConnectionRead connectionRead) throws ConfigNotFoundException, IOException, JsonValidationException {
private WebBackendConnectionRead buildWebBackendConnectionRead(ConnectionRead connectionRead)
throws ConfigNotFoundException, IOException, JsonValidationException {
final SourceRead source = getSourceRead(connectionRead);
final DestinationRead destination = getDestinationRead(connectionRead);
final OperationReadList operations = getOperationReadList(connectionRead);
final WbConnectionRead wbConnectionRead = getWbConnectionRead(connectionRead, source, destination, operations);
final WebBackendConnectionRead WebBackendConnectionRead = getWebBackendConnectionRead(connectionRead, source, destination, operations);

final JobReadList syncJobReadList = getSyncJobs(connectionRead);
Predicate<JobRead> hasRunningJob = (JobRead job) -> !TERMINAL_STATUSES.contains(job.getStatus());
wbConnectionRead.setIsSyncing(syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).anyMatch(hasRunningJob));
setLatestSyncJobProperties(wbConnectionRead, syncJobReadList);
return wbConnectionRead;
WebBackendConnectionRead.setIsSyncing(syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).anyMatch(hasRunningJob));
setLatestSyncJobProperties(WebBackendConnectionRead, syncJobReadList);
return WebBackendConnectionRead;
}

private SourceRead getSourceRead(ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand All @@ -135,11 +136,11 @@ private OperationReadList getOperationReadList(ConnectionRead connectionRead) th
return operationsHandler.listOperationsForConnection(connectionIdRequestBody);
}

private WbConnectionRead getWbConnectionRead(ConnectionRead connectionRead,
SourceRead source,
DestinationRead destination,
OperationReadList operations) {
return new WbConnectionRead()
private WebBackendConnectionRead getWebBackendConnectionRead(ConnectionRead connectionRead,
SourceRead source,
DestinationRead destination,
OperationReadList operations) {
return new WebBackendConnectionRead()
.connectionId(connectionRead.getConnectionId())
.sourceId(connectionRead.getSourceId())
.destinationId(connectionRead.getDestinationId())
Expand All @@ -161,15 +162,15 @@ private JobReadList getSyncJobs(ConnectionRead connectionRead) throws IOExceptio
return jobHistoryHandler.listJobsFor(jobListRequestBody);
}

private void setLatestSyncJobProperties(WbConnectionRead wbConnectionRead, JobReadList syncJobReadList) {
private void setLatestSyncJobProperties(WebBackendConnectionRead WebBackendConnectionRead, JobReadList syncJobReadList) {
syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst()
.ifPresent(job -> {
wbConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt());
wbConnectionRead.setLatestSyncJobStatus(job.getStatus());
WebBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt());
WebBackendConnectionRead.setLatestSyncJobStatus(job.getStatus());
});
}

public WbConnectionRead webBackendGetConnection(WebBackendConnectionRequestBody webBackendConnectionRequestBody)
public WebBackendConnectionRead webBackendGetConnection(WebBackendConnectionRequestBody webBackendConnectionRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody()
.connectionId(webBackendConnectionRequestBody.getConnectionId());
Expand All @@ -187,7 +188,7 @@ public WbConnectionRead webBackendGetConnection(WebBackendConnectionRequestBody
connection.setSyncCatalog(combined);
}

return buildWbConnectionRead(connection);
return buildWebBackendConnectionRead(connection);
}

@VisibleForTesting
Expand Down Expand Up @@ -239,14 +240,14 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(AirbyteCatalog origina
return new AirbyteCatalog().streams(streams);
}

public ConnectionRead webBackendCreateConnection(WebBackendConnectionCreate webBackendConnectionCreate)
public WebBackendConnectionRead webBackendCreateConnection(WebBackendConnectionCreate webBackendConnectionCreate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = createOperations(webBackendConnectionCreate);
final ConnectionCreate connectionCreate = toConnectionCreate(webBackendConnectionCreate, operationIds);
return connectionsHandler.createConnection(connectionCreate);
return buildWebBackendConnectionRead(connectionsHandler.createConnection(connectionCreate));
}

public ConnectionRead webBackendUpdateConnection(WebBackendConnectionUpdate webBackendConnectionUpdate)
public WebBackendConnectionRead webBackendUpdateConnection(WebBackendConnectionUpdate webBackendConnectionUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);
Expand All @@ -261,8 +262,7 @@ public ConnectionRead webBackendUpdateConnection(WebBackendConnectionUpdate webB
// just create the job
schedulerHandler.syncConnection(connectionId);
}

return connectionRead;
return buildWebBackendConnectionRead(connectionRead);
}

private List<UUID> createOperations(WebBackendConnectionCreate webBackendConnectionCreate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.SyncMode;
import io.airbyte.api.model.SynchronousJobRead;
import io.airbyte.api.model.WbConnectionRead;
import io.airbyte.api.model.WbConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionCreate;
import io.airbyte.api.model.WebBackendConnectionRead;
import io.airbyte.api.model.WebBackendConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionRequestBody;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
Expand Down Expand Up @@ -106,8 +106,8 @@ class WebBackendConnectionsHandlerTest {
private SourceRead sourceRead;
private ConnectionRead connectionRead;
private OperationReadList operationReadList;
private WbConnectionRead expected;
private WbConnectionRead expectedWithNewSchema;
private WebBackendConnectionRead expected;
private WebBackendConnectionRead expectedWithNewSchema;

@BeforeEach
public void setup() throws IOException, JsonValidationException, ConfigNotFoundException {
Expand Down Expand Up @@ -168,7 +168,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
jobListRequestBody.setConfigId(connectionRead.getConnectionId().toString());
when(jobHistoryHandler.listJobsFor(jobListRequestBody)).thenReturn(jobReadList);

expected = new WbConnectionRead()
expected = new WebBackendConnectionRead()
.connectionId(connectionRead.getConnectionId())
.sourceId(connectionRead.getSourceId())
.destinationId(connectionRead.getDestinationId())
Expand All @@ -192,7 +192,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.jobInfo(mock(SynchronousJobRead.class))
.catalog(modifiedCatalog));

expectedWithNewSchema = new WbConnectionRead()
expectedWithNewSchema = new WebBackendConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
Expand Down Expand Up @@ -224,9 +224,9 @@ public void testWebBackendListConnectionsForWorkspace() throws ConfigNotFoundExc
when(connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)).thenReturn(connectionReadList);
when(operationsHandler.listOperationsForConnection(connectionIdRequestBody)).thenReturn(operationReadList);

final WbConnectionReadList wbConnectionReadList = wbHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody);
assertEquals(1, wbConnectionReadList.getConnections().size());
assertEquals(expected, wbConnectionReadList.getConnections().get(0));
final WebBackendConnectionReadList WebBackendConnectionReadList = wbHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody);
assertEquals(1, WebBackendConnectionReadList.getConnections().size());
assertEquals(expected, WebBackendConnectionReadList.getConnections().get(0));
}

@Test
Expand All @@ -240,9 +240,9 @@ public void testWebBackendGetConnection() throws ConfigNotFoundException, IOExce
when(connectionsHandler.getConnection(connectionIdRequestBody)).thenReturn(connectionRead);
when(operationsHandler.listOperationsForConnection(connectionIdRequestBody)).thenReturn(operationReadList);

final WbConnectionRead wbConnectionRead = wbHandler.webBackendGetConnection(webBackendConnectionRequestBody);
final WebBackendConnectionRead WebBackendConnectionRead = wbHandler.webBackendGetConnection(webBackendConnectionRequestBody);

assertEquals(expected, wbConnectionRead);
assertEquals(expected, WebBackendConnectionRead);
}

@Test
Expand All @@ -257,9 +257,9 @@ public void testWebBackendGetConnectionWithDiscovery() throws ConfigNotFoundExce
when(connectionsHandler.getConnection(connectionIdRequestBody)).thenReturn(connectionRead);
when(operationsHandler.listOperationsForConnection(connectionIdRequestBody)).thenReturn(operationReadList);

final WbConnectionRead wbConnectionRead = wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody);
final WebBackendConnectionRead WebBackendConnectionRead = wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody);

assertEquals(expectedWithNewSchema, wbConnectionRead);
assertEquals(expectedWithNewSchema, WebBackendConnectionRead);
}

@Test
Expand Down Expand Up @@ -368,7 +368,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
.status(expected.getStatus())
.schedule(expected.getSchedule()));

ConnectionRead connectionRead = wbHandler.webBackendUpdateConnection(updateBody);
WebBackendConnectionRead connectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expected.getSyncCatalog(), connectionRead.getSyncCatalog());

Expand Down Expand Up @@ -407,7 +407,7 @@ void testUpdateConnectionWithOperations() throws JsonValidationException, Config
.status(expected.getStatus())
.schedule(expected.getSchedule()));
when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId()));
ConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody);
WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds());
verify(operationsHandler, times(1)).updateOperation(operationUpdate);
Expand Down Expand Up @@ -436,7 +436,7 @@ void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, Con
.status(expected.getStatus())
.schedule(expected.getSchedule()));

ConnectionRead connectionRead = wbHandler.webBackendUpdateConnection(updateBody);
WebBackendConnectionRead connectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), connectionRead.getSyncCatalog());

Expand Down
Loading

0 comments on commit 368a7ce

Please sign in to comment.