@@ -20,6 +20,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,6 +29,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -69,6 +71,8 @@ public class ZKMetadataProvider {
private ZKMetadataProvider() {
}

public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;

private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class);
private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled";
private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
@@ -725,6 +729,54 @@ public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore
}
}

/**
 * Applies the given consumer to the segment ZK metadata of the given table, fetching ZK records in batches.
 *
 * @param propertyStore Helix property store
 * @param tableNameWithType table name with type suffix
 * @param batchSize batch size for ZK get calls; must be positive
 * @param consumer function invoked for each non-null segment metadata
 */
public static void forEachSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
int batchSize, Consumer<SegmentZKMetadata> consumer) {
Preconditions.checkArgument(batchSize > 0, "Segment metadata batchSize must be greater than 0: %s", batchSize);

String segmentsPath = constructPropertyStorePathForResource(tableNameWithType);
List<String> segmentNames = getSegments(propertyStore, tableNameWithType);
if (segmentNames == null || segmentNames.isEmpty()) {
LOGGER.debug("No segments found under path: {}", segmentsPath);
return;
}

for (int startIndex = 0; startIndex < segmentNames.size(); startIndex += batchSize) {
int endIndex = Math.min(startIndex + batchSize, segmentNames.size());
List<String> segmentNameBatch = segmentNames.subList(startIndex, endIndex);

List<String> segmentPathBatch = new ArrayList<>(segmentNameBatch.size());
for (String segmentName : segmentNameBatch) {
segmentPathBatch.add(constructPropertyStorePathForSegment(tableNameWithType, segmentName));
}

List<ZNRecord> znRecords = propertyStore.get(segmentPathBatch, null, AccessOption.PERSISTENT);
int numNullRecords = 0;
if (znRecords != null) {
for (int i = 0; i < segmentNameBatch.size(); i++) {
ZNRecord znRecord = i < znRecords.size() ? znRecords.get(i) : null;
if (znRecord == null) {
numNullRecords++;
} else {
consumer.accept(new SegmentZKMetadata(znRecord));
}
}
} else {
numNullRecords = segmentNameBatch.size();
}

if (numNullRecords > 0) {
LOGGER.warn("Failed to read {}/{} segment ZK metadata under path: {} for table: {}",
numNullRecords, segmentNameBatch.size(), segmentsPath, tableNameWithType);
}
}
}

/**
* Returns the segments for the given table.
*
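For reviewers, a minimal usage sketch of the new helper. The caller class, table name, and consumer body are illustrative, not part of this PR, and the import paths assume the current Pinot/Helix layout:

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;

public class ForEachSegmentExample {
  // Streams segment metadata in ZK read batches of DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE (1000)
  // instead of materializing the whole list in memory. Table name is hypothetical.
  public static long countCompletedSegments(ZkHelixPropertyStore<ZNRecord> propertyStore) {
    AtomicLong completed = new AtomicLong();
    ZKMetadataProvider.forEachSegmentZKMetadata(propertyStore, "myTable_REALTIME",
        ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadata -> {
          if (segmentZKMetadata.getStatus().isCompleted()) {
            completed.incrementAndGet();
          }
        });
    return completed.get();
  }
}
```

The consumer only ever sees non-null records; unreadable paths are counted per batch and surfaced through the warning log, as the method body above shows.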
@@ -273,6 +273,9 @@ public static class ControllerPeriodicTasksConf {
public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
"controller.retentionManager.agedSegmentsDeletionBatchSize";
public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
public static final String SEGMENTS_ZK_METADATA_BATCH_SIZE =
"controller.retentionManager.segmentsZkMetadataBatchSize";
public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour.
@@ -1226,6 +1229,15 @@ public void setAgedSegmentsDeletionBatchSize(int agedSegmentsDeletionBatchSize)
setProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE, agedSegmentsDeletionBatchSize);
}

public int getSegmentsZKMetadataBatchSize() {
return getProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE,
ControllerPeriodicTasksConf.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE);
}

public void setSegmentsZKMetadataBatchSize(int segmentZKMetadataBatchSize) {
setProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadataBatchSize);
}

public long getPinotTaskManagerInitialDelaySeconds() {
return getPeriodicTaskInitialDelayInSeconds();
}
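A sketch of how the new knob would be exercised (the value 500 is illustrative, and the no-arg ControllerConf constructor is an assumption):

```java
import org.apache.pinot.controller.ControllerConf;

public class BatchSizeConfigExample {
  public static void main(String[] args) {
    ControllerConf conf = new ControllerConf();  // no-arg constructor assumed for illustration
    // Typed accessors added in this diff; overrides the default of 1000.
    conf.setSegmentsZKMetadataBatchSize(500);
    System.out.println(conf.getSegmentsZKMetadataBatchSize());  // prints 500
  }
}
```

Operators could equivalently set the controller.retentionManager.segmentsZkMetadataBatchSize property directly in the controller configuration.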
@@ -770,12 +770,9 @@ public Map<String, Map<String, String>> getZookeeperMetadata(
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
Map<String, Map<String, String>> segmentToMetadataMap = new HashMap<>();
List<SegmentZKMetadata> segmentZKMetadataList =
_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType);

for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> {
Contributor:
Won't this actually be slower due to more ZK point calls rather than the earlier getChildren() calls?

I think if use cases require all the data, then the earlier getChildren() call is probably the better choice.

Contributor (Author):

For propertyStore, the underlying implementation also wraps a for loop on top of zkclient, calling each path to fetch the ZNRecord and then constructing the segment metadata.

So there is no difference in terms of ZK overhead.

getChildren() for all segment names is always just one ZK call to fetch them all (see the sketch after this diff).

segmentToMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata.toMap());
}
});
return segmentToMetadataMap;
}

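To make the thread above concrete, here is a conceptual model of the author's point (my paraphrase, not Helix's actual implementation): a multi-path get is itself a per-path loop over the ZK client, so batching bounds memory without changing the number of ZK reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Conceptual model only; readFromZk is a hypothetical stand-in for one zkclient read.
public class BatchedGetModel<T> {
  private final Function<String, T> _readFromZk;

  public BatchedGetModel(Function<String, T> readFromZk) {
    _readFromZk = readFromZk;
  }

  public List<T> get(List<String> paths) {
    List<T> records = new ArrayList<>(paths.size());
    for (String path : paths) {
      records.add(_readFromZk.apply(path));  // one ZK round trip per segment path either way
    }
    return records;
  }
}
```

Listing the segment names, by contrast, is a single getChildren() call on the parent path, which is why the name fetch itself is not batched.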
@@ -902,20 +902,17 @@ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExclu
selectedSegments = new ArrayList<>(segmentSet);
} else {
selectedSegments = new ArrayList<>();
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
ArrayList<String> filteredSegments = new ArrayList<>();
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
forEachSegmentsZKMetadata(tableNameWithType, ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE,
segmentZKMetadata -> {
String segmentName = segmentZKMetadata.getSegmentName();
// Compute the intersection of segmentZK metadata and idealstate for valid segments
if (!segmentSet.contains(segmentName)) {
filteredSegments.add(segmentName);
continue;
}
// Filter by time if the time range is specified
if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
} else if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
selectedSegments.add(segmentName);
}
}
});
LOGGER.info(
"Successfully computed the segments for table : {}. # of filtered segments: {}, the filtered segment list: "
+ "{}. Only showing up to 100 filtered segments.", tableNameWithType, filteredSegments.size(),
@@ -995,13 +992,23 @@ public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String s
return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
}

public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType, batchSize, segmentMetadataConsumer);
}

public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType,
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentMetadataConsumer);
}

public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
}

public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>();
for (SegmentZKMetadata zkMetadata : getSegmentsZKMetadata(tableNameWithType)) {
forEachSegmentsZKMetadata(tableNameWithType, zkMetadata -> {
if (zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
LLCSegmentName llcName = LLCSegmentName.of(zkMetadata.getSegmentName());
int partitionGroupId = llcName.getPartitionGroupId();
@@ -1012,7 +1019,7 @@ public Collection<String> getLastLLCCompletedSegments(String tableNameWithType)
partitionIdToLastLLCCompletedSegmentMap.put(partitionGroupId, zkMetadata.getSegmentName());
}
}
}
});
return partitionIdToLastLLCCompletedSegmentMap.values();
}

@@ -1510,13 +1517,12 @@ public void addSchema(Schema schema, boolean override, boolean force)

public void updateSegmentsZKTimeInterval(String tableNameWithType, DateTimeFieldSpec timeColumnFieldSpec) {
LOGGER.info("Updating segment time interval in ZK metadata for table: {}", tableNameWithType);

List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
int version = segmentZKMetadata.toZNRecord().getVersion();
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
}
forEachSegmentsZKMetadata(tableNameWithType,
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadata -> {
int version = segmentZKMetadata.toZNRecord().getVersion();
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
});
}

public void updateSchema(Schema schema, boolean reload, boolean forceTableSchemaUpdate)
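A side note on the consumer-based call sites above: a Java lambda can only capture effectively final locals, so accumulators like selectedSegments are pre-created containers mutated inside the consumer, never reassigned variables. A minimal sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerAccumulatorExample {
  public static void main(String[] args) {
    List<String> selected = new ArrayList<>();  // effectively final reference
    Consumer<String> consumer = segmentName -> selected.add(segmentName);  // mutate, never reassign
    consumer.accept("mySegment_0");  // hypothetical segment name
    System.out.println(selected);  // [mySegment_0]
  }
}
```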
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.helix.HelixDataAccessor;
@@ -115,6 +116,15 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
}

public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, batchSize, segmentMetadataConsumer);
}

public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentMetadataConsumer);
}

public IdealState getIdealState(String tableNameWithType) {
return _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
}
@@ -163,22 +163,20 @@ public int getMaxAttemptsPerTask(String minionTag) {
public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) {
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
Set<String> segmentsForTable = idealState.getPartitionSet();
List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
_clusterInfoAccessor.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> {
if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
selectedSegmentZKMetadataList.add(segmentZKMetadata);
}
}
});
return selectedSegmentZKMetadataList;
}

public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
Set<String> idealStateSegments = idealState.getPartitionSet();
List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
_clusterInfoAccessor.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> {
String segmentName = segmentZKMetadata.getSegmentName();
if (idealStateSegments.contains(segmentName)
&& segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
@@ -189,7 +187,7 @@ public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable
// We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
selectedSegmentZKMetadataList.add(segmentZKMetadata);
}
}
});
return selectedSegmentZKMetadataList;
}
