Skip to content

Commit

Permalink
Add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekrb19 committed May 21, 2024
1 parent 92a7bac commit ee74c67
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.datasketches.cpc.TestUtil;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
package org.apache.druid.server.coordination;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand All @@ -47,6 +50,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;

Expand All @@ -61,33 +65,106 @@ public class SegmentLoadDropHandlerCacheTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private SegmentLoadDropHandler loadDropHandler;
private TestStorageLocation storageLoc;
private DataSegmentAnnouncer segmentAnnouncer;
private DataSegmentServerAnnouncer serverAnnouncer;
private SegmentManager segmentManager;
private SegmentLoaderConfig config;

private TestStorageLocation storageLoc;
private ObjectMapper objectMapper;

private AtomicInteger observedAnnouncedServerCount;

@Before
public void setup() throws IOException
{
storageLoc = new TestStorageLocation(temporaryFolder);
SegmentLoaderConfig config = new SegmentLoaderConfig()
config = new SegmentLoaderConfig()
.withLocations(Collections.singletonList(storageLoc.toStorageLocationConfig(MAX_SIZE, null)))
.withInfoDir(storageLoc.getInfoDir());
final ObjectMapper objectMapper = TestHelper.makeJsonMapper();

objectMapper = TestHelper.makeJsonMapper();
objectMapper.registerSubtypes(TestSegmentUtils.TestLoadSpec.class);
objectMapper.registerSubtypes(TestSegmentUtils.TestSegmentizerFactory.class);

SegmentCacheManager cacheManager = new SegmentLocalCacheManager(config, TestIndex.INDEX_IO, objectMapper);
SegmentManager segmentManager = new SegmentManager(cacheManager);
segmentManager = new SegmentManager(cacheManager);
segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class);

observedAnnouncedServerCount = new AtomicInteger(0);
serverAnnouncer = new DataSegmentServerAnnouncer()
{
@Override
public void announce()
{
observedAnnouncedServerCount.incrementAndGet();
}

@Override
public void unannounce()
{
observedAnnouncedServerCount.decrementAndGet();
}
};

loadDropHandler = new SegmentLoadDropHandler(
config,
segmentAnnouncer,
Mockito.mock(DataSegmentServerAnnouncer.class),
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL)
);

EmittingLogger.registerEmitter(new NoopServiceEmitter());
}

@Test
public void testLoadStartStopWithEmptyLocations() throws IOException
{
final List<StorageLocation> emptyLocations = ImmutableList.of();
segmentManager = new SegmentManager(
new SegmentLocalCacheManager(
emptyLocations,
config,
new LeastBytesUsedStorageLocationSelectorStrategy(emptyLocations),
TestIndex.INDEX_IO,
objectMapper
)
);

loadDropHandler = new SegmentLoadDropHandler(
config,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.BROKER)
);

loadDropHandler.start();
Assert.assertEquals(0, observedAnnouncedServerCount.get());

loadDropHandler.stop();
Assert.assertEquals(0, observedAnnouncedServerCount.get());
}

@Test
public void testLoadStartStop() throws IOException
{
loadDropHandler = new SegmentLoadDropHandler(
config,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.BROKER)
);

loadDropHandler.start();
Assert.assertEquals(1, observedAnnouncedServerCount.get());

loadDropHandler.stop();
Assert.assertEquals(0, observedAnnouncedServerCount.get());
}

@Test
public void testLoadLocalCache() throws IOException, SegmentLoadingException
{
Expand All @@ -97,17 +174,18 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException
int numSegments = (int) (MAX_SIZE / SEGMENT_SIZE);
List<DataSegment> expectedSegments = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
String name = "segment-" + i;
DataSegment segment = makeSegment("test", name);
String version = "segment-" + i;
DataSegment segment = makeSegment("test", version);
storageLoc.writeSegmentInfoToCache(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
File segmentDir = new File(cacheDir, storageDir);
new TestSegmentUtils.TestLoadSpec((int) SEGMENT_SIZE, name).loadSegment(segmentDir);
new TestSegmentUtils.TestLoadSpec((int) SEGMENT_SIZE, version).loadSegment(segmentDir);
expectedSegments.add(segment);
}

// Start the load drop handler
loadDropHandler.start();
Assert.assertEquals(1, observedAnnouncedServerCount.get());

// Verify the expected announcements
ArgumentCaptor<Iterable<DataSegment>> argCaptor = ArgumentCaptor.forClass(Iterable.class);
Expand All @@ -128,10 +206,13 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException
loadDropHandler.removeSegment(expectedSegments.get(0), null, false);
loadDropHandler.addSegment(newSegment, null);
Mockito.verify(segmentAnnouncer).announceSegment(newSegment);

loadDropHandler.stop();
Assert.assertEquals(0, observedAnnouncedServerCount.get());
}

private DataSegment makeSegment(String dataSource, String name)
private DataSegment makeSegment(String dataSource, String version)
{
return TestSegmentUtils.makeSegment(dataSource, name, SEGMENT_SIZE);
return TestSegmentUtils.makeSegment(dataSource, version, SEGMENT_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,23 @@ public class SegmentLoadDropHandlerTest

private SegmentLoadDropHandler segmentLoadDropHandler;
private DataSegmentAnnouncer segmentAnnouncer;
private AtomicInteger observedAnnouncedSegmentsCount;
private ConcurrentSkipListSet<DataSegment> observedAnnouncedSegments;
private DataSegmentServerAnnouncer serverAnnouncer;
private AtomicInteger observedAnnouncedServerCount;
private LoadDropSegmentCacheManager segmentCacheManager;
private Set<DataSegment> observedSegmentsRemovedFromCache;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig;
private ScheduledExecutorFactory scheduledExecutorFactory;

private File infoDir;
private List<StorageLocationConfig> locations;
private TestStorageLocation testStorageLocation;

private Set<DataSegment> observedSegmentsRemovedFromCache;
private ConcurrentSkipListSet<DataSegment> observedAnnouncedSegments;
private AtomicInteger observedAnnouncedSegmentsCount;
private AtomicInteger observedAnnouncedServerCount;

@Rule
public ExpectedException expectedException = ExpectedException.none();

Expand Down Expand Up @@ -710,16 +712,4 @@ private DataSegment makeSegment(String dataSource, String version, Interval inte
{
return TestSegmentUtils.makeSegment(dataSource, version, interval);
}

private void startLoadDropHandler() throws IOException
{
segmentLoadDropHandler.start();
Assert.assertEquals(1, observedAnnouncedServerCount.get());
}

private void stopLoadDropHandler()
{
segmentLoadDropHandler.stop();
Assert.assertEquals(1, observedAnnouncedServerCount.get());
}
}

0 comments on commit ee74c67

Please sign in to comment.