Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected SeekableStreamSupervisorReportPayload<Integer, Long> createReportPaylo
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
Map<Integer, Long> partitionLag = getRecordLagPerPartition(getHighestCurrentOffsets());
Map<Integer, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getTopic(),
Expand Down Expand Up @@ -260,7 +260,7 @@ protected Map<Integer, Long> getPartitionRecordLag()
);
}

return getRecordLagPerPartition(highestCurrentOffsets);
return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
}

@Nullable
Expand All @@ -271,10 +271,10 @@ protected Map<Integer, Long> getPartitionTimeLag()
return null;
}

@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
// Used while calculating cummulative lag for entire stream
private Map<Integer, Long> getRecordLagPerPartitionInLatestSequences(Map<Integer, Long> currentOffsets)
{
if (latestSequenceFromStream == null) {
return Collections.emptyMap();
Expand All @@ -293,6 +293,30 @@ protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> current
);
}

@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while generating Supervisor lag reports per task
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
{
if (latestSequenceFromStream == null || currentOffsets == null) {
return Collections.emptyMap();
}

return currentOffsets
.entrySet()
.stream()
.filter(e -> latestSequenceFromStream.get(e.getKey()) != null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old method reports the lag for every partition present in latestSequenceFromStream.
But this new one also seems to be doing the same thing because you are filtering the entries using the latestSequenceFromStream.

I am perhaps missing something but not sure how the two methods are different.
Please also add a test to verify the difference between the two results.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous method includes lags for all the latest partitions of the stream whereas this method only includes the required partitions (as passed by currentOffsets) lags.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also will add a unit test for the reports.

.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue() != null
? latestSequenceFromStream.get(e.getKey()) - e.getValue()
: 0
)
);
}

@Override
protected Map<Integer, Long> getTimeLagPerPartition(Map<Integer, Long> currentOffsets)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,151 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}

@Test
public void testReportWhenMultipleActiveTasks() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);

supervisor = getTestableSupervisorForIdleBehaviour(1, 2, true, "PT10S", null, null, false, null);

addSomeEvents(6);

Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(0, 2L, 2, 1L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);

Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(1, 3L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(1, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);

List<Task> existingTasks = ImmutableList.of(id1, id2);

Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));

EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id2"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();

replayAll();

supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
ImmutableMap.of(0, 0L, 2, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
ImmutableMap.of(1, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id2"),
ImmutableSet.of()
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
verifyAll();

EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 2L, 2, 1L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 3L)));

EasyMock.replay(taskClient);

supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();

SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();

Assert.assertEquals(DATASOURCE, report.getId());

KafkaSupervisorReportPayload payload = report.getPayload();

Assert.assertEquals(DATASOURCE, payload.getDataSource());
Assert.assertEquals(10L, payload.getDurationSeconds());
Assert.assertEquals(NUM_PARTITIONS, payload.getPartitions());
Assert.assertEquals(1, payload.getReplicas());
Assert.assertEquals(topic, payload.getStream());
Assert.assertEquals(2, payload.getActiveTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());

TaskReportData id1TaskReport = payload.getActiveTasks().get(0);
TaskReportData id2TaskReport = payload.getActiveTasks().get(1);

Assert.assertEquals("id2", id2TaskReport.getId());
Assert.assertEquals(ImmutableMap.of(1, 0L), id2TaskReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(1, 3L), id2TaskReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(1, 4L), id2TaskReport.getLag());

Assert.assertEquals("id1", id1TaskReport.getId());
Assert.assertEquals(ImmutableMap.of(0, 0L, 2, 0L), id1TaskReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(0, 2L, 2, 1L), id1TaskReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(0, 5L, 2, 6L), id1TaskReport.getLag());

Assert.assertEquals(ImmutableMap.of(0, 7L, 1, 7L, 2, 7L), payload.getLatestOffsets());
Assert.assertEquals(ImmutableMap.of(0, 5L, 1, 4L, 2, 6L), payload.getMinimumLag());
Assert.assertEquals(15L, (long) payload.getAggregateLag());
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}

@Test
public void testSupervisorIsIdleIfStreamInactive() throws Exception
{
Expand Down