Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7338] Allow Reloading Segments with Multiple Threads #7893

Merged
merged 24 commits into from Dec 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5a626b7
Added reload parallelism to SegmentReloadMessage
suddendust Dec 12, 2021
3be37c7
Undo unwanted change
suddendust Dec 12, 2021
801b2b6
Accept parallelism param in PinotSegmentRestletResource
suddendust Dec 12, 2021
bcc17b7
Added default parallelism value in PinotSegmentRestletResource reload…
suddendust Dec 12, 2021
3817fce
Minor refactoring
suddendust Dec 12, 2021
f5f9eb8
Fixed checkstyle violations
suddendust Dec 12, 2021
98d1fe9
Added @QueryParam in API signatures
suddendust Dec 12, 2021
00047d8
fix worker pool size
suddendust Dec 13, 2021
a7602c1
added preconditions on parallelism param, added log when noSegments <…
suddendust Dec 13, 2021
0d30a5b
better use of completablefuture API as recommended
suddendust Dec 13, 2021
5242862
spotless
suddendust Dec 13, 2021
7963329
Use existing _refreshThreadsSemaphore to control reload parallelism
suddendust Dec 21, 2021
416b7e8
Reverted unwanted changes
suddendust Dec 21, 2021
14f608e
Removed signature change for InstanceDataManager#reloadSegment
suddendust Dec 21, 2021
00f6a8d
Nullcheck during sema release, minor refactoring
suddendust Dec 22, 2021
6cc9635
fix checkstyle
suddendust Dec 22, 2021
80ebb4d
refactor
suddendust Dec 22, 2021
49ea862
Addressed comments
suddendust Dec 23, 2021
db376e9
Refactoring
suddendust Dec 23, 2021
efb8a3f
spotless
suddendust Dec 23, 2021
ea7800b
Added missing header
suddendust Dec 23, 2021
29c52d4
Update pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRef…
suddendust Dec 24, 2021
2fdb3c2
fixed compilation issue due to upstream commit
suddendust Dec 24, 2021
b486e5a
removed redundant catch block
suddendust Dec 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -29,6 +29,7 @@
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -93,9 +94,11 @@ void reloadSegment(String tableNameWithType, String segmentName, boolean forceDo
throws Exception;

/**
* Reloads all segments in a table.
* Reloads all segments of a table.
* @param segmentRefreshSemaphore semaphore to control concurrent segment reloads/refresh
*/
void reloadAllSegments(String tableNameWithType, boolean forceDownload)
void reloadAllSegments(String tableNameWithType, boolean forceDownload,
SegmentRefreshSemaphore segmentRefreshSemaphore)
throws Exception;

/**
Expand Down
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.util;

import java.util.concurrent.Semaphore;
import org.slf4j.Logger;


/**
* Wrapper class for semaphore used to control concurrent segment reload/refresh
*/
public class SegmentRefreshSemaphore {

private final Semaphore _semaphore;

public SegmentRefreshSemaphore(int permits, boolean fair) {
if (permits > 0) {
_semaphore = new Semaphore(permits, fair);
} else {
_semaphore = null;
}
}

public void acquireSema(String segmentName, Logger logger)
throws InterruptedException {
if (_semaphore != null) {
long startTime = System.currentTimeMillis();
logger.info("Waiting for lock to refresh : {}, queue-length: {}", segmentName,
_semaphore.getQueueLength());
_semaphore.acquire();
logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", segmentName,
System.currentTimeMillis() - startTime, _semaphore.getQueueLength());
} else {
logger.info("Locking of refresh threads disabled (segment: {})", segmentName);
}
}

public void releaseSema() {
if (_semaphore != null) {
_semaphore.release();
}
}
}
Expand Up @@ -27,7 +27,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -44,6 +48,7 @@
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
Expand Down Expand Up @@ -197,7 +202,8 @@ public void reloadSegment(String tableNameWithType, String segmentName, boolean
LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType);
SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType, segmentName);
if (segmentMetadata == null) {
LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", segmentName, tableNameWithType);
LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", segmentName,
tableNameWithType);
return;
}

Expand All @@ -212,33 +218,41 @@ public void reloadSegment(String tableNameWithType, String segmentName, boolean
}

@Override
public void reloadAllSegments(String tableNameWithType, boolean forceDownload) {
public void reloadAllSegments(String tableNameWithType, boolean forceDownload,
SegmentRefreshSemaphore segmentRefreshSemaphore)
throws Exception {
LOGGER.info("Reloading all segments in table: {}", tableNameWithType);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Preconditions.checkNotNull(tableConfig);

Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);

List<String> failedSegments = new ArrayList<>();
Exception sampleException = null;
List<SegmentMetadata> segmentsMetadata = getAllSegmentsMetadata(tableNameWithType);
for (SegmentMetadata segmentMetadata : segmentsMetadata) {
ExecutorService workers = Executors.newCachedThreadPool();
final AtomicReference<Exception> sampleException = new AtomicReference<>();
//calling thread hasn't acquired any permit so we don't reload any segments using it.
CompletableFuture.allOf(segmentsMetadata.stream().map(segmentMetadata -> CompletableFuture.runAsync(() -> {
String segmentName = segmentMetadata.getName();
try {
reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
segmentRefreshSemaphore.acquireSema(segmentMetadata.getName(), LOGGER);
try {
reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
} finally {
segmentRefreshSemaphore.releaseSema();
}
} catch (Exception e) {
String segmentName = segmentMetadata.getName();
LOGGER.error("Caught exception while reloading segment: {} in table: {}", segmentName, tableNameWithType, e);
failedSegments.add(segmentName);
sampleException = e;
sampleException.set(e);
}
}
}, workers)).toArray(CompletableFuture[]::new)).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to shut down the executor service after it is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be GCed after ~ 60s (TTL of cached threads). However, it's a good idea to shut it down explicitly. Addressed.


if (sampleException != null) {
workers.shutdownNow();

if (sampleException.get() != null) {
throw new RuntimeException(
String.format("Failed to reload %d/%d segments: %s in table: %s", failedSegments.size(),
segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException);
segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException.get());
}

LOGGER.info("Reloaded all segments in table: {}", tableNameWithType);
}

Expand Down
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.server.starter.helix;

import java.util.concurrent.Semaphore;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
Expand All @@ -30,6 +29,7 @@
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,39 +39,14 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {

// We only allow limited number of segments refresh/reload happen at the same time
// The reason for that is segment refresh/reload will temporarily use double-sized memory
private final Semaphore _refreshThreadSemaphore;
private final InstanceDataManager _instanceDataManager;
private final ServerMetrics _metrics;
private final SegmentRefreshSemaphore _segmentRefreshSemaphore;

public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager, ServerMetrics metrics) {
_instanceDataManager = instanceDataManager;
_metrics = metrics;
int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads();
if (maxParallelRefreshThreads > 0) {
_refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true);
} else {
_refreshThreadSemaphore = null;
}
}

private void acquireSema(String context, Logger logger)
throws InterruptedException {
if (_refreshThreadSemaphore != null) {
long startTime = System.currentTimeMillis();
logger.info("Waiting for lock to refresh : {}, queue-length: {}", context,
_refreshThreadSemaphore.getQueueLength());
_refreshThreadSemaphore.acquire();
logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", context,
System.currentTimeMillis() - startTime, _refreshThreadSemaphore.getQueueLength());
} else {
LOGGER.info("Locking of refresh threads disabled (segment: {})", context);
}
}

private void releaseSema() {
if (_refreshThreadSemaphore != null) {
_refreshThreadSemaphore.release();
}
_segmentRefreshSemaphore = new SegmentRefreshSemaphore(instanceDataManager.getMaxParallelRefreshThreads(), true);
}

// Called each time a message is received.
Expand Down Expand Up @@ -113,15 +88,15 @@ public HelixTaskResult handleMessage()
HelixTaskResult result = new HelixTaskResult();
_logger.info("Handling message: {}", _message);
try {
acquireSema(_segmentName, _logger);
_segmentRefreshSemaphore.acquireSema(_segmentName, _logger);
// The number of retry times depends on the retry count in Constants.
_instanceDataManager.addOrReplaceSegment(_tableNameWithType, _segmentName);
result.setSuccess(true);
} catch (Exception e) {
_metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1);
Utils.rethrowException(e);
} finally {
releaseSema();
_segmentRefreshSemaphore.releaseSema();
}
return result;
}
Expand All @@ -143,14 +118,19 @@ public HelixTaskResult handleMessage()
_logger.info("Handling message: {}", _message);
try {
if (_segmentName.equals("")) {
acquireSema("ALL", _logger);
// NOTE: the method aborts if any segment reload encounters an unhandled exception,
// and can lead to inconsistent state across segments
_instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload);
// and can lead to inconsistent state across segments.
//we don't acquire any permit here as they'll be acquired by worked threads later
_instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload,
_segmentRefreshSemaphore);
} else {
// Reload one segment
acquireSema(_segmentName, _logger);
_instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload);
_segmentRefreshSemaphore.acquireSema(_segmentName, _logger);
try {
_instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload);
} finally {
_segmentRefreshSemaphore.releaseSema();
}
}
helixTaskResult.setSuccess(true);
} catch (Throwable e) {
Expand All @@ -159,8 +139,6 @@ public HelixTaskResult handleMessage()
// (without any corresponding logs to indicate failure!) in the callable path
throw new RuntimeException(
"Caught exception while reloading segment: " + _segmentName + " in table: " + _tableNameWithType, e);
} finally {
releaseSema();
}
return helixTaskResult;
}
Expand Down