Skip to content

Commit

Permalink
app meta
Browse files Browse the repository at this point in the history
flatten

fix ut

catch error for app meta update

address comments

deser user identifier
  • Loading branch information
turboFei committed Mar 25, 2024
1 parent fc23800 commit a9a4761
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
if (authEnabled) {
logInfo(s"Authentication is enabled; setting up master and worker RPC environments")
val appSecret = createSecret()
applicationMeta = ApplicationMeta(appUniqueId, appSecret)
applicationMeta = ApplicationMeta(appUniqueId, appSecret, userIdentifier)
val registrationInfo = new RegistrationInfo()
masterRpcEnvInUse =
RpcEnv.create(
Expand All @@ -192,6 +192,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
0,
conf,
createRpcSecurityContext(appSecret))
} else {
applicationMeta = ApplicationMeta(appUniqueId, "", userIdentifier)
}

private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false)
Expand All @@ -207,13 +209,20 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)

private def updateApplicationMeta(): Unit = {
Utils.tryLogNonFatalError(masterClient.askSync[PbApplicationMetaUpdateResponse](
PbSerDeUtils.toPbApplicationMeta(applicationMeta),
classOf[PbApplicationMetaUpdateResponse]))
}

// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is null when
// `masterClient` is called. Therefore, it's necessary to uniformly execute the initialization
// method at the end of the construction of the class to perform the initialization operations.
private def initialize(): Unit = {
// noinspection ConvertExpressionToSAM
updateApplicationMeta()
commitManager.start()
heartbeater.start()
changePartitionManager.start()
Expand Down
6 changes: 6 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ enum MessageType {
APPLICATION_META_REQUEST = 79;
BATCH_OPEN_STREAM = 80;
BATCH_OPEN_STREAM_RESPONSE = 81;
APPLICATION_META_UPDATE_RESPONSE = 82;
}

enum StreamType {
Expand Down Expand Up @@ -753,8 +754,13 @@ message PbRegisterApplicationResponse {
message PbApplicationMeta {
string appId = 1;
string secret = 2;
PbUserIdentifier userIdentifier = 3;
}

message PbApplicationMetaRequest {
string appId = 1;
}

message PbApplicationMetaUpdateResponse {
bool status = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ case class UserIdentifier(tenantId: String, name: String) {
object UserIdentifier extends Logging {
val USER_IDENTIFIER = "^\\`(.+)\\`\\.\\`(.+)\\`$".r

val UNKNOWN_USER_IDENTIFIER =
UserIdentifier(IdentityProvider.DEFAULT_USERNAME, IdentityProvider.DEFAULT_TENANT_ID)

def apply(userIdentifier: String): UserIdentifier = {
if (USER_IDENTIFIER.findPrefixOf(userIdentifier).isDefined) {
val USER_IDENTIFIER(tenantId, name) = userIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@

package org.apache.celeborn.common.meta

import org.apache.celeborn.common.identity.UserIdentifier

/**
* Application meta
*/
case class ApplicationMeta(appId: String, secret: String)
case class ApplicationMeta(
appId: String,
secret: String,
userIdentifier: UserIdentifier = UserIdentifier.UNKNOWN_USER_IDENTIFIER) {
def this(appId: String, secret: String) =
this(appId, secret, UserIdentifier.UNKNOWN_USER_IDENTIFIER)
}
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,12 @@ object ControlMessages extends Logging {

case pb: PbCheckWorkersAvailableResponse =>
new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE_RESPONSE, pb.toByteArray)

case pb: PbApplicationMeta =>
new TransportMessage(MessageType.APPLICATION_META, pb.toByteArray)

case pb: PbApplicationMetaUpdateResponse =>
new TransportMessage(MessageType.APPLICATION_META_UPDATE_RESPONSE, pb.toByteArray)
}

// TODO change return type to GeneratedMessageV3
Expand Down Expand Up @@ -1238,6 +1244,12 @@ object ControlMessages extends Logging {

case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)

case APPLICATION_META_VALUE =>
PbApplicationMeta.parseFrom(message.getPayload)

case APPLICATION_META_UPDATE_RESPONSE_VALUE =>
PbApplicationMetaUpdateResponse.parseFrom(message.getPayload)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,16 @@ object PbSerDeUtils {
def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = {
PbApplicationMeta.newBuilder()
.setAppId(meta.appId)
.setSecret(meta.secret).build()
.setSecret(meta.secret)
.setUserIdentifier(Option(meta.userIdentifier).map(toPbUserIdentifier).orNull).build()
}

def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = {
new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
new ApplicationMeta(
pbApplicationMeta.getAppId,
pbApplicationMeta.getSecret,
Option(pbApplicationMeta.getUserIdentifier).map(fromPbUserIdentifier).getOrElse(
UserIdentifier.UNKNOWN_USER_IDENTIFIER))
}

def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import scala.Option;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
Expand Down Expand Up @@ -472,6 +473,14 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) {
}

public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
ApplicationMeta existing = applicationMetas.get(applicationMeta.appId());
if (null == existing) {
applicationMetas.put(applicationMeta.appId(), applicationMeta);
} else if (StringUtils.isNotBlank(existing.secret())
&& existing.secret().equals(applicationMeta.secret())) {
applicationMetas.put(
applicationMeta.appId(),
existing.copy(existing.appId(), existing.secret(), applicationMeta.userIdentifier()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ public void handleApplicationMeta(ApplicationMeta applicationMeta) {
ResourceProtos.ApplicationMetaRequest.newBuilder()
.setAppId(applicationMeta.appId())
.setSecret(applicationMeta.secret())
.setUserIdentifier(
ResourceProtos.UserIdentifier.newBuilder()
.setTenantId(applicationMeta.userIdentifier().tenantId())
.setName(applicationMeta.userIdentifier().name())
.build())
.build())
.build());
} catch (CelebornRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
case ApplicationMeta:
appId = request.getApplicationMetaRequest().getAppId();
String secret = request.getApplicationMetaRequest().getSecret();
metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret));
UserIdentifier userIdentifier =
new UserIdentifier(
request.getApplicationMetaRequest().getUserIdentifier().getTenantId(),
request.getApplicationMetaRequest().getUserIdentifier().getName());
metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret, userIdentifier));
break;

default:
Expand Down
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,5 @@ message ResourceResponse {
message ApplicationMetaRequest {
required string appId = 1;
optional string secret = 2;
required UserIdentifier userIdentifier = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{ApplicationMeta, DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.CelebornRackResolver
Expand All @@ -49,7 +49,7 @@ import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder, ServerSaslContextBuilder}
import org.apache.celeborn.common.util.{CelebornHadoopUtils, CollectionUtils, JavaUtils, PbSerDeUtils, SignalUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.{ResourceProtos, SingleMasterMetaManager}
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.quota.QuotaManager

Expand Down Expand Up @@ -543,6 +543,9 @@ private[celeborn] class Master(

case pb: PbApplicationMetaRequest =>
executeWithLeaderChecker(context, handleRequestForApplicationMeta(context, pb))

case pb: PbApplicationMeta =>
executeWithLeaderChecker(context, handleApplicationMetaUpdate(context, pb))
}

private def timeoutDeadWorkers(): Unit = {
Expand Down Expand Up @@ -1132,6 +1135,23 @@ private[celeborn] class Master(
}
}

private def handleApplicationMetaUpdate(
context: RpcCallContext,
pb: PbApplicationMeta): Unit = {
statusSystem.handleApplicationMeta(ApplicationMeta(
pb.getAppId,
pb.getSecret,
Option(pb.getUserIdentifier).map(PbSerDeUtils.fromPbUserIdentifier).getOrElse(
UserIdentifier.UNKNOWN_USER_IDENTIFIER)))
val pbApplicationMetaUpdateResponse = PbApplicationMetaUpdateResponse.newBuilder()
.setStatus(true).build()
val transportMessage =
new TransportMessage(
MessageType.APPLICATION_META_UPDATE_RESPONSE,
pbApplicationMetaUpdateResponse.toByteArray)
context.reply(transportMessage)
}

override def getMasterGroupInfo: String = {
val sb = new StringBuilder
sb.append("====================== Master Group INFO ==============================\n")
Expand Down Expand Up @@ -1226,7 +1246,9 @@ private[celeborn] class Master(
val sb = new StringBuilder
sb.append("================= LifecycleManager Application List ======================\n")
statusSystem.appHeartbeatTime.asScala.toSeq.sortBy(_._2).foreach { case (appId, time) =>
sb.append(s"${appId.padTo(40, " ").mkString}${Utils.formatTimestamp(time)}\n")
val userIdentifier = Option(statusSystem.applicationMetas.get(appId)).map(_.userIdentifier)
.getOrElse(UserIdentifier.UNKNOWN_USER_IDENTIFIER)
sb.append(s"${appId.padTo(40, " ").mkString}${Utils.formatTimestamp(time).padTo(40, " ")}$userIdentifier\n")
}
sb.toString()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,5 +763,20 @@ public void testHandleApplicationMeta() {
statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2));
assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret());
assertEquals(2, statusSystem.applicationMetas.size());
assertTrue(
statusSystem.applicationMetas.get(appId2).userIdentifier()
== UserIdentifier.UNKNOWN_USER_IDENTIFIER());

UserIdentifier userIdentifier = new UserIdentifier("celeborn", "celeborn");
statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2, userIdentifier));
assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret());
assertEquals(2, statusSystem.applicationMetas.size());
assertTrue(statusSystem.applicationMetas.get(appId2).userIdentifier() == userIdentifier);

String appId3 = "app03";
statusSystem.handleApplicationMeta(new ApplicationMeta(appId3, "", userIdentifier));
assertEquals("", statusSystem.applicationMetas.get(appId3).secret());
assertEquals(3, statusSystem.applicationMetas.size());
assertTrue(statusSystem.applicationMetas.get(appId3).userIdentifier() == userIdentifier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.tests.client

import java.util

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.client.LifecycleManager
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.service.deploy.MiniClusterFeature

class LifecycleManagerApplicationMetaSuite extends CelebornFunSuite with MiniClusterFeature {
protected var celebornConf: CelebornConf = _

private val APP = "app-" + System.currentTimeMillis()
private val userIdentifier = UserIdentifier("test", "celeborn")

override protected def beforeAll(): Unit = {
super.beforeAll()
System.setProperty(CelebornConf.QUOTA_USER_SPECIFIC_TENANT.key, userIdentifier.tenantId)
System.setProperty(CelebornConf.QUOTA_USER_SPECIFIC_USERNAME.key, userIdentifier.name)
celebornConf = new CelebornConf()
val (master, _) = setupMiniClusterWithRandomPorts()
logInfo(s"master address is: ${master.conf.get(CelebornConf.MASTER_ENDPOINTS.key)}")
celebornConf.set(
CelebornConf.MASTER_ENDPOINTS.key,
master.conf.get(CelebornConf.MASTER_ENDPOINTS.key))
}

test("application meta") {
val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf)

val arrayList = new util.ArrayList[Integer]()
(0 to 10).foreach(i => {
arrayList.add(i)
})

lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)

assert(masterInfo._1.getApplicationList.contains(userIdentifier.toString))
}

override def afterAll(): Unit = {
logInfo("all test complete , stop celeborn mini cluster")
shutdownMiniCluster()
}
}

0 comments on commit a9a4761

Please sign in to comment.