Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinator primary segment assignment fix #5532

Merged
merged 3 commits into from Apr 2, 2018
Merged
Changes from 2 commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -33,9 +33,12 @@
*/
public class SegmentReplicantLookup
{


This comment has been minimized.

Copy link
@jihoonson

jihoonson Mar 28, 2018

Contributor

Unnecessary lines.

public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();

for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
@@ -48,17 +51,29 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}

// Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
}
}

return new SegmentReplicantLookup(segmentsInCluster);
return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
}

private final Table<String, String, Integer> segmentsInCluster;

private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
private final Table<String, String, Integer> loadingSegments;

private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster, Table<String, String, Integer> loadingSegments)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
}

public Map<String, Integer> getClusterTiers(String segmentId)
@@ -82,4 +97,30 @@ public int getLoadedReplicants(String segmentId, String tier)
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

public int getLoadingReplicants(String segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

public int getLoadingReplicants(String segmentId)
{
Map<String, Integer> allTiers = loadingSegments.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}

public int getTotalReplicants(String segmentId)
{
return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
}

public int getTotalReplicants(String segmentId, String tier)
{
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}
@@ -95,8 +95,9 @@ private void assign(
final CoordinatorStats stats
)
{
// if primary replica already exists
if (!currentReplicants.isEmpty()) {
// if primary replica already exists or is loading
final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
if (!currentReplicants.isEmpty() || loading > 0) {
assignReplicas(params, segment, stats, null);
} else {
final ServerHolder primaryHolderToLoad = assignPrimary(params, segment);
@@ -169,7 +170,6 @@ private ServerHolder assignPrimary(
if (targetReplicantsInTier <= 0) {
continue;
}

final String tier = entry.getKey();

final List<ServerHolder> holders = getFilteredHolders(
@@ -228,7 +228,7 @@ private void assignReplicas(
final int numAssigned = assignReplicasForTier(
tier,
entry.getIntValue(),
currentReplicants.getOrDefault(tier, 0),
params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier),
params,
createLoadQueueSizeLimitingPredicate(params),
segment
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.DruidServer;
@@ -942,6 +943,8 @@ public void testDropServerActuallyServesSegment()

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();

EasyMock.replay(anotherMockPeon);

DruidCluster druidCluster = new DruidCluster(
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.