diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 92d28447d6..67de0ca690 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -173,7 +173,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (authEnabled) { logInfo(s"Authentication is enabled; setting up master and worker RPC environments") val appSecret = createSecret() - applicationMeta = ApplicationMeta(appUniqueId, appSecret) + applicationMeta = ApplicationMeta(appUniqueId, appSecret, userIdentifier) val registrationInfo = new RegistrationInfo() masterRpcEnvInUse = RpcEnv.create( @@ -192,6 +192,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends 0, conf, createRpcSecurityContext(appSecret)) + } else { + applicationMeta = ApplicationMeta(appUniqueId, "", userIdentifier) } private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false) @@ -207,6 +209,12 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private val changePartitionManager = new ChangePartitionManager(conf, this) private val releasePartitionManager = new ReleasePartitionManager(conf, this) + private def updateApplicationMeta(): Unit = { + Utils.tryLogNonFatalError(masterClient.askSync[PbApplicationMetaUpdateResponse]( + PbSerDeUtils.toPbApplicationMeta(applicationMeta), + classOf[PbApplicationMetaUpdateResponse])) + } + // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and // `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains // a reference to `masterClient`, there may be cases where `masterClient` is null when @@ -214,6 +222,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // method at the end of the construction of the class to perform the initialization operations. private def initialize(): Unit = { // noinspection ConvertExpressionToSAM + updateApplicationMeta() commitManager.start() heartbeater.start() changePartitionManager.start() diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 7f190a3bbb..d1c5030152 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -102,6 +102,7 @@ enum MessageType { APPLICATION_META_REQUEST = 79; BATCH_OPEN_STREAM = 80; BATCH_OPEN_STREAM_RESPONSE = 81; + APPLICATION_META_UPDATE_RESPONSE = 82; } enum StreamType { @@ -753,8 +754,13 @@ message PbRegisterApplicationResponse { message PbApplicationMeta { string appId = 1; string secret = 2; + PbUserIdentifier userIdentifier = 3; } message PbApplicationMetaRequest { string appId = 1; } + +message PbApplicationMetaUpdateResponse { + bool status = 1; +} diff --git a/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala b/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala index cac8b258ae..3a542e7844 100644 --- a/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala +++ b/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala @@ -44,6 +44,9 @@ case class UserIdentifier(tenantId: String, name: String) { object UserIdentifier extends Logging { val USER_IDENTIFIER = "^\\`(.+)\\`\\.\\`(.+)\\`$".r + val UNKNOWN_USER_IDENTIFIER = + UserIdentifier(IdentityProvider.DEFAULT_USERNAME, IdentityProvider.DEFAULT_TENANT_ID) + def apply(userIdentifier: String): UserIdentifier = { if (USER_IDENTIFIER.findPrefixOf(userIdentifier).isDefined) { val USER_IDENTIFIER(tenantId, name) = userIdentifier diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala index 6911d5bb57..8a8083a42b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala @@ -17,7 +17,15 @@ package org.apache.celeborn.common.meta +import org.apache.celeborn.common.identity.UserIdentifier + /** * Application meta */ -case class ApplicationMeta(appId: String, secret: String) +case class ApplicationMeta( + appId: String, + secret: String, + userIdentifier: UserIdentifier = UserIdentifier.UNKNOWN_USER_IDENTIFIER) { + def this(appId: String, secret: String) = + this(appId, secret, UserIdentifier.UNKNOWN_USER_IDENTIFIER) +} diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 82946dfb7b..150cc4dc31 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -891,6 +891,12 @@ object ControlMessages extends Logging { case pb: PbCheckWorkersAvailableResponse => new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE_RESPONSE, pb.toByteArray) + + case pb: PbApplicationMeta => + new TransportMessage(MessageType.APPLICATION_META, pb.toByteArray) + + case pb: PbApplicationMetaUpdateResponse => + new TransportMessage(MessageType.APPLICATION_META_UPDATE_RESPONSE, pb.toByteArray) } // TODO change return type to GeneratedMessageV3 @@ -1238,6 +1244,12 @@ object ControlMessages extends Logging { case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE => PbCheckWorkersAvailableResponse.parseFrom(message.getPayload) + + case APPLICATION_META_VALUE => + PbApplicationMeta.parseFrom(message.getPayload) + + case APPLICATION_META_UPDATE_RESPONSE_VALUE => + PbApplicationMetaUpdateResponse.parseFrom(message.getPayload) } } } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index 1ca5759d2f..0db0d44f36 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -460,11 +460,16 @@ object PbSerDeUtils { def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = { PbApplicationMeta.newBuilder() .setAppId(meta.appId) - .setSecret(meta.secret).build() + .setSecret(meta.secret) + .setUserIdentifier(Option(meta.userIdentifier).map(toPbUserIdentifier).orNull).build() } def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = { - new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret) + new ApplicationMeta( + pbApplicationMeta.getAppId, + pbApplicationMeta.getSecret, + Option(pbApplicationMeta.getUserIdentifier).map(fromPbUserIdentifier).getOrElse( + UserIdentifier.UNKNOWN_USER_IDENTIFIER)) } def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index c1cd37b284..9444eb9131 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -35,6 +35,7 @@ import scala.Option; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.slf4j.Logger; @@ -472,6 +473,14 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) { } public void updateApplicationMeta(ApplicationMeta applicationMeta) { - applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); + ApplicationMeta existing = applicationMetas.get(applicationMeta.appId()); + if (null == existing) { + applicationMetas.put(applicationMeta.appId(), applicationMeta); + } else if (StringUtils.isNotBlank(existing.secret()) + && existing.secret().equals(applicationMeta.secret())) { + applicationMetas.put( + applicationMeta.appId(), + existing.copy(existing.appId(), existing.secret(), applicationMeta.userIdentifier())); + } } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index 00eb00b607..7eb08663b8 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -390,6 +390,11 @@ public void handleApplicationMeta(ApplicationMeta applicationMeta) { ResourceProtos.ApplicationMetaRequest.newBuilder() .setAppId(applicationMeta.appId()) .setSecret(applicationMeta.secret()) + .setUserIdentifier( + ResourceProtos.UserIdentifier.newBuilder() + .setTenantId(applicationMeta.userIdentifier().tenantId()) + .setName(applicationMeta.userIdentifier().name()) + .build()) .build()) .build()); } catch (CelebornRuntimeException e) { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index 948cca6ef0..8f116f4068 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -273,7 +273,11 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques case ApplicationMeta: appId = request.getApplicationMetaRequest().getAppId(); String secret = request.getApplicationMetaRequest().getSecret(); - metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret)); + UserIdentifier userIdentifier = + new UserIdentifier( + request.getApplicationMetaRequest().getUserIdentifier().getTenantId(), + request.getApplicationMetaRequest().getUserIdentifier().getName()); + metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret, userIdentifier)); break; default: diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 660a30cb9f..e46626f854 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -229,4 +229,5 @@ message ResourceResponse { message ApplicationMetaRequest { required string appId = 1; optional string secret = 2; + required UserIdentifier userIdentifier = 3; } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 4942dac871..0d1358b50d 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -36,7 +36,7 @@ import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta.{ApplicationMeta, DiskInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource} import org.apache.celeborn.common.network.CelebornRackResolver @@ -49,7 +49,7 @@ import org.apache.celeborn.common.rpc._ import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder, ServerSaslContextBuilder} import org.apache.celeborn.common.util.{CelebornHadoopUtils, CollectionUtils, JavaUtils, PbSerDeUtils, SignalUtils, ThreadUtils, Utils} import org.apache.celeborn.server.common.{HttpService, Service} -import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager +import org.apache.celeborn.service.deploy.master.clustermeta.{ResourceProtos, SingleMasterMetaManager} import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler} import org.apache.celeborn.service.deploy.master.quota.QuotaManager @@ -543,6 +543,9 @@ private[celeborn] class Master( case pb: PbApplicationMetaRequest => executeWithLeaderChecker(context, handleRequestForApplicationMeta(context, pb)) + + case pb: PbApplicationMeta => + executeWithLeaderChecker(context, handleApplicationMetaUpdate(context, pb)) } private def timeoutDeadWorkers(): Unit = { @@ -1132,6 +1135,23 @@ private[celeborn] class Master( } } + private def handleApplicationMetaUpdate( + context: RpcCallContext, + pb: PbApplicationMeta): Unit = { + statusSystem.handleApplicationMeta(ApplicationMeta( + pb.getAppId, + pb.getSecret, + Option(pb.getUserIdentifier).map(PbSerDeUtils.fromPbUserIdentifier).getOrElse( + UserIdentifier.UNKNOWN_USER_IDENTIFIER))) + val pbApplicationMetaUpdateResponse = PbApplicationMetaUpdateResponse.newBuilder() + .setStatus(true).build() + val transportMessage = + new TransportMessage( + MessageType.APPLICATION_META_UPDATE_RESPONSE, + pbApplicationMetaUpdateResponse.toByteArray) + context.reply(transportMessage) + } + override def getMasterGroupInfo: String = { val sb = new StringBuilder sb.append("====================== Master Group INFO ==============================\n") @@ -1226,7 +1246,9 @@ private[celeborn] class Master( val sb = new StringBuilder sb.append("================= LifecycleManager Application List ======================\n") statusSystem.appHeartbeatTime.asScala.toSeq.sortBy(_._2).foreach { case (appId, time) => - sb.append(s"${appId.padTo(40, " ").mkString}${Utils.formatTimestamp(time)}\n") + val userIdentifier = Option(statusSystem.applicationMetas.get(appId)).map(_.userIdentifier) + .getOrElse(UserIdentifier.UNKNOWN_USER_IDENTIFIER) + sb.append(s"${appId.padTo(40, " ").mkString}${Utils.formatTimestamp(time).padTo(40, " ")}$userIdentifier\n") } sb.toString() } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 64fae0b9bd..b026c0844a 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -763,5 +763,20 @@ public void testHandleApplicationMeta() { statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2)); assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret()); assertEquals(2, statusSystem.applicationMetas.size()); + assertTrue( + statusSystem.applicationMetas.get(appId2).userIdentifier() + == UserIdentifier.UNKNOWN_USER_IDENTIFIER()); + + UserIdentifier userIdentifier = new UserIdentifier("celeborn", "celeborn"); + statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2, userIdentifier)); + assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret()); + assertEquals(2, statusSystem.applicationMetas.size()); + assertTrue(statusSystem.applicationMetas.get(appId2).userIdentifier() == userIdentifier); + + String appId3 = "app03"; + statusSystem.handleApplicationMeta(new ApplicationMeta(appId3, "", userIdentifier)); + assertEquals("", statusSystem.applicationMetas.get(appId3).secret()); + assertEquals(3, statusSystem.applicationMetas.size()); + assertTrue(statusSystem.applicationMetas.get(appId3).userIdentifier() == userIdentifier); } } diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerApplicationMetaSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerApplicationMetaSuite.scala new file mode 100644 index 0000000000..c296da88e4 --- /dev/null +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerApplicationMetaSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.tests.client + +import java.util + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.client.LifecycleManager +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.identity.UserIdentifier +import org.apache.celeborn.service.deploy.MiniClusterFeature + +class LifecycleManagerApplicationMetaSuite extends CelebornFunSuite with MiniClusterFeature { + protected var celebornConf: CelebornConf = _ + + private val APP = "app-" + System.currentTimeMillis() + private val userIdentifier = UserIdentifier("test", "celeborn") + + override protected def beforeAll(): Unit = { + super.beforeAll() + System.setProperty(CelebornConf.QUOTA_USER_SPECIFIC_TENANT.key, userIdentifier.tenantId) + System.setProperty(CelebornConf.QUOTA_USER_SPECIFIC_USERNAME.key, userIdentifier.name) + celebornConf = new CelebornConf() + val (master, _) = setupMiniClusterWithRandomPorts() + logInfo(s"master address is: ${master.conf.get(CelebornConf.MASTER_ENDPOINTS.key)}") + celebornConf.set( + CelebornConf.MASTER_ENDPOINTS.key, + master.conf.get(CelebornConf.MASTER_ENDPOINTS.key)) + } + + test("application meta") { + val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf) + + val arrayList = new util.ArrayList[Integer]() + (0 to 10).foreach(i => { + arrayList.add(i) + }) + + lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList) + + assert(masterInfo._1.getApplicationList.contains(userIdentifier.toString)) + } + + override def afterAll(): Unit = { + logInfo("all test complete , stop celeborn mini cluster") + shutdownMiniCluster() + } +}