diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index 92683068b3b4..3d67f64d0cc8 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -126,7 +126,7 @@ public SegmentLocalCacheManager( ); } - if (this.config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload() > 0) { + if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload() > 0) { loadOnDownloadExec = Executors.newFixedThreadPool( config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload(), Execs.makeThreadFactory("LoadSegmentsIntoPageCacheOnDownload-%s") @@ -148,11 +148,11 @@ public List getCachedSegments() throws IOException "canHandleSegments() is false. getCachedSegments() must be invoked only when canHandleSegments() returns true." ); } - final File baseDir = getInfoDir(); - FileUtils.mkdirp(baseDir); + final File infoDir = getEffectiveInfoDir(); + FileUtils.mkdirp(infoDir); final List cachedSegments = new ArrayList<>(); - final File[] segmentsToLoad = baseDir.listFiles(); + final File[] segmentsToLoad = infoDir.listFiles(); int ignored = 0; @@ -173,7 +173,7 @@ public List getCachedSegments() throws IOException } } catch (Exception e) { - log.makeAlert(e, "Failed to load segment from segmentInfo file") + log.makeAlert(e, "Failed to load segment from segment cache file.") .addData("file", file) .emit(); } @@ -191,8 +191,7 @@ public List getCachedSegments() throws IOException @Override public void storeInfoFile(DataSegment segment) throws IOException { - final File infoDir = getInfoDir(); - final File segmentInfoCacheFile = new File(infoDir, segment.getId().toString()); + final File segmentInfoCacheFile = new File(getEffectiveInfoDir(), segment.getId().toString()); if (!segmentInfoCacheFile.exists()) { jsonMapper.writeValue(segmentInfoCacheFile, segment); } @@ -201,7 +200,7 @@ public void storeInfoFile(DataSegment segment) throws IOException @Override public void removeInfoFile(DataSegment segment) { - final File segmentInfoCacheFile = new File(getInfoDir(), segment.getId().toString()); + final File segmentInfoCacheFile = new File(getEffectiveInfoDir(), segment.getId().toString()); if (!segmentInfoCacheFile.delete()) { log.warn("Unable to delete cache file[%s] for segment[%s].", segmentInfoCacheFile, segment.getId()); } @@ -248,7 +247,19 @@ private SegmentizerFactory getSegmentFactory(final File segmentFiles) throws Seg return factory; } - private File getInfoDir() + /** + * Determines and returns the effective segment info directory based on the configuration settings. + * The directory is selected based on the following configurations injected into this class: + *
    + *
  • {@link SegmentLoaderConfig#infoDir} - If this is set, it is used as the info directory.
  • + *
  • {@link SegmentLoaderConfig#locations} - If the info directory is not set, the first location from this list is used.
  • + *
  • List of {@link StorageLocation}s injected - If both the info directory and locations list are not set, the + * first storage location is used.
  • + *
+ * + * @throws DruidException if none of the configurations are set, and the info directory cannot be determined. + */ + private File getEffectiveInfoDir() { final File infoDir; if (config.getInfoDir() != null) { diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java index 97f0d40bbefd..922f373acebe 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java @@ -41,19 +41,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import static org.mockito.ArgumentMatchers.any; - /** * Similar to {@link SegmentLoadDropHandlerTest}. This class includes tests that cover the * storage location layer as well. @@ -65,7 +60,7 @@ public class SegmentLoadDropHandlerCacheTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); private SegmentLoadDropHandler loadDropHandler; - private DataSegmentAnnouncer segmentAnnouncer; + private TestDataSegmentAnnouncer segmentAnnouncer; private DataSegmentServerAnnouncer serverAnnouncer; private SegmentManager segmentManager; private SegmentLoaderConfig loaderConfig; @@ -108,7 +103,7 @@ public List getLocations() objectMapper ); segmentManager = new SegmentManager(cacheManager); - segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class); + segmentAnnouncer = new TestDataSegmentAnnouncer(); observedAnnouncedServerCount = new AtomicInteger(0); serverAnnouncer = new DataSegmentServerAnnouncer() @@ -205,24 +200,17 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException Assert.assertEquals(1, observedAnnouncedServerCount.get()); // Verify the expected announcements - ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class); - Mockito.verify(segmentAnnouncer).announceSegments(argCaptor.capture()); - List announcedSegments = new ArrayList<>(); - argCaptor.getValue().forEach(announcedSegments::add); - announcedSegments.sort(Comparator.comparing(DataSegment::getVersion)); - Assert.assertEquals(expectedSegments, announcedSegments); - - // make sure adding segments beyond allowed size fails - Mockito.reset(segmentAnnouncer); + Assert.assertTrue(segmentAnnouncer.getObservedSegments().containsAll(expectedSegments)); + + // Make sure adding segments beyond allowed size fails DataSegment newSegment = makeSegment("test", "new-segment"); loadDropHandler.addSegment(newSegment, null); - Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any()); - Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any()); + Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(newSegment)); - // clearing some segment should allow for new segments + // Clearing some segment should allow for new segments loadDropHandler.removeSegment(expectedSegments.get(0), null, false); loadDropHandler.addSegment(newSegment, null); - Mockito.verify(segmentAnnouncer).announceSegment(newSegment); + Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(newSegment)); loadDropHandler.stop(); Assert.assertEquals(0, observedAnnouncedServerCount.get()); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 466618a752d8..02e9ed3c10fc 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -61,7 +61,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; public class SegmentLoadDropHandlerTest { @@ -156,13 +155,13 @@ public void testSegmentLoading1() throws Exception handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); handler.removeSegment(segment, DataSegmentChangeCallback.NOOP); - Assert.assertFalse(segmentAnnouncer.observedSegments.contains(segment)); + Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(segment)); handler.addSegment(segment, DataSegmentChangeCallback.NOOP); @@ -177,14 +176,14 @@ public void testSegmentLoading1() throws Exception Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments); Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache); - Assert.assertEquals(ImmutableList.of(segment), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(segment), segmentAnnouncer.getObservedSegments()); Assert.assertFalse( "segment files shouldn't be deleted", cacheManager.observedSegmentsRemovedFromCache.contains(segment) ); handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } /** @@ -205,17 +204,17 @@ public void testSegmentLoading2() throws Exception handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); handler.addSegment(segment, DataSegmentChangeCallback.NOOP); - Assert.assertTrue(segmentAnnouncer.observedSegments.contains(segment)); + Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment)); handler.removeSegment(segment, DataSegmentChangeCallback.NOOP); - Assert.assertFalse(segmentAnnouncer.observedSegments.contains(segment)); + Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(segment)); handler.addSegment(segment, DataSegmentChangeCallback.NOOP); @@ -233,14 +232,14 @@ public void testSegmentLoading2() throws Exception Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments); Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache); - Assert.assertTrue(segmentAnnouncer.observedSegments.contains(segment)); + Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment)); Assert.assertFalse( "segment files shouldn't be deleted", cacheManager.observedSegmentsRemovedFromCache.contains(segment) ); handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } @Test @@ -271,7 +270,7 @@ public void testLoadCache() throws Exception handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { @@ -279,7 +278,7 @@ public void testLoadCache() throws Exception Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments()); final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments); @@ -289,7 +288,7 @@ public void testLoadCache() throws Exception handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } @Test @@ -312,7 +311,7 @@ public void testStartStop() throws Exception handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { @@ -320,7 +319,7 @@ public void testStartStop() throws Exception Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments()); final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments); @@ -330,7 +329,7 @@ public void testStartStop() throws Exception handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } @Test(timeout = 60_000L) @@ -342,7 +341,7 @@ public void testProcessBatch() throws Exception handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); DataSegment segment2 = makeSegment("batchtest2", "1", Intervals.of("P1d/2011-04-01")); @@ -369,7 +368,7 @@ public void testProcessBatch() throws Exception result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get(); Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus()); - Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments()); final ImmutableList expectedSegments = ImmutableList.of(segment1); Assert.assertEquals(expectedSegments, cacheManager.observedSegments); @@ -378,7 +377,7 @@ public void testProcessBatch() throws Exception Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache); handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } @Test(timeout = 60_000L) @@ -394,7 +393,7 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); @@ -406,7 +405,7 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ } List result = future.get(); Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState()); - Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments()); future = handler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -414,10 +413,10 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ } result = future.get(); Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); - Assert.assertEquals(ImmutableList.of(segment1, segment1), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(segment1, segment1), segmentAnnouncer.getObservedSegments()); handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } @Test(timeout = 60_000L) @@ -470,7 +469,7 @@ public int getDropSegmentDelayMillis() handler.start(); - Assert.assertEquals(1, serverAnnouncer.observedCount.get()); + Assert.assertEquals(1, serverAnnouncer.getObservedCount()); final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); @@ -482,7 +481,7 @@ public int getDropSegmentDelayMillis() } List result = future.get(); Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); - Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments()); scheduledRunnable.clear(); // Request 2: Drop the segment @@ -493,8 +492,8 @@ public int getDropSegmentDelayMillis() } result = future.get(); Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); - Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.observedSegments); - Assert.assertFalse(segmentAnnouncer.observedSegments.contains(segment1)); // + Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments()); + Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(segment1)); // scheduledRunnable.clear(); // check invocations after a load-drop sequence @@ -511,7 +510,7 @@ public int getDropSegmentDelayMillis() } result = future.get(); Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); - Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments()); scheduledRunnable.clear(); // check invocations - 1 more load has happened @@ -528,7 +527,7 @@ public int getDropSegmentDelayMillis() } result = future.get(); Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); - Assert.assertEquals(ImmutableList.of(segment1, segment1), segmentAnnouncer.observedSegments); + Assert.assertEquals(ImmutableList.of(segment1, segment1), segmentAnnouncer.getObservedSegments()); scheduledRunnable.clear(); // check invocations - the load segment counter should bump up @@ -538,7 +537,7 @@ public int getDropSegmentDelayMillis() .dropSegment(ArgumentMatchers.any()); handler.stop(); - Assert.assertEquals(0, serverAnnouncer.observedCount.get()); + Assert.assertEquals(0, serverAnnouncer.getObservedCount()); } private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager) @@ -662,70 +661,4 @@ public void cleanup(DataSegment segment) this.observedSegmentsRemovedFromCache.add(segment); } } - - /** - * A test data segment announcer that tracks the state of all segment announcements and unannouncements. - */ - private static class TestDataSegmentAnnouncer extends NoopDataSegmentAnnouncer - { - private final List observedSegments; - - TestDataSegmentAnnouncer() - { - this.observedSegments = new ArrayList<>(); - } - - @Override - public void announceSegment(DataSegment segment) - { - this.observedSegments.add(segment); - } - - @Override - public void unannounceSegment(DataSegment segment) - { - this.observedSegments.remove(segment); - } - - @Override - public void announceSegments(Iterable segments) - { - for (DataSegment segment : segments) { - this.observedSegments.add(segment); - } - } - - @Override - public void unannounceSegments(Iterable segments) - { - for (DataSegment segment : segments) { - observedSegments.remove(segment); - } - } - } - - /** - * A test data server announcer that tracks the count of all announcements and unannouncements. - */ - private static class TestDataServerAnnouncer implements DataSegmentServerAnnouncer - { - private final AtomicInteger observedCount; - - TestDataServerAnnouncer() - { - this.observedCount = new AtomicInteger(0); - } - - @Override - public void announce() - { - observedCount.incrementAndGet(); - } - - @Override - public void unannounce() - { - observedCount.decrementAndGet(); - } - } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java b/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java new file mode 100644 index 000000000000..646687930359 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import org.apache.druid.timeline.DataSegment; + +import java.util.ArrayList; +import java.util.List; + +/** + * A test data segment announcer that tracks the state of all segment announcements and unannouncements. + */ +public class TestDataSegmentAnnouncer extends NoopDataSegmentAnnouncer +{ + private final List observedSegments; + + TestDataSegmentAnnouncer() + { + this.observedSegments = new ArrayList<>(); + } + + @Override + public void announceSegment(DataSegment segment) + { + this.observedSegments.add(segment); + } + + @Override + public void unannounceSegment(DataSegment segment) + { + this.observedSegments.remove(segment); + } + + @Override + public void announceSegments(Iterable segments) + { + for (DataSegment segment : segments) { + observedSegments.add(segment); + } + } + + @Override + public void unannounceSegments(Iterable segments) + { + for (DataSegment segment : segments) { + observedSegments.remove(segment); + } + } + + public List getObservedSegments() + { + return observedSegments; + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestDataServerAnnouncer.java b/server/src/test/java/org/apache/druid/server/coordination/TestDataServerAnnouncer.java new file mode 100644 index 000000000000..d88b753f5ff6 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/TestDataServerAnnouncer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A test data server announcer that tracks the count of all announcements and unannouncements. + * The counter is incremented and decremented on each announce and unannounce respectively. + */ +public class TestDataServerAnnouncer implements DataSegmentServerAnnouncer +{ + private final AtomicInteger observedCount; + + TestDataServerAnnouncer() + { + this.observedCount = new AtomicInteger(0); + } + + @Override + public void announce() + { + observedCount.incrementAndGet(); + } + + @Override + public void unannounce() + { + observedCount.decrementAndGet(); + } + + public int getObservedCount() + { + return observedCount.get(); + } +}