Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1269,17 +1269,17 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// TODO: The watermarks below are related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
// `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE` and `WORKER_DIRECT_MEMORY_RATIO_RESUME`;
// we'd better refine the logic among them.
def workerCongestionControlDiskBufferLowWatermark: Option[Long] =
def workerCongestionControlDiskBufferLowWatermark: Long =
get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK)
def workerCongestionControlDiskBufferHighWatermark: Option[Long] =
def workerCongestionControlDiskBufferHighWatermark: Long =
get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK)
def workerCongestionControlUserProduceSpeedLowWatermark: Option[Long] =
def workerCongestionControlUserProduceSpeedLowWatermark: Long =
get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK)
def workerCongestionControlUserProduceSpeedHighWatermark: Option[Long] =
def workerCongestionControlUserProduceSpeedHighWatermark: Long =
get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK)
def workerCongestionControlWorkerProduceSpeedLowWatermark: Option[Long] =
def workerCongestionControlWorkerProduceSpeedLowWatermark: Long =
get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK)
def workerCongestionControlWorkerProduceSpeedHighWatermark: Option[Long] =
def workerCongestionControlWorkerProduceSpeedHighWatermark: Long =
get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK)

def workerCongestionControlUserInactiveIntervalMs: Long =
Expand Down Expand Up @@ -3794,17 +3794,17 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")

val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: OptionalConfigEntry[Long] =
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.diskBuffer.low.watermark")
.withAlternative("celeborn.worker.congestionControl.low.watermark")
.categories("worker")
.doc("Will stop congest users if the total pending bytes of disk buffer is lower than " +
"this configuration")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
.createWithDefault(Long.MaxValue)

val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: OptionalConfigEntry[Long] =
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.diskBuffer.high.watermark")
.withAlternative("celeborn.worker.congestionControl.high.watermark")
.categories("worker")
Expand All @@ -3815,41 +3815,41 @@ object CelebornConf extends Logging {
s"${WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK.key}")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
.createWithDefault(Long.MaxValue)

val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] =
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.userProduceSpeed.low.watermark")
.categories("worker")
.doc("For those users that produce byte speeds less than this configuration, " +
"stop congestion for these users")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
.createWithDefault(Long.MaxValue)

val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] =
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.userProduceSpeed.high.watermark")
.categories("worker")
.doc("For those users that produce byte speeds greater than this configuration, " +
"start congestion for these users")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
.createWithDefault(Long.MaxValue)

val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] =
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.low.watermark")
.categories("worker")
.doc("Stop congestion If worker total produce speed less than this configuration")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
.createWithDefault(Long.MaxValue)

val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] =
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.high.watermark")
.categories("worker")
.doc("Start congestion If worker total produce speed greater than this configuration")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
.createWithDefault(Long.MaxValue)

val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.user.inactive.interval")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.quota

/**
 * Per-user produce-speed thresholds used by worker congestion control.
 *
 * @param userProduceSpeedHighWatermark produce byte speed above which congestion is started
 *                                      for a user
 * @param userProduceSpeedLowWatermark  produce byte speed below which congestion is stopped
 *                                      for a user
 */
case class UserTrafficQuota(
    userProduceSpeedHighWatermark: Long,
    userProduceSpeedLowWatermark: Long)
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.quota

/**
 * Worker-wide thresholds used by worker congestion control.
 *
 * @param diskBufferHighWatermark         total bytes in the disk buffer above which users start
 *                                        to be congested
 * @param diskBufferLowWatermark          total pending bytes of the disk buffer below which
 *                                        congestion is stopped
 * @param workerProduceSpeedHighWatermark worker total produce speed above which congestion
 *                                        is started
 * @param workerProduceSpeedLowWatermark  worker total produce speed below which congestion
 *                                        is stopped
 */
case class WorkerTrafficQuota(
    diskBufferHighWatermark: Long,
    diskBufferLowWatermark: Long,
    workerProduceSpeedHighWatermark: Long,
    workerProduceSpeedLowWatermark: Long)
12 changes: 6 additions & 6 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ license: |
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout |
| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | |
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | |
| celeborn.worker.congestionControl.diskBuffer.high.watermark | <undefined> | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark |
| celeborn.worker.congestionControl.diskBuffer.low.watermark | <undefined> | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark |
| celeborn.worker.congestionControl.diskBuffer.high.watermark | 9223372036854775807b | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark |
| celeborn.worker.congestionControl.diskBuffer.low.watermark | 9223372036854775807b | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark |
| celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | |
| celeborn.worker.congestionControl.sample.time.window | 10s | false | The worker holds a time sliding list to calculate users' produce/consume rate | 0.3.0 | |
| celeborn.worker.congestionControl.user.inactive.interval | 10min | false | How long will consider this user is inactive if it doesn't send data | 0.3.0 | |
| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | <undefined> | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | |
| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | <undefined> | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | |
| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | <undefined> | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | |
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | <undefined> | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | 9223372036854775807b | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | |
| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | 9223372036854775807b | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | |
| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | 9223372036854775807b | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | |
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -48,6 +49,8 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
private final ScheduledExecutorService configRefreshService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");

private LinkedBlockingDeque<Runnable> listeners;

public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException {
this.celebornConf = celebornConf;
this.systemConfigAtomicReference.set(new SystemConfig(celebornConf));
Expand All @@ -57,6 +60,7 @@ public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException {
() -> {
try {
refreshCache();
notifyListenersOnConfigUpdate();
} catch (Throwable e) {
LOG.error(
"Failed to refresh dynamic configs. Encounter exception: {}.", e.getMessage(), e);
Expand All @@ -65,6 +69,7 @@ public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException {
dynamicConfigRefreshInterval,
dynamicConfigRefreshInterval,
TimeUnit.MILLISECONDS);
this.listeners = new LinkedBlockingDeque<>();
}

@Override
Expand Down Expand Up @@ -101,4 +106,15 @@ public TenantConfig getRawTenantUserConfigFromCache(String tenantId, String user
public void shutdown() {
// Hands the "celeborn-config-refresher" executor to ThreadUtils.shutdown. That executor runs
// the periodic refreshCache() + listener notification task scheduled in the constructor.
ThreadUtils.shutdown(configRefreshService);
}

// Appends the listener to the tail of the deque; listeners are later run in registration
// order by notifyListenersOnConfigUpdate().
@Override
public void registerListenerOnConfigUpdate(Runnable listener) {
  listeners.addLast(listener);
}

/**
 * Runs every registered listener, in registration order, on the calling thread (the config
 * refresh task).
 *
 * <p>A listener that throws is logged and skipped so that the remaining listeners are still
 * notified; previously the first failure aborted the whole notification round.
 */
private void notifyListenersOnConfigUpdate() {
  // The refresh task is scheduled in the constructor before `listeners` is assigned, so an
  // early run could observe null. Nothing can be registered at that point, so just return.
  final LinkedBlockingDeque<Runnable> registered = listeners;
  if (registered == null) {
    return;
  }
  for (Runnable listener : registered) {
    try {
      listener.run();
    } catch (Exception e) {
      // Isolate failures: one misbehaving listener must not starve the others.
      LOG.error(
          "Failed to notify listener on config update. Encounter exception: {}.",
          e.getMessage(),
          e);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ default DynamicConfig getTenantUserConfigFromCache(String tenantId, String userI
*/
void refreshCache() throws IOException;

/**
 * Registers a listener to be called when the configuration is updated.
 *
 * <p>In {@code BaseConfigServiceImpl} the registered listeners are run by the periodic refresh
 * task right after each {@link #refreshCache()} call that returns normally, whether or not any
 * value actually changed, and are skipped for a round in which {@code refreshCache()} throws.
 * Listeners run on the refresh thread, so they should be quick.
 *
 * @param listener the listener to be registered
 */
void registerListenerOnConfigUpdate(Runnable listener);

/** Shuts down the configuration management service. */
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.internal.config.ConfigEntry;
import org.apache.celeborn.common.quota.Quota;
import org.apache.celeborn.common.quota.UserTrafficQuota;
import org.apache.celeborn.common.quota.WorkerTrafficQuota;
import org.apache.celeborn.common.util.Utils;

/**
Expand Down Expand Up @@ -129,6 +131,44 @@ protected Quota currentQuota() {
ConfigType.STRING));
}

/**
 * Resolves the per-user produce-speed watermarks (as byte values) and bundles them into a
 * {@link UserTrafficQuota}. The high watermark is resolved first, then the low watermark.
 *
 * @return the user traffic quota built from the resolved high/low watermarks
 */
public UserTrafficQuota getUserTrafficQuota() {
  final long produceSpeedHigh =
      getValue(
          CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(),
          CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK(),
          Long.TYPE,
          ConfigType.BYTES);
  final long produceSpeedLow =
      getValue(
          CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(),
          CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK(),
          Long.TYPE,
          ConfigType.BYTES);
  return new UserTrafficQuota(produceSpeedHigh, produceSpeedLow);
}

/**
 * Resolves the worker-level congestion watermarks (as byte values) and bundles them into a
 * {@link WorkerTrafficQuota}. Values are resolved in constructor-argument order: disk buffer
 * high, disk buffer low, worker produce speed high, worker produce speed low.
 *
 * @return the worker traffic quota built from the four resolved watermarks
 */
public WorkerTrafficQuota getWorkerTrafficQuota() {
  final long diskBufferHigh =
      getValue(
          CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(),
          CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK(),
          Long.TYPE,
          ConfigType.BYTES);
  final long diskBufferLow =
      getValue(
          CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(),
          CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK(),
          Long.TYPE,
          ConfigType.BYTES);
  final long produceSpeedHigh =
      getValue(
          CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(),
          CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK(),
          Long.TYPE,
          ConfigType.BYTES);
  final long produceSpeedLow =
      getValue(
          CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(),
          CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK(),
          Long.TYPE,
          ConfigType.BYTES);
  return new WorkerTrafficQuota(diskBufferHigh, diskBufferLow, produceSpeedHigh, produceSpeedLow);
}

/**
 * Returns the config map held by this instance. The same reference is handed out, not a copy.
 *
 * @return the config key/value map
 */
public Map<String, String> getConfigs() {
  return this.configs;
}
Expand Down
Loading