From 49dbc4f31e184b0e3e5af5d6a1c0553734266586 Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Thu, 7 Mar 2024 14:50:03 -0800 Subject: [PATCH 1/5] [CELEBORN-1313] Custom Network Location Aware Replication --- common/src/main/proto/TransportMessages.proto | 1 + .../celeborn/common/meta/WorkerInfo.scala | 4 ++- .../network/CelebornRackResolver.scala | 2 +- .../protocol/message/ControlMessages.scala | 2 ++ .../network/CelebornRackResolverSuite.scala | 6 ++-- .../clustermeta/AbstractMetaManager.java | 16 ++++++--- .../master/clustermeta/IMetadataHandler.java | 1 + .../clustermeta/SingleMasterMetaManager.java | 4 ++- .../clustermeta/ha/HAMasterMetaManager.java | 4 ++- .../master/clustermeta/ha/MetaHandler.java | 2 ++ master/src/main/proto/Resource.proto | 1 + .../service/deploy/master/Master.scala | 8 ++++- .../master/SlotsAllocatorRackAwareSuiteJ.java | 2 +- .../clustermeta/DefaultMetaSystemSuiteJ.java | 29 ++++++++++++++++ .../ha/RatisMasterStatusSystemSuiteJ.java | 33 +++++++++++++++++++ .../service/deploy/worker/Worker.scala | 6 +++- 16 files changed, 107 insertions(+), 14 deletions(-) rename {master/src/main/scala/org/apache/celeborn/service/deploy/master => common/src/main/scala/org/apache/celeborn/common}/network/CelebornRackResolver.scala (99%) rename {master/src/test/scala/org/apache/celeborn/service/deploy/master => common/src/test/scala/org/apache/celeborn/common}/network/CelebornRackResolverSuite.scala (97%) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index e949aa9d4f..2d24431e27 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -183,6 +183,7 @@ message PbRegisterWorker { string requestId = 9; map userResourceConsumption = 8; int32 internalPort = 10; + string networkLocation = 11; } message PbHeartbeatFromWorker { diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index 629417d255..3a1446be3f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ +import org.apache.hadoop.net.NetworkTopology + import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.StorageInfo @@ -40,7 +42,7 @@ class WorkerInfo( _diskInfos: util.Map[String, DiskInfo], _userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption]) extends Serializable with Logging { - var networkLocation = "/default-rack" + var networkLocation = NetworkTopology.DEFAULT_RACK var lastHeartbeat: Long = 0 var workerStatus = WorkerStatus.normalWorkerStatus() val diskInfos = diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala similarity index 99% rename from master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala rename to common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala index bd1af15261..f155f28e92 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala +++ b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.master.network +package org.apache.celeborn.common.network import java.io.File import java.util.concurrent.{ScheduledExecutorService, TimeUnit} diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 07573e6482..82946dfb7b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -87,6 +87,7 @@ object ControlMessages extends Logging { fetchPort: Int, replicatePort: Int, internalPort: Int, + networkLocation: String, disks: Map[String, DiskInfo], userResourceConsumption: Map[UserIdentifier, ResourceConsumption], requestId: String): PbRegisterWorker = { @@ -100,6 +101,7 @@ object ControlMessages extends Logging { .setFetchPort(fetchPort) .setReplicatePort(replicatePort) .setInternalPort(internalPort) + .setNetworkLocation(networkLocation) .addAllDisks(pbDisks) .putAllUserResourceConsumption(pbUserResourceConsumption) .setRequestId(requestId) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala similarity index 97% rename from master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala rename to common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala index 3bf3da56c3..4b46b26028 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.master.network +package org.apache.celeborn.common.network; import java.io.{File, FileWriter} import java.nio.charset.StandardCharsets -import java.util +import java.util.ArrayList import java.util.concurrent.TimeUnit import com.google.common.io.Files @@ -62,7 +62,7 @@ class CelebornRackResolverSuite extends AnyFunSuite { assertEquals("/rack1", resultMap(hostName1).getNetworkLocation) assertEquals("/rack2", resultMap(hostName2).getNetworkLocation) - val hostNamesList = new util.ArrayList[String]() + val hostNamesList = new ArrayList[String]() hostNamesList.add(hostName1) hostNamesList.add(hostName2) val resultMap2: Map[String, Node] = resolver.resolveToMap(hostNamesList) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 7957fd1aa8..f77428c280 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -35,6 +35,7 @@ import scala.Option; +import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,7 @@ import org.apache.celeborn.common.meta.WorkerEventInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; +import org.apache.celeborn.common.network.CelebornRackResolver; import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo; import org.apache.celeborn.common.protocol.PbWorkerStatus; import org.apache.celeborn.common.quota.ResourceConsumption; @@ -57,7 +59,6 @@ import org.apache.celeborn.common.util.PbSerDeUtils; import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.common.util.WorkerStatusUtils; -import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver; public abstract class AbstractMetaManager implements IMetadataHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaManager.class); @@ -225,6 +226,7 @@ public void updateRegisterWorkerMeta( int fetchPort, int replicatePort, int internalPort, + String networkLocation, Map disks, Map userResourceConsumption) { WorkerInfo workerInfo = @@ -238,7 +240,11 @@ public void updateRegisterWorkerMeta( disks, userResourceConsumption); workerInfo.lastHeartbeat_$eq(System.currentTimeMillis()); - workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation()); + if (networkLocation != null + && !networkLocation.isEmpty() + && !networkLocation.equals(NetworkTopology.DEFAULT_RACK)) { + workerInfo.networkLocation_$eq(networkLocation); + } workerInfo.updateDiskMaxSlots(estimatedPartitionSize); synchronized (workers) { if (!workers.contains(workerInfo)) { @@ -330,8 +336,10 @@ public void restoreMetaFromFile(File file) throws IOException { .peek( workerInfo -> { // Reset worker's network location with current master's configuration. - workerInfo.networkLocation_$eq( - resolveMap.get(workerInfo.host()).get().getNetworkLocation()); + if (workerInfo.networkLocation().equals(NetworkTopology.DEFAULT_RACK)) { + workerInfo.networkLocation_$eq( + resolveMap.get(workerInfo.host()).get().getNetworkLocation()); + } }) .collect(Collectors.toSet())); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index ed9eeba5af..95257adc25 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -73,6 +73,7 @@ void handleRegisterWorker( int fetchPort, int replicatePort, int internalPort, + String networkLocation, Map disks, Map userResourceConsumption, String requestId); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index b4db36e2a4..8c2c73e53a 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -30,9 +30,9 @@ import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; +import org.apache.celeborn.common.network.CelebornRackResolver; import org.apache.celeborn.common.quota.ResourceConsumption; import org.apache.celeborn.common.rpc.RpcEnv; -import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver; public class SingleMasterMetaManager extends AbstractMetaManager { private static final Logger LOG = LoggerFactory.getLogger(SingleMasterMetaManager.class); @@ -136,6 +136,7 @@ public void handleRegisterWorker( int fetchPort, int replicatePort, int internalPort, + String networkLocation, Map disks, Map userResourceConsumption, String requestId) { @@ -146,6 +147,7 @@ public void handleRegisterWorker( fetchPort, replicatePort, internalPort, + networkLocation, disks, userResourceConsumption); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index c3b18b3d70..00eb00b607 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -33,6 +33,7 @@ import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; +import org.apache.celeborn.common.network.CelebornRackResolver; import org.apache.celeborn.common.quota.ResourceConsumption; import org.apache.celeborn.common.rpc.RpcEnv; import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager; @@ -40,7 +41,6 @@ import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Type; -import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver; public class HAMasterMetaManager extends AbstractMetaManager { private static final Logger LOG = LoggerFactory.getLogger(HAMasterMetaManager.class); @@ -292,6 +292,7 @@ public void handleRegisterWorker( int fetchPort, int replicatePort, int internalPort, + String networkLocation, Map disks, Map userResourceConsumption, String requestId) { @@ -308,6 +309,7 @@ public void handleRegisterWorker( .setFetchPort(fetchPort) .setReplicatePort(replicatePort) .setInternalPort(internalPort) + .setNetworkLocation(networkLocation) .putAllDisks(MetaUtil.toPbDiskInfos(disks)) .putAllUserResourceConsumption( MetaUtil.toPbUserResourceConsumption(userResourceConsumption)) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index 2cfb6a5bb9..948cca6ef0 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -214,6 +214,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques pushPort = request.getRegisterWorkerRequest().getPushPort(); fetchPort = request.getRegisterWorkerRequest().getFetchPort(); replicatePort = request.getRegisterWorkerRequest().getReplicatePort(); + String networkLocation = request.getRegisterWorkerRequest().getNetworkLocation(); int internalPort = request.getRegisterWorkerRequest().getInternalPort(); diskInfos = MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap()); userResourceConsumption = @@ -236,6 +237,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques fetchPort, replicatePort, internalPort, + networkLocation, diskInfos, userResourceConsumption); break; diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 6c630646c4..660a30cb9f 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -173,6 +173,7 @@ message RegisterWorkerRequest { map disks = 6; map userResourceConsumption = 7; required int32 internalPort = 8; + optional string networkLocation = 9; } message ReportWorkerUnavailableRequest { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 7af28474ac..b149573e4c 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -38,6 +38,7 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource} +import org.apache.celeborn.common.network.CelebornRackResolver import org.apache.celeborn.common.network.protocol.TransportMessage import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode} @@ -49,7 +50,6 @@ import org.apache.celeborn.common.util.{CelebornHadoopUtils, CollectionUtils, Ja import org.apache.celeborn.server.common.{HttpService, Service} import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler} -import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver import org.apache.celeborn.service.deploy.master.quota.QuotaManager private[celeborn] class Master( @@ -412,6 +412,7 @@ private[celeborn] class Master( val fetchPort = pbRegisterWorker.getFetchPort val replicatePort = pbRegisterWorker.getReplicatePort val internalPort = pbRegisterWorker.getInternalPort + val networkLocation = pbRegisterWorker.getNetworkLocation val disks = pbRegisterWorker.getDisksList.asScala .map { pbDiskInfo => pbDiskInfo.getMountPoint -> PbSerDeUtils.fromPbDiskInfo(pbDiskInfo) } .toMap.asJava @@ -430,6 +431,7 @@ private[celeborn] class Master( fetchPort, replicatePort, internalPort, + networkLocation, disks, userResourceConsumption, requestId)) @@ -712,6 +714,7 @@ private[celeborn] class Master( fetchPort: Int, replicatePort: Int, internalPort: Int, + networkLocation: String, disks: util.Map[String, DiskInfo], userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption], requestId: String): Unit = { @@ -738,6 +741,7 @@ private[celeborn] class Master( fetchPort, replicatePort, internalPort, + networkLocation, disks, userResourceConsumption, newRequestId) @@ -753,6 +757,7 @@ private[celeborn] class Master( fetchPort, replicatePort, internalPort, + networkLocation, disks, userResourceConsumption, requestId) @@ -765,6 +770,7 @@ private[celeborn] class Master( fetchPort, replicatePort, internalPort, + networkLocation, disks, userResourceConsumption, requestId) diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java index 7af8953f29..6c1627fff1 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java @@ -42,9 +42,9 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; +import org.apache.celeborn.common.network.CelebornRackResolver; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.common.protocol.StorageInfo; -import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver; public class SlotsAllocatorRackAwareSuiteJ { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 45b059da5f..081c1b39e9 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -59,6 +59,7 @@ public class DefaultMetaSystemSuiteJ { private static final int FETCHPORT1 = 1113; private static final int REPLICATEPORT1 = 1114; private static final int INTERNALPORT1 = 1115; + private static final String NETWORK_LOCATION1 = "networkLocation1"; private static final Map disks1 = new HashMap<>(); private static final Map userResourceConsumption1 = new HashMap<>(); @@ -69,6 +70,7 @@ public class DefaultMetaSystemSuiteJ { private static final int FETCHPORT2 = 2113; private static final int REPLICATEPORT2 = 2114; private static final int INTERNALPORT2 = 2115; + private static final String NETWORK_LOCATION2 = "networkLocation2"; private static final Map disks2 = new HashMap<>(); private static final Map userResourceConsumption2 = new HashMap<>(); @@ -79,6 +81,7 @@ public class DefaultMetaSystemSuiteJ { private static final int FETCHPORT3 = 3113; private static final int REPLICATEPORT3 = 3114; private static final int INTERNALPORT3 = 3115; + private static final String NETWORK_LOCATION3 = "networkLocation3"; private static final Map disks3 = new HashMap<>(); private static final Map userResourceConsumption3 = new HashMap<>(); @@ -126,6 +129,7 @@ public void testHandleRegisterWorker() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -136,6 +140,7 @@ public void testHandleRegisterWorker() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -146,6 +151,7 @@ public void testHandleRegisterWorker() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -183,6 +189,7 @@ public void testHandleWorkerExclude() { workerInfo1.fetchPort(), workerInfo1.replicatePort(), workerInfo1.internalPort(), + workerInfo1.networkLocation(), workerInfo1.diskInfos(), workerInfo1.userResourceConsumption(), getNewReqeustId()); @@ -193,6 +200,7 @@ public void testHandleWorkerExclude() { workerInfo2.fetchPort(), workerInfo2.replicatePort(), workerInfo2.internalPort(), + workerInfo2.networkLocation(), workerInfo2.diskInfos(), workerInfo2.userResourceConsumption(), getNewReqeustId()); @@ -215,6 +223,7 @@ public void testHandleWorkerLost() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -225,6 +234,7 @@ public void testHandleWorkerLost() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -235,6 +245,7 @@ public void testHandleWorkerLost() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -257,6 +268,7 @@ public void testHandleRequestSlots() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -267,6 +279,7 @@ public void testHandleRequestSlots() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -277,6 +290,7 @@ public void testHandleRequestSlots() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -334,6 +348,7 @@ public void testHandleReleaseSlots() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -344,6 +359,7 @@ public void testHandleReleaseSlots() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -354,6 +370,7 @@ public void testHandleReleaseSlots() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -397,6 +414,7 @@ public void testHandleAppLost() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -407,6 +425,7 @@ public void testHandleAppLost() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -417,6 +436,7 @@ public void testHandleAppLost() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -468,6 +488,7 @@ public void testHandleUnRegisterShuffle() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -478,6 +499,7 @@ public void testHandleUnRegisterShuffle() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -488,6 +510,7 @@ public void testHandleUnRegisterShuffle() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -550,6 +573,7 @@ public void testHandleWorkerHeartbeat() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, new HashMap<>(), userResourceConsumption1, getNewReqeustId()); @@ -560,6 +584,7 @@ public void testHandleWorkerHeartbeat() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, new HashMap<>(), userResourceConsumption2, getNewReqeustId()); @@ -570,6 +595,7 @@ public void testHandleWorkerHeartbeat() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, new HashMap<>(), userResourceConsumption3, getNewReqeustId()); @@ -648,6 +674,7 @@ public void testHandleReportWorkerFailure() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -658,6 +685,7 @@ public void testHandleReportWorkerFailure() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -668,6 +696,7 @@ public void testHandleReportWorkerFailure() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index e42d3aac9a..772845ec0a 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -187,6 +187,7 @@ public void testLeaderAvaiable() { private static final int FETCHPORT1 = 1113; private static final int REPLICATEPORT1 = 1114; private static final int INTERNALPORT1 = 1115; + private static final String NETWORK_LOCATION1 = "networkLocation1"; private static final Map disks1 = new HashMap<>(); private static final Map userResourceConsumption1 = new HashMap<>(); @@ -197,6 +198,7 @@ public void testLeaderAvaiable() { private static final int FETCHPORT2 = 2113; private static final int REPLICATEPORT2 = 2114; private static final int INTERNALPORT2 = 2115; + private static final String NETWORK_LOCATION2 = "networkLocation2"; private static final Map disks2 = new HashMap<>(); private static final Map userResourceConsumption2 = new HashMap<>(); @@ -207,6 +209,7 @@ public void testLeaderAvaiable() { private static final int FETCHPORT3 = 3113; private static final int REPLICATEPORT3 = 3114; private static final int INTERNALPORT3 = 3115; + private static final String NETWORK_LOCATION3 = "networkLocation3"; private static final Map disks3 = new HashMap<>(); private static final Map userResourceConsumption3 = new HashMap<>(); @@ -255,6 +258,7 @@ public void testRaftSystemException() throws Exception { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -278,6 +282,7 @@ public void testHandleRegisterWorker() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -288,6 +293,7 @@ public void testHandleRegisterWorker() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -298,6 +304,7 @@ public void testHandleRegisterWorker() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -342,6 +349,7 @@ public void testHandleWorkerExclude() throws InterruptedException { workerInfo1.fetchPort(), workerInfo1.replicatePort(), workerInfo1.internalPort(), + workerInfo1.networkLocation(), workerInfo1.diskInfos(), workerInfo1.userResourceConsumption(), getNewReqeustId()); @@ -352,6 +360,7 @@ public void testHandleWorkerExclude() throws InterruptedException { workerInfo2.fetchPort(), workerInfo2.replicatePort(), workerInfo2.internalPort(), + workerInfo2.networkLocation(), workerInfo2.diskInfos(), workerInfo2.userResourceConsumption(), getNewReqeustId()); @@ -385,6 +394,7 @@ public void testHandleWorkerLost() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -395,6 +405,7 @@ public void testHandleWorkerLost() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -405,6 +416,7 @@ public void testHandleWorkerLost() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -430,6 +442,7 @@ public void testHandleRequestSlots() { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -440,6 +453,7 @@ public void testHandleRequestSlots() { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -450,6 +464,7 @@ public void testHandleRequestSlots() { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -535,6 +550,7 @@ public void testHandleReleaseSlots() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -545,6 +561,7 @@ public void testHandleReleaseSlots() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION3, disks2, userResourceConsumption2, getNewReqeustId()); @@ -555,6 +572,7 @@ public void testHandleReleaseSlots() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -622,6 +640,7 @@ public void testHandleAppLost() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -632,6 +651,7 @@ public void testHandleAppLost() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -642,6 +662,7 @@ public void testHandleAppLost() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -700,6 +721,7 @@ public void testHandleUnRegisterShuffle() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -710,6 +732,7 @@ public void testHandleUnRegisterShuffle() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -720,6 +743,7 @@ public void testHandleUnRegisterShuffle() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -803,6 +827,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -813,6 +838,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -823,6 +849,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -960,6 +987,7 @@ public void testHandleReportWorkerFailure() throws InterruptedException { FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -970,6 +998,7 @@ public void testHandleReportWorkerFailure() throws InterruptedException { FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -980,6 +1009,7 @@ public void testHandleReportWorkerFailure() throws InterruptedException { FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); @@ -1018,6 +1048,7 @@ public void testHandleRemoveWorkersUnavailableInfo() throws InterruptedException FETCHPORT1, REPLICATEPORT1, INTERNALPORT1, + NETWORK_LOCATION1, disks1, userResourceConsumption1, getNewReqeustId()); @@ -1028,6 +1059,7 @@ public void testHandleRemoveWorkersUnavailableInfo() throws InterruptedException FETCHPORT2, REPLICATEPORT2, INTERNALPORT2, + NETWORK_LOCATION2, disks2, userResourceConsumption2, getNewReqeustId()); @@ -1038,6 +1070,7 @@ public void testHandleRemoveWorkersUnavailableInfo() throws InterruptedException FETCHPORT3, REPLICATEPORT3, INTERNALPORT3, + NETWORK_LOCATION3, disks3, userResourceConsumption3, getNewReqeustId()); diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 5743d27884..54d6ee9aac 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -38,7 +38,7 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerPartitionLocationInfo} import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource} -import org.apache.celeborn.common.network.TransportContext +import org.apache.celeborn.common.network.{CelebornRackResolver, TransportContext} import org.apache.celeborn.common.network.sasl.SaslServerBootstrap import org.apache.celeborn.common.network.sasl.SecretRegistryImpl import org.apache.celeborn.common.network.server.TransportServerBootstrap @@ -293,6 +293,9 @@ private[celeborn] class Worker( private val masterClient = new MasterClient(internalRpcEnvInUse, conf, true) + private val rackResolver = new CelebornRackResolver(conf) + private val networkLocation = rackResolver.resolve(host).getNetworkLocation + // (workerInfo -> last connect timeout timestamp) val unavailablePeers: ConcurrentHashMap[WorkerInfo, Long] = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]() @@ -563,6 +566,7 @@ private[celeborn] class Worker( fetchPort, replicatePort, internalPort, + networkLocation, // Use WorkerInfo's diskInfo since re-register when heartbeat return not-registered, // StorageManager have update the disk info. workerInfo.diskInfos.asScala.toMap, From ac2d651979ea3bc48ca5651e754dbc8e2f77d891 Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Thu, 7 Mar 2024 14:57:16 -0800 Subject: [PATCH 2/5] add else clause --- .../service/deploy/master/clustermeta/AbstractMetaManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index f77428c280..86209afb14 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -244,6 +244,8 @@ public void updateRegisterWorkerMeta( && !networkLocation.isEmpty() && !networkLocation.equals(NetworkTopology.DEFAULT_RACK)) { workerInfo.networkLocation_$eq(networkLocation); + } else { + workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation()); } workerInfo.updateDiskMaxSlots(estimatedPartitionSize); synchronized (workers) { From 2ceb79842d9b6950cc39c4d8238e3c28440ee4ba Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Tue, 12 Mar 2024 13:22:31 -0700 Subject: [PATCH 3/5] address review comments --- .../deploy/master/clustermeta/AbstractMetaManager.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 86209afb14..96d78ef6ef 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -242,7 +242,7 @@ public void updateRegisterWorkerMeta( workerInfo.lastHeartbeat_$eq(System.currentTimeMillis()); if (networkLocation != null && !networkLocation.isEmpty() - && !networkLocation.equals(NetworkTopology.DEFAULT_RACK)) { + && !NetworkTopology.DEFAULT_RACK.equals(networkLocation)) { workerInfo.networkLocation_$eq(networkLocation); } else { workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation()); @@ -329,8 +329,9 @@ public void restoreMetaFromFile(File file) throws IOException { snapshotMetaInfo.getWorkersList().stream() .map(PbSerDeUtils::fromPbWorkerInfo) .collect(Collectors.toSet()); - List workerHostList = - workerInfoSet.stream().map(WorkerInfo::host).collect(Collectors.toList()); + List workerHostList = workerInfoSet.stream() + .filter(w -> NetworkTopology.DEFAULT_RACK.equals(w.networkLocation())) + .map(WorkerInfo::host).collect(Collectors.toList()); scala.collection.immutable.Map resolveMap = rackResolver.resolveToMap(workerHostList); workers.addAll( From f87c95bed4cbbd5330e9a03d8499ec2e4cde8760 Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Tue, 12 Mar 2024 13:23:46 -0700 Subject: [PATCH 4/5] update equals --- .../service/deploy/master/clustermeta/AbstractMetaManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 96d78ef6ef..372aed378b 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -339,7 +339,7 @@ public void restoreMetaFromFile(File file) throws IOException { .peek( workerInfo -> { // Reset worker's network location with current master's configuration. - if (workerInfo.networkLocation().equals(NetworkTopology.DEFAULT_RACK)) { + if (NetworkTopology.DEFAULT_RACK.equals(workerInfo.networkLocation())) { workerInfo.networkLocation_$eq( resolveMap.get(workerInfo.host()).get().getNetworkLocation()); } From 2d070a7b357651bd6672482c71569552df1100b5 Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Tue, 12 Mar 2024 13:24:48 -0700 Subject: [PATCH 5/5] run spotless --- .../deploy/master/clustermeta/AbstractMetaManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 372aed378b..b566ed0aae 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -329,9 +329,11 @@ public void restoreMetaFromFile(File file) throws IOException { snapshotMetaInfo.getWorkersList().stream() .map(PbSerDeUtils::fromPbWorkerInfo) .collect(Collectors.toSet()); - List workerHostList = workerInfoSet.stream() + List workerHostList = + workerInfoSet.stream() .filter(w -> NetworkTopology.DEFAULT_RACK.equals(w.networkLocation())) - .map(WorkerInfo::host).collect(Collectors.toList()); + .map(WorkerInfo::host) + .collect(Collectors.toList()); scala.collection.immutable.Map resolveMap = rackResolver.resolveToMap(workerHostList); workers.addAll(