Skip to content

Commit

Permalink
Fix metadata inconsistency error with concurrent append replace
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz committed Jun 16, 2024
1 parent aec1d5d commit 988c41c
Show file tree
Hide file tree
Showing 17 changed files with 207 additions and 201 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.appenderator;

import com.google.common.collect.Iterables;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.realtime.appenderator.PublishedSegmentRetriever;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * {@link PublishedSegmentRetriever} that looks up published segments by
 * submitting task actions through a {@link TaskActionClient}.
 * <p>
 * Segments are first searched for using a {@link RetrieveUsedSegmentsAction}
 * and any IDs that are still missing are then searched for using a
 * {@link RetrieveUnusedSegmentsAction}.
 */
public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRetriever
{
  private final TaskActionClient taskActionClient;

  public ActionBasedPublishedSegmentRetriever(TaskActionClient taskActionClient)
  {
    this.taskActionClient = taskActionClient;
  }

  /**
   * Finds the segments corresponding to the given IDs, looking first in the
   * result of a {@link RetrieveUsedSegmentsAction} and then, for the IDs not
   * found there, in the results of {@link RetrieveUnusedSegmentsAction}.
   *
   * @param segmentIds IDs of the segments to find. All IDs must belong to the
   *                   same datasource. A null or empty set yields an empty
   *                   result without submitting any task action.
   * @return Mutable set of the segments that were found. IDs that could not be
   * found in either search are simply absent from the result.
   * @throws IOException if thrown while submitting a task action
   */
  @Override
  public Set<DataSegment> findPublishedSegments(Set<SegmentIdWithShardSpec> segmentIds) throws IOException
  {
    // Nothing to look up. Without this guard, iterator().next() below would
    // fail with a NoSuchElementException (or an NPE for a null set).
    if (segmentIds == null || segmentIds.isEmpty()) {
      return new HashSet<>();
    }

    // Validate that all segments belong to the same datasource
    final String dataSource = segmentIds.iterator().next().getDataSource();
    final Set<SegmentId> segmentIdsToFind = new HashSet<>();
    for (SegmentIdWithShardSpec segmentId : segmentIds) {
      if (!segmentId.getDataSource().equals(dataSource)) {
        throw InvalidInput.exception(
            "Published segment IDs to find cannot belong to multiple datasources[%s, %s].",
            dataSource, segmentId.getDataSource()
        );
      }
      segmentIdsToFind.add(segmentId.asSegmentId());
    }

    final Set<DataSegment> publishedSegments = new HashSet<>();

    // Search for the required segments in the "used" set
    final List<Interval> usedSearchIntervals = JodaUtils.condenseIntervals(
        Iterables.transform(segmentIdsToFind, SegmentId::getInterval)
    );
    final Collection<DataSegment> foundUsedSegments = taskActionClient.submit(
        new RetrieveUsedSegmentsAction(dataSource, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED)
    );
    collectRequiredSegments(foundUsedSegments, segmentIdsToFind, publishedSegments);

    if (segmentIdsToFind.isEmpty()) {
      return publishedSegments;
    }

    // Search for the remaining segments in the "unused" set.
    // Versions and intervals are computed only from the IDs still missing.
    final List<String> versions = segmentIdsToFind.stream().map(SegmentId::getVersion).collect(Collectors.toList());
    final List<Interval> unusedSearchIntervals = JodaUtils.condenseIntervals(
        Iterables.transform(segmentIdsToFind, SegmentId::getInterval)
    );
    for (Interval searchInterval : unusedSearchIntervals) {
      final Collection<DataSegment> foundUnusedSegments = taskActionClient
          .submit(new RetrieveUnusedSegmentsAction(dataSource, searchInterval, versions, null, null));
      collectRequiredSegments(foundUnusedSegments, segmentIdsToFind, publishedSegments);
    }

    return publishedSegments;
  }

  /**
   * Moves every candidate whose ID is still pending into the result set and
   * removes that ID from the pending set.
   */
  private static void collectRequiredSegments(
      Collection<DataSegment> candidates,
      Set<SegmentId> pendingIds,
      Set<DataSegment> result
  )
  {
    for (DataSegment candidate : candidates) {
      // remove() returns true only if the ID was still pending
      if (pendingIds.remove(candidate.getId())) {
        result.add(candidate);
      }
    }
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
Expand Down Expand Up @@ -838,7 +838,7 @@ private static StreamAppenderatorDriver newDriver(
)
),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller(),
toolbox.getJsonMapper(),
metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.indexing.common.task;

import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -134,7 +134,7 @@ public static BatchAppenderatorDriver newDriver(
return new BatchAppenderatorDriver(
appenderator,
segmentAllocator,
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public Collection<DataSegment> retrieveUsedSegmentsForIntervals(
{
return toolbox
.getTaskActionClient()
.submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, visibility));
.submit(new RetrieveUsedSegmentsAction(dataSource, intervals, visibility));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -237,7 +237,7 @@ public StreamAppenderatorDriver newDriver(
)
),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller(),
toolbox.getJsonMapper(),
metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.PublishedSegmentRetriever;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
Expand All @@ -35,39 +36,18 @@
import java.io.IOException;
import java.util.Set;

public class ActionBasedUsedSegmentCheckerTest
public class ActionBasedPublishedSegmentRetrieverTest
{
@Test
public void testBasic() throws IOException
{
final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
EasyMock.expect(
taskActionClient.submit(
new RetrieveUsedSegmentsAction("bar", ImmutableList.of(Intervals.of("2002/P1D")))
)
).andReturn(
ImmutableList.of(
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(0))
.version("b")
.size(0)
.build(),
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build()
)
);
EasyMock.expect(
taskActionClient.submit(
new RetrieveUsedSegmentsAction(
"foo",
ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D"))
ImmutableList.of(Intervals.of("2000/P1D")),
Segments.INCLUDING_OVERSHADOWED
)
)
).andReturn(
Expand Down Expand Up @@ -104,12 +84,10 @@ public void testBasic() throws IOException
);
EasyMock.replay(taskActionClient);

final UsedSegmentChecker checker = new ActionBasedUsedSegmentChecker(taskActionClient);
final Set<DataSegment> segments = checker.findUsedSegments(
final PublishedSegmentRetriever checker = new ActionBasedPublishedSegmentRetriever(taskActionClient);
final Set<DataSegment> segments = checker.findPublishedSegments(
ImmutableSet.of(
new SegmentIdWithShardSpec("foo", Intervals.of("2000/P1D"), "a", new LinearShardSpec(1)),
new SegmentIdWithShardSpec("foo", Intervals.of("2001/P1D"), "b", new LinearShardSpec(0)),
new SegmentIdWithShardSpec("bar", Intervals.of("2002/P1D"), "b", new LinearShardSpec(0))
new SegmentIdWithShardSpec("foo", Intervals.of("2000/P1D"), "a", new LinearShardSpec(1))
)
);

Expand All @@ -121,18 +99,40 @@ public void testBasic() throws IOException
.shardSpec(new LinearShardSpec(1))
.version("a")
.size(0)
.build(),
.build()
),
segments
);

EasyMock.verify(taskActionClient);
}

@Test
public void testFindReturnsUnusedSegments() throws IOException
{
final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
EasyMock.expect(
taskActionClient.submit(
new RetrieveUsedSegmentsAction("bar", ImmutableList.of(Intervals.of("2002/P1D")))
)
).andReturn(
ImmutableList.of(
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(0))
.version("b")
.size(0)
.build(),
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build()
),
segments
)
);

EasyMock.verify(taskActionClient);
}
}
Loading

0 comments on commit 988c41c

Please sign in to comment.