Skip to content

Commit

Permalink
Add test for failure scenario and cleanup logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekrb19 committed Jun 15, 2024
1 parent a363511 commit 97e9734
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ private void loadSegmentsOnStartup() throws IOException
startupSegments.addAll(segmentManager.getCachedSegments());
startupSegments.addAll(getBootstrapSegments());

log.info("Server type[%s]", serverTypeConfig.getServerType());

final Stopwatch stopwatch = Stopwatch.createStarted();

// Start a temporary thread pool to load cachedSegments into page cache during bootstrap
Expand Down Expand Up @@ -318,17 +316,18 @@ private void loadSegmentsOnStartup() throws IOException
}
}

/**
* @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
*/
private List<DataSegment> getBootstrapSegments()
{
log.info("Fetching bootstrap segments from the coordinator.");
final Stopwatch stopwatch = Stopwatch.createStarted();

final ListenableFuture<CloseableIterator<DataSegment>> bootstrapSegmentsFuture =
coordinatorClient.fetchBootstrapSegments();

List<DataSegment> bootstrapSegments = new ArrayList<>();

try (CloseableIterator<DataSegment> iterator = FutureUtils.getUnchecked(bootstrapSegmentsFuture, true)) {
try (final CloseableIterator<DataSegment> iterator =
FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true)) {
bootstrapSegments = ImmutableList.copyOf(iterator);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,7 @@ public Response getDataSourceInformation(
public Response getBootstrapSegments()
{
try {
log.info("Hmm call to bootstrap segments..");
Set<DataSegment> broadcastSegments = coordinator.getBroadcastSegments();
log.info(
"Number of bootstrap segments coordinator is returning [%d] and they are [%s]",
broadcastSegments.size(),
broadcastSegments
);
return Response.status(Response.Status.OK).entity(broadcastSegments).build();
}
catch (DruidException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
Expand Down Expand Up @@ -302,7 +303,7 @@ public void testLoadCache() throws Exception
@Test
public void testLoadBootstrapSegments() throws Exception
{
Set<DataSegment> segments = new HashSet<>();
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
Expand Down Expand Up @@ -334,9 +335,31 @@ public void testLoadBootstrapSegments() throws Exception

Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegmentsLoadedIntoPageCache);

handler.stop();

Assert.assertEquals(0, serverAnnouncer.getObservedCount());
Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
}

@Test
public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);

final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, new NoopCoordinatorClient());

Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());

handler.start();

Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());

Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);

handler.stop();

Expand Down Expand Up @@ -595,7 +618,7 @@ public int getDropSegmentDelayMillis()
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}

private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager, TestCoordinatorClient coordinatorClient)
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager, CoordinatorClient coordinatorClient)
{
return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
}
Expand Down

0 comments on commit 97e9734

Please sign in to comment.