Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1313] Custom Network Location Aware Replication #2367

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ message PbRegisterWorker {
string requestId = 9;
map<string, PbResourceConsumption> userResourceConsumption = 8;
int32 internalPort = 10;
string networkLocation = 11;
}

message PbHeartbeatFromWorker {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import org.apache.hadoop.net.NetworkTopology

import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.StorageInfo
Expand All @@ -40,7 +42,7 @@ class WorkerInfo(
_diskInfos: util.Map[String, DiskInfo],
_userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption]) extends Serializable
with Logging {
var networkLocation = "/default-rack"
var networkLocation = NetworkTopology.DEFAULT_RACK
var lastHeartbeat: Long = 0
var workerStatus = WorkerStatus.normalWorkerStatus()
val diskInfos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.network
package org.apache.celeborn.common.network

import java.io.File
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ object ControlMessages extends Logging {
fetchPort: Int,
replicatePort: Int,
internalPort: Int,
networkLocation: String,
disks: Map[String, DiskInfo],
userResourceConsumption: Map[UserIdentifier, ResourceConsumption],
requestId: String): PbRegisterWorker = {
Expand All @@ -100,6 +101,7 @@ object ControlMessages extends Logging {
.setFetchPort(fetchPort)
.setReplicatePort(replicatePort)
.setInternalPort(internalPort)
.setNetworkLocation(networkLocation)
.addAllDisks(pbDisks)
.putAllUserResourceConsumption(pbUserResourceConsumption)
.setRequestId(requestId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.network
package org.apache.celeborn.common.network;

import java.io.{File, FileWriter}
import java.nio.charset.StandardCharsets
import java.util
import java.util.ArrayList
import java.util.concurrent.TimeUnit

import com.google.common.io.Files
Expand Down Expand Up @@ -62,7 +62,7 @@ class CelebornRackResolverSuite extends AnyFunSuite {
assertEquals("/rack1", resultMap(hostName1).getNetworkLocation)
assertEquals("/rack2", resultMap(hostName2).getNetworkLocation)

val hostNamesList = new util.ArrayList[String]()
val hostNamesList = new ArrayList[String]()
akpatnam25 marked this conversation as resolved.
Show resolved Hide resolved
hostNamesList.add(hostName1)
hostNamesList.add(hostName2)
val resultMap2: Map[String, Node] = resolver.resolveToMap(hostNamesList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import scala.Option;

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,6 +50,7 @@
import org.apache.celeborn.common.meta.WorkerEventInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
import org.apache.celeborn.common.network.CelebornRackResolver;
import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo;
import org.apache.celeborn.common.protocol.PbWorkerStatus;
import org.apache.celeborn.common.quota.ResourceConsumption;
Expand All @@ -57,7 +59,6 @@
import org.apache.celeborn.common.util.PbSerDeUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.util.WorkerStatusUtils;
import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver;

public abstract class AbstractMetaManager implements IMetadataHandler {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaManager.class);
Expand Down Expand Up @@ -225,6 +226,7 @@ public void updateRegisterWorkerMeta(
int fetchPort,
int replicatePort,
int internalPort,
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption) {
WorkerInfo workerInfo =
Expand All @@ -238,7 +240,13 @@ public void updateRegisterWorkerMeta(
disks,
userResourceConsumption);
workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
if (networkLocation != null
akpatnam25 marked this conversation as resolved.
Show resolved Hide resolved
&& !networkLocation.isEmpty()
&& !networkLocation.equals(NetworkTopology.DEFAULT_RACK)) {
workerInfo.networkLocation_$eq(networkLocation);
} else {
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
}
workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
synchronized (workers) {
if (!workers.contains(workerInfo)) {
Expand Down Expand Up @@ -330,8 +338,10 @@ public void restoreMetaFromFile(File file) throws IOException {
.peek(
workerInfo -> {
// Reset worker's network location with current master's configuration.
workerInfo.networkLocation_$eq(
resolveMap.get(workerInfo.host()).get().getNetworkLocation());
if (workerInfo.networkLocation().equals(NetworkTopology.DEFAULT_RACK)) {
akpatnam25 marked this conversation as resolved.
Show resolved Hide resolved
workerInfo.networkLocation_$eq(
akpatnam25 marked this conversation as resolved.
Show resolved Hide resolved
resolveMap.get(workerInfo.host()).get().getNetworkLocation());
akpatnam25 marked this conversation as resolved.
Show resolved Hide resolved
akpatnam25 marked this conversation as resolved.
Show resolved Hide resolved
}
})
.collect(Collectors.toSet()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void handleRegisterWorker(
int fetchPort,
int replicatePort,
int internalPort,
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
import org.apache.celeborn.common.network.CelebornRackResolver;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver;

public class SingleMasterMetaManager extends AbstractMetaManager {
private static final Logger LOG = LoggerFactory.getLogger(SingleMasterMetaManager.class);
Expand Down Expand Up @@ -136,6 +136,7 @@ public void handleRegisterWorker(
int fetchPort,
int replicatePort,
int internalPort,
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId) {
Expand All @@ -146,6 +147,7 @@ public void handleRegisterWorker(
fetchPort,
replicatePort,
internalPort,
networkLocation,
disks,
userResourceConsumption);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
import org.apache.celeborn.common.network.CelebornRackResolver;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
import org.apache.celeborn.service.deploy.master.clustermeta.MetaUtil;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Type;
import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver;

public class HAMasterMetaManager extends AbstractMetaManager {
private static final Logger LOG = LoggerFactory.getLogger(HAMasterMetaManager.class);
Expand Down Expand Up @@ -292,6 +292,7 @@ public void handleRegisterWorker(
int fetchPort,
int replicatePort,
int internalPort,
String networkLocation,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId) {
Expand All @@ -308,6 +309,7 @@ public void handleRegisterWorker(
.setFetchPort(fetchPort)
.setReplicatePort(replicatePort)
.setInternalPort(internalPort)
.setNetworkLocation(networkLocation)
.putAllDisks(MetaUtil.toPbDiskInfos(disks))
.putAllUserResourceConsumption(
MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
pushPort = request.getRegisterWorkerRequest().getPushPort();
fetchPort = request.getRegisterWorkerRequest().getFetchPort();
replicatePort = request.getRegisterWorkerRequest().getReplicatePort();
String networkLocation = request.getRegisterWorkerRequest().getNetworkLocation();
int internalPort = request.getRegisterWorkerRequest().getInternalPort();
diskInfos = MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
userResourceConsumption =
Expand All @@ -236,6 +237,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
fetchPort,
replicatePort,
internalPort,
networkLocation,
diskInfos,
userResourceConsumption);
break;
Expand Down
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ message RegisterWorkerRequest {
map<string, DiskInfo> disks = 6;
map<string, ResourceConsumption> userResourceConsumption = 7;
required int32 internalPort = 8;
optional string networkLocation = 9;
}

message ReportWorkerUnavailableRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.CelebornRackResolver
import org.apache.celeborn.common.network.protocol.TransportMessage
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
Expand All @@ -49,7 +50,6 @@ import org.apache.celeborn.common.util.{CelebornHadoopUtils, CollectionUtils, Ja
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver
import org.apache.celeborn.service.deploy.master.quota.QuotaManager

private[celeborn] class Master(
Expand Down Expand Up @@ -412,6 +412,7 @@ private[celeborn] class Master(
val fetchPort = pbRegisterWorker.getFetchPort
val replicatePort = pbRegisterWorker.getReplicatePort
val internalPort = pbRegisterWorker.getInternalPort
val networkLocation = pbRegisterWorker.getNetworkLocation
val disks = pbRegisterWorker.getDisksList.asScala
.map { pbDiskInfo => pbDiskInfo.getMountPoint -> PbSerDeUtils.fromPbDiskInfo(pbDiskInfo) }
.toMap.asJava
Expand All @@ -430,6 +431,7 @@ private[celeborn] class Master(
fetchPort,
replicatePort,
internalPort,
networkLocation,
disks,
userResourceConsumption,
requestId))
Expand Down Expand Up @@ -712,6 +714,7 @@ private[celeborn] class Master(
fetchPort: Int,
replicatePort: Int,
internalPort: Int,
networkLocation: String,
disks: util.Map[String, DiskInfo],
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
requestId: String): Unit = {
Expand All @@ -738,6 +741,7 @@ private[celeborn] class Master(
fetchPort,
replicatePort,
internalPort,
networkLocation,
disks,
userResourceConsumption,
newRequestId)
Expand All @@ -753,6 +757,7 @@ private[celeborn] class Master(
fetchPort,
replicatePort,
internalPort,
networkLocation,
disks,
userResourceConsumption,
requestId)
Expand All @@ -765,6 +770,7 @@ private[celeborn] class Master(
fetchPort,
replicatePort,
internalPort,
networkLocation,
disks,
userResourceConsumption,
requestId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.network.CelebornRackResolver;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver;

public class SlotsAllocatorRackAwareSuiteJ {

Expand Down
Loading
Loading