Skip to content

Commit

Permalink
🎉 Surface latest sync status for connections. (#2405)
Browse files Browse the repository at this point in the history
Add the latestSyncJobStatus field to the WbConnectionRead struct. Take the chance to rename the lastSync field to latestSyncJobCreatedAt to better reflect what the field captures. lastSync was confusing as I expect it to return the Id/object of the last sync and not the last sync's timestamp.

We keep the current expectation when setting status - the same job that sets the timestamp field is used to set the status field.

We surface the job's status as is. This gives the UI flexibility on displaying the various statuses.
Since the 'latest' sync job is determined by the JobHistoryHandler, I buffed up the job history handler test cases to make sure we do always return the latest sync job.
  • Loading branch information
davinchia committed Mar 25, 2021
1 parent e33b0c2 commit 5ab0883
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 136 deletions.
6 changes: 4 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2152,10 +2152,12 @@ components:
$ref: "#/components/schemas/SourceRead"
destination:
$ref: "#/components/schemas/DestinationRead"
lastSync:
description: epoch time of last sync. null if no sync has taken place.
latestSyncJobCreatedAt:
description: epoch time of the latest sync job. null if no sync job has taken place.
type: integer
format: int64
latestSyncJobStatus:
$ref: "#/components/schemas/JobStatus"
isSyncing:
type: boolean
WbConnectionReadList:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobListRequestBody;
import io.airbyte.api.model.JobRead;
import io.airbyte.api.model.JobReadList;
import io.airbyte.api.model.JobStatus;
import io.airbyte.api.model.JobWithAttemptsRead;
Expand All @@ -62,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class WebBackendConnectionsHandler {

Expand Down Expand Up @@ -96,18 +98,29 @@ public WbConnectionReadList webBackendListConnectionsForWorkspace(WorkspaceIdReq
}

private WbConnectionRead buildWbConnectionRead(ConnectionRead connectionRead) throws ConfigNotFoundException, IOException, JsonValidationException {
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody()
.sourceId(connectionRead.getSourceId());
final SourceRead source = sourceHandler.getSource(sourceIdRequestBody);
final SourceRead source = getSourceRead(connectionRead);
final DestinationRead destination = getDestinationRead(connectionRead);
final WbConnectionRead wbConnectionRead = getWbConnectionRead(connectionRead, source, destination);

final JobReadList syncJobReadList = getSyncJobs(connectionRead);
Predicate<JobRead> hasRunningJob = (JobRead job) -> !TERMINAL_STATUSES.contains(job.getStatus());
wbConnectionRead.setIsSyncing(syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).anyMatch(hasRunningJob));
setLatestSyncJobProperties(wbConnectionRead, syncJobReadList);
return wbConnectionRead;
}

final DestinationIdRequestBody destinationIdRequestBody = new DestinationIdRequestBody().destinationId(connectionRead.getDestinationId());
final DestinationRead destination = destinationHandler.getDestination(destinationIdRequestBody);
private SourceRead getSourceRead(ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(connectionRead.getSourceId());
return sourceHandler.getSource(sourceIdRequestBody);
}

final JobListRequestBody jobListRequestBody = new JobListRequestBody()
.configId(connectionRead.getConnectionId().toString())
.configTypes(Collections.singletonList(JobConfigType.SYNC));
private DestinationRead getDestinationRead(ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
final DestinationIdRequestBody destinationIdRequestBody = new DestinationIdRequestBody().destinationId(connectionRead.getDestinationId());
return destinationHandler.getDestination(destinationIdRequestBody);
}

final WbConnectionRead wbConnectionRead = new WbConnectionRead()
private WbConnectionRead getWbConnectionRead(ConnectionRead connectionRead, SourceRead source, DestinationRead destination) {
return new WbConnectionRead()
.connectionId(connectionRead.getConnectionId())
.sourceId(connectionRead.getSourceId())
.destinationId(connectionRead.getDestinationId())
Expand All @@ -118,14 +131,21 @@ private WbConnectionRead buildWbConnectionRead(ConnectionRead connectionRead) th
.schedule(connectionRead.getSchedule())
.source(source)
.destination(destination);
}

final JobReadList jobReadList = jobHistoryHandler.listJobsFor(jobListRequestBody);
wbConnectionRead.setIsSyncing(jobReadList.getJobs()
.stream().map(JobWithAttemptsRead::getJob)
.anyMatch(job -> job.getStatus() != JobStatus.FAILED && job.getStatus() != JobStatus.SUCCEEDED && job.getStatus() != JobStatus.CANCELLED));
jobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst().ifPresent(job -> wbConnectionRead.setLastSync(job.getCreatedAt()));
private JobReadList getSyncJobs(ConnectionRead connectionRead) throws IOException {
final JobListRequestBody jobListRequestBody = new JobListRequestBody()
.configId(connectionRead.getConnectionId().toString())
.configTypes(Collections.singletonList(JobConfigType.SYNC));
return jobHistoryHandler.listJobsFor(jobListRequestBody);
}

return wbConnectionRead;
private void setLatestSyncJobProperties(WbConnectionRead wbConnectionRead, JobReadList syncJobReadList) {
syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst()
.ifPresent(job -> {
wbConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt());
wbConnectionRead.setLatestSyncJobStatus(job.getStatus());
});
}

public WbConnectionRead webBackendGetConnection(WebBackendConnectionRequestBody webBackendConnectionRequestBody)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableList;
import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.JobConfigType;
Expand All @@ -43,6 +43,7 @@
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
import io.airbyte.scheduler.models.Job;
Expand All @@ -52,101 +53,156 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

@DisplayName("Job History Handler")
public class JobHistoryHandlerTest {

private static final long JOB_ID = 100L;
private static final long ATTEMPT_ID = 1002L;
private static final String JOB_CONFIG_ID = "123";
private static final JobStatus JOB_STATUS = JobStatus.RUNNING;
private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING;
private static final JobStatus JOB_STATUS = JobStatus.SUCCEEDED;
private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE;
private static final JobConfigType CONFIG_TYPE_FOR_API = JobConfigType.CHECK_CONNECTION_SOURCE;
private static final JobConfig JOB_CONFIG = new JobConfig()
.withConfigType(CONFIG_TYPE)
.withCheckConnection(new JobCheckConnectionConfig());
private static final Path LOG_PATH = Path.of("log_path");
private static final LogRead EMPTY_LOG_READ = new LogRead().logLines(new ArrayList<>());
private static final long CREATED_AT = System.currentTimeMillis() / 1000;

private Job job;

private static final JobInfoRead JOB_INFO =
new JobInfoRead()
.job(new JobRead()
.id(JOB_ID)
.configId(JOB_CONFIG_ID)
.status(io.airbyte.api.model.JobStatus.RUNNING)
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
.createdAt(CREATED_AT)
.updatedAt(CREATED_AT))
.attempts(Lists.newArrayList(new AttemptInfoRead()
.attempt(new AttemptRead()
.id(ATTEMPT_ID)
.status(io.airbyte.api.model.AttemptStatus.RUNNING)
.updatedAt(CREATED_AT)
.createdAt(CREATED_AT)
.endedAt(CREATED_AT))
.logs(new LogRead().logLines(new ArrayList<>()))));

private static final JobWithAttemptsRead JOB_WITH_ATTEMPTS_READ = new JobWithAttemptsRead()
.job(JOB_INFO.getJob())
.attempts(JOB_INFO.getAttempts().stream().map(AttemptInfoRead::getAttempt).collect(Collectors.toList()));

private Job testJob;
private Attempt testJobAttempt;
private JobPersistence jobPersistence;
private JobHistoryHandler jobHistoryHandler;

private static JobRead toJobInfo(Job job) {
return new JobRead().id(job.getId())
.configId(job.getScope())
.status(Enums.convertTo(job.getStatus(), io.airbyte.api.model.JobStatus.class))
.configType(Enums.convertTo(job.getConfigType(), io.airbyte.api.model.JobConfigType.class))
.createdAt(job.getCreatedAtInSecond())
.updatedAt(job.getUpdatedAtInSecond());

}

private static List<AttemptInfoRead> toAttemptInfoList(List<Attempt> attempts) {
final List<AttemptRead> attemptReads = attempts.stream().map(JobHistoryHandlerTest::toAttemptRead).collect(Collectors.toList());

final Function<AttemptRead, AttemptInfoRead> toAttemptInfoRead = (AttemptRead a) -> new AttemptInfoRead().attempt(a).logs(EMPTY_LOG_READ);
return attemptReads.stream().map(toAttemptInfoRead).collect(Collectors.toList());
}

private static AttemptRead toAttemptRead(Attempt a) {
return new AttemptRead()
.id(a.getId())
.status(Enums.convertTo(a.getStatus(), io.airbyte.api.model.AttemptStatus.class))
.createdAt(a.getCreatedAtInSecond())
.updatedAt(a.getUpdatedAtInSecond())
.endedAt(a.getEndedAtInSecond().orElse(null));
}

private static Attempt createSuccessfulAttempt(long jobId, long timestamps) {
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, timestamps, timestamps, timestamps);
}

@BeforeEach
public void setUp() {
job = mock(Job.class);
Attempt attempt = mock(Attempt.class);
when(job.getId()).thenReturn(JOB_ID);
when(job.getConfigType()).thenReturn(JOB_CONFIG.getConfigType());
when(job.getScope()).thenReturn(JOB_CONFIG_ID);
when(job.getConfig()).thenReturn(JOB_CONFIG);
when(job.getStatus()).thenReturn(JOB_STATUS);
when(job.getCreatedAtInSecond()).thenReturn(CREATED_AT);
when(job.getUpdatedAtInSecond()).thenReturn(CREATED_AT);
when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt));
when(attempt.getId()).thenReturn(ATTEMPT_ID);
when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS);
when(attempt.getLogPath()).thenReturn(LOG_PATH);
when(attempt.getCreatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getUpdatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getEndedAtInSecond()).thenReturn(Optional.of(CREATED_AT));
testJobAttempt = createSuccessfulAttempt(JOB_ID, CREATED_AT);
testJob = new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt), JOB_STATUS, null, CREATED_AT,
CREATED_AT);

jobPersistence = mock(JobPersistence.class);
jobHistoryHandler = new JobHistoryHandler(jobPersistence);
}

@Test
public void testListJobsFor() throws IOException {
when(jobPersistence.listJobs(CONFIG_TYPE, JOB_CONFIG_ID)).thenReturn(Collections.singletonList(job));
@Nested
@DisplayName("When listing jobs")
class ListJobs {

@Test
@DisplayName("Should return jobs with/without attempts in descending order")
public void testListJobs() throws IOException {
final var successfulJob = testJob;

final var jobId2 = JOB_ID + 100;
final var createdAt2 = CREATED_AT + 1000;
final var latestJobNoAttempt =
new Job(jobId2, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, Collections.emptyList(), JobStatus.PENDING,
null, createdAt2, createdAt2);

when(jobPersistence.listJobs(CONFIG_TYPE, JOB_CONFIG_ID)).thenReturn(List.of(latestJobNoAttempt, successfulJob));

final var requestBody = new JobListRequestBody()
.configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API))
.configId(JOB_CONFIG_ID);
final var jobReadList = jobHistoryHandler.listJobsFor(requestBody);

final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead(
testJobAttempt)));
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList());
final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead));

assertEquals(expectedJobReadList, jobReadList);
}

@Test
@DisplayName("Should return jobs in descending order regardless of type")
public void testListJobsFor() throws IOException {
final var firstJob = testJob;

final var secondJobId = JOB_ID + 100;
final var createdAt2 = CREATED_AT + 1000;
final var secondJobAttempt = createSuccessfulAttempt(secondJobId, createdAt2);
final var secondJob = new Job(secondJobId, ConfigType.DISCOVER_SCHEMA, JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(secondJobAttempt),
JobStatus.SUCCEEDED, null, createdAt2, createdAt2);

final var latestJobId = secondJobId + 100;
final var createdAt3 = createdAt2 + 1000;
final var latestJob =
new Job(latestJobId, ConfigType.SYNC, JOB_CONFIG_ID, JOB_CONFIG, Collections.emptyList(), JobStatus.PENDING, null, createdAt3, createdAt3);

when(jobPersistence.listJobs(CONFIG_TYPE, JOB_CONFIG_ID)).thenReturn(List.of(latestJob, secondJob, firstJob));

final JobListRequestBody requestBody = new JobListRequestBody()
.configTypes(List.of(CONFIG_TYPE_FOR_API, JobConfigType.SYNC, JobConfigType.DISCOVER_SCHEMA))
.configId(JOB_CONFIG_ID);
final JobReadList jobReadList = jobHistoryHandler.listJobsFor(requestBody);

final var firstJobWithAttemptRead =
new JobWithAttemptsRead().job(toJobInfo(firstJob)).attempts(ImmutableList.of(toAttemptRead(testJobAttempt)));
final var secondJobWithAttemptRead =
new JobWithAttemptsRead().job(toJobInfo(secondJob)).attempts(ImmutableList.of(toAttemptRead(secondJobAttempt)));
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJob)).attempts(Collections.emptyList());
final JobReadList expectedJobReadList =
new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead));

assertEquals(expectedJobReadList, jobReadList);
}

final JobListRequestBody requestBody = new JobListRequestBody()
.configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API))
.configId(JOB_CONFIG_ID);
final JobReadList jobReadList = jobHistoryHandler.listJobsFor(requestBody);

final JobReadList expectedJobReadList = new JobReadList().jobs(Collections.singletonList(JOB_WITH_ATTEMPTS_READ));

assertEquals(expectedJobReadList, jobReadList);
}

@Test
@DisplayName("Should return the right job info")
public void testGetJobInfo() throws IOException {
when(jobPersistence.getJob(JOB_ID)).thenReturn(job);
when(jobPersistence.getJob(JOB_ID)).thenReturn(testJob);

final JobIdRequestBody requestBody = new JobIdRequestBody().id(JOB_ID);
final JobInfoRead jobInfoActual = jobHistoryHandler.getJobInfo(requestBody);

assertEquals(JOB_INFO, jobInfoActual);
final JobInfoRead exp = new JobInfoRead().job(toJobInfo(testJob)).attempts(toAttemptInfoList(ImmutableList.of(testJobAttempt)));

assertEquals(exp, jobInfoActual);
}

@Test
@DisplayName("Should have compatible config enums")
public void testEnumConversion() {
assertTrue(Enums.isCompatible(JobConfig.ConfigType.class, JobConfigType.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package io.airbyte.server.handlers;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -96,7 +96,6 @@ class WebBackendConnectionsHandlerTest {
private WebBackendConnectionsHandler wbHandler;

private SourceRead sourceRead;
private DestinationRead destinationRead;
private ConnectionRead connectionRead;
private WbConnectionRead expected;
private WbConnectionRead expectedWithNewSchema;
Expand All @@ -116,7 +115,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE

final StandardDestinationDefinition destinationDefinition = DestinationDefinitionHelpers.generateDestination();
final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID());
destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition);
DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition);

final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId());
connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync);
Expand Down Expand Up @@ -165,7 +164,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.schedule(connectionRead.getSchedule())
.source(sourceRead)
.destination(destinationRead)
.lastSync(now.getEpochSecond())
.latestSyncJobCreatedAt(now.getEpochSecond())
.latestSyncJobStatus(JobStatus.SUCCEEDED)
.isSyncing(false);

final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateBasicApiCatalog();
Expand All @@ -186,7 +186,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.schedule(expected.getSchedule())
.source(expected.getSource())
.destination(expected.getDestination())
.lastSync(expected.getLastSync())
.latestSyncJobCreatedAt(expected.getLatestSyncJobCreatedAt())
.latestSyncJobStatus(expected.getLatestSyncJobStatus())
.isSyncing(expected.getIsSyncing());

when(schedulerHandler.resetConnection(any())).thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED)));
Expand Down
Loading

0 comments on commit 5ab0883

Please sign in to comment.