Skip to content

Commit

Permalink
Track number of streams in syncs (#15478)
Browse files Browse the repository at this point in the history
* Add number_of_streams to job sync tracking
  • Loading branch information
alovew committed Aug 9, 2022
1 parent 6c5d1ff commit f506c60
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,12 @@ private Map<String, Object> generateSyncMetadata(final UUID connectionId) throws
operationUsage.put(OPERATION + operation.getOperatorType(), usageCount + 1);
}
}
return MoreMaps.merge(TrackingMetadata.generateSyncMetadata(standardSync), operationUsage);

final Map<String, Object> streamCountData = new HashMap<>();
final Integer streamCount = standardSync.getCatalog().getStreams().size();
streamCountData.put("number_of_streams", streamCount);

return MoreMaps.merge(TrackingMetadata.generateSyncMetadata(standardSync), operationUsage, streamCountData);
}

private static ImmutableMap<String, Object> generateStateMetadata(final JobState jobState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.Job;
Expand Down Expand Up @@ -118,6 +121,9 @@ class JobTrackerTest {
.put("table_prefix", false)
.put("operation_count", 0)
.build();
private static final ConfiguredAirbyteCatalog CATALOG = CatalogHelpers
.createConfiguredAirbyteCatalog("stream_name", "stream_namespace",
Field.of("int_field", JsonSchemaType.NUMBER));

private static final ConnectorSpecification SOURCE_SPEC;
private static final ConnectorSpecification DESTINATION_SPEC;
Expand Down Expand Up @@ -275,7 +281,9 @@ void testAsynchronous(final ConfigType configType, final Map<String, Object> add
final ImmutableMap<String, Object> metadata = getJobMetadata(configType, jobId);
final Job job = getJobMock(configType, jobId);
// test when frequency is manual.
when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true));

when(configRepository.getStandardSync(CONNECTION_ID))
.thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true).withCatalog(CATALOG));
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME));
final Map<String, Object> manualMetadata = MoreMaps.merge(
Expand All @@ -286,7 +294,7 @@ void testAsynchronous(final ConfigType configType, final Map<String, Object> add

// test when frequency is scheduled.
when(configRepository.getStandardSync(CONNECTION_ID))
.thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(false)
.thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(false).withCatalog(CATALOG)
.withSchedule(new Schedule().withUnits(1L).withTimeUnit(TimeUnit.MINUTES)));
final Map<String, Object> scheduledMetadata = MoreMaps.merge(
metadata,
Expand Down Expand Up @@ -393,7 +401,8 @@ void testAsynchronousAttempt(final ConfigType configType, final Job job, final M

final ImmutableMap<String, Object> metadata = getJobMetadata(configType, LONG_JOB_ID);
// test when frequency is manual.
when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true));
when(configRepository.getStandardSync(CONNECTION_ID))
.thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true).withCatalog(CATALOG));
when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(LONG_JOB_ID)).thenReturn(WORKSPACE_ID);
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME));
Expand Down Expand Up @@ -562,6 +571,7 @@ private ImmutableMap<String, Object> getJobMetadata(final ConfigType configType,
.put("namespace_definition", NamespaceDefinitionType.SOURCE)
.put("table_prefix", false)
.put("operation_count", 0)
.put("number_of_streams", 1)
.build();
}

Expand Down

0 comments on commit f506c60

Please sign in to comment.