Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1258] Add UserIdentifier to the master application meta #2365

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
if (authEnabled) {
logInfo(s"Authentication is enabled; setting up master and worker RPC environments")
val appSecret = createSecret()
applicationMeta = ApplicationMeta(appUniqueId, appSecret)
applicationMeta = ApplicationMeta(appUniqueId, appSecret, userIdentifier)
val registrationInfo = new RegistrationInfo()
masterRpcEnvInUse =
RpcEnv.create(
Expand All @@ -192,6 +192,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
0,
conf,
createRpcSecurityContext(appSecret))
} else {
applicationMeta = ApplicationMeta(appUniqueId, "", userIdentifier)
}

private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false)
Expand All @@ -207,13 +209,20 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)

private def updateApplicationMeta(): Unit = {
Utils.tryLogNonFatalError(masterClient.askSync[PbApplicationMetaUpdateResponse](
PbSerDeUtils.toPbApplicationMeta(applicationMeta),
classOf[PbApplicationMetaUpdateResponse]))
}

// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is null when
// `masterClient` is called. Therefore, it's necessary to uniformly execute the initialization
// method at the end of the construction of the class to perform the initialization operations.
private def initialize(): Unit = {
// noinspection ConvertExpressionToSAM
updateApplicationMeta()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually conflicts with safely propagating the application secret to the Celeborn Master. When auth is enabled, this will transmit application secret to Celeborn Master without any Sasl client authentication. Currently, we have added Anonymous client authentication, but the plan was to add other mechanisms.
cc. @mridulm

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename the existing ApplicationMeta to ApplicationAuthMeta and it should contain information specific to authentication. We can then use ApplicationMeta for general app info that needs to be sent to the Celeborn Master.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raised #2420 to separate application auth and general meta. cc @otterc @RexXiong

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually conflicts with safely propagating the application secret to the Celeborn Master. When auth is enabled, this will transmit application secret to Celeborn Master without any Sasl client authentication. Currently, we have added Anonymous client authentication, but the plan was to add other mechanisms. cc. @mridulm

Based on my understanding, if authentication is enabled in Celeborn, it is not possible to access the Celeborn Master without SASL client authentication. This PR does not introduce new mechanisms; it merely adds an identifier to ApplicationMeta. Therefore, I believe this PR does not compromise security. @otterc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct. The connection will only be established once the client is authenticated. However, I still believe that sending the secret in plain text multiple times to the Master may cause issues in the future. Eventually, we would want to encrypt the secret, and TLS support is for that purpose. If we are sending the secret multiple times, it would mean that not only registration has to be done with TLS but also that all messages should be sent via TLS. Therefore, I think having authentication metadata separated from general application metadata will be better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with most messages shouldn't be sent via TLS. I believe that the most fundamental issue with the existing security authentication mechanism is that it only authenticates the connection, but does not verify the legitimacy of the messages sent by the authenticated client. At the very least, we need verify that the applicationId in the sent messages matches the applicationId provided during the initial authentication, Otherwise, an authenticated client could still access or modify with the data of other applications. I am not sure if this is in line with the expectations.

Copy link
Contributor

@otterc otterc Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a gap. We can validate the appId in the fetch and push messages of an application. I created https://issues.apache.org/jira/browse/CELEBORN-1360 for that and will create a PR soon.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a gap. We can validate the appId in the fetch and push messages of an application. I created https://issues.apache.org/jira/browse/CELEBORN-1360 for that and will create a PR soon.

If that's the case, then we won't have to send a secret; we'll only need to update the userIdentifier. I believe that extending the ApplicationMeta to contain more application-related information is acceptable.

commitManager.start()
heartbeater.start()
changePartitionManager.start()
Expand Down
6 changes: 6 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ enum MessageType {
APPLICATION_META_REQUEST = 79;
BATCH_OPEN_STREAM = 80;
BATCH_OPEN_STREAM_RESPONSE = 81;
APPLICATION_META_UPDATE_RESPONSE = 82;
}

enum StreamType {
Expand Down Expand Up @@ -753,8 +754,13 @@ message PbRegisterApplicationResponse {
message PbApplicationMeta {
string appId = 1;
string secret = 2;
PbUserIdentifier userIdentifier = 3;
}

message PbApplicationMetaRequest {
string appId = 1;
}

message PbApplicationMetaUpdateResponse {
bool status = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ case class UserIdentifier(tenantId: String, name: String) {
object UserIdentifier extends Logging {
val USER_IDENTIFIER = "^\\`(.+)\\`\\.\\`(.+)\\`$".r

val UNKNOWN_USER_IDENTIFIER =
UserIdentifier(IdentityProvider.DEFAULT_USERNAME, IdentityProvider.DEFAULT_TENANT_ID)

def apply(userIdentifier: String): UserIdentifier = {
if (USER_IDENTIFIER.findPrefixOf(userIdentifier).isDefined) {
val USER_IDENTIFIER(tenantId, name) = userIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@

package org.apache.celeborn.common.meta

import org.apache.celeborn.common.identity.UserIdentifier

/**
* Application meta
*/
case class ApplicationMeta(appId: String, secret: String)
case class ApplicationMeta(
appId: String,
secret: String,
userIdentifier: UserIdentifier = UserIdentifier.UNKNOWN_USER_IDENTIFIER) {
def this(appId: String, secret: String) =
this(appId, secret, UserIdentifier.UNKNOWN_USER_IDENTIFIER)
}
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,12 @@ object ControlMessages extends Logging {

case pb: PbCheckWorkersAvailableResponse =>
new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE_RESPONSE, pb.toByteArray)

case pb: PbApplicationMeta =>
new TransportMessage(MessageType.APPLICATION_META, pb.toByteArray)

case pb: PbApplicationMetaUpdateResponse =>
new TransportMessage(MessageType.APPLICATION_META_UPDATE_RESPONSE, pb.toByteArray)
}

// TODO change return type to GeneratedMessageV3
Expand Down Expand Up @@ -1238,6 +1244,12 @@ object ControlMessages extends Logging {

case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)

case APPLICATION_META_VALUE =>
PbApplicationMeta.parseFrom(message.getPayload)

case APPLICATION_META_UPDATE_RESPONSE_VALUE =>
PbApplicationMetaUpdateResponse.parseFrom(message.getPayload)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,16 @@ object PbSerDeUtils {
def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = {
PbApplicationMeta.newBuilder()
.setAppId(meta.appId)
.setSecret(meta.secret).build()
.setSecret(meta.secret)
.setUserIdentifier(Option(meta.userIdentifier).map(toPbUserIdentifier).orNull).build()
}

def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = {
new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
new ApplicationMeta(
pbApplicationMeta.getAppId,
pbApplicationMeta.getSecret,
Option(pbApplicationMeta.getUserIdentifier).map(fromPbUserIdentifier).getOrElse(
UserIdentifier.UNKNOWN_USER_IDENTIFIER))
}

def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import scala.Option;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
Expand Down Expand Up @@ -472,6 +473,14 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) {
}

public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
ApplicationMeta existing = applicationMetas.get(applicationMeta.appId());
if (null == existing) {
applicationMetas.put(applicationMeta.appId(), applicationMeta);
} else if (StringUtils.isNotBlank(existing.secret())
&& existing.secret().equals(applicationMeta.secret())) {
applicationMetas.put(
applicationMeta.appId(),
existing.copy(existing.appId(), existing.secret(), applicationMeta.userIdentifier()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ public void handleApplicationMeta(ApplicationMeta applicationMeta) {
ResourceProtos.ApplicationMetaRequest.newBuilder()
.setAppId(applicationMeta.appId())
.setSecret(applicationMeta.secret())
.setUserIdentifier(
ResourceProtos.UserIdentifier.newBuilder()
.setTenantId(applicationMeta.userIdentifier().tenantId())
.setName(applicationMeta.userIdentifier().name())
.build())
.build())
.build());
} catch (CelebornRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
case ApplicationMeta:
appId = request.getApplicationMetaRequest().getAppId();
String secret = request.getApplicationMetaRequest().getSecret();
metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret));
UserIdentifier userIdentifier =
new UserIdentifier(
request.getApplicationMetaRequest().getUserIdentifier().getTenantId(),
request.getApplicationMetaRequest().getUserIdentifier().getName());
metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret, userIdentifier));
break;

default:
Expand Down
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,5 @@ message ResourceResponse {
message ApplicationMetaRequest {
required string appId = 1;
optional string secret = 2;
required UserIdentifier userIdentifier = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{ApplicationMeta, DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.CelebornRackResolver
Expand All @@ -49,7 +49,7 @@ import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder, ServerSaslContextBuilder}
import org.apache.celeborn.common.util.{CelebornHadoopUtils, CollectionUtils, JavaUtils, PbSerDeUtils, SignalUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.{ResourceProtos, SingleMasterMetaManager}
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.quota.QuotaManager

Expand Down Expand Up @@ -543,6 +543,9 @@ private[celeborn] class Master(

case pb: PbApplicationMetaRequest =>
executeWithLeaderChecker(context, handleRequestForApplicationMeta(context, pb))

case pb: PbApplicationMeta =>
executeWithLeaderChecker(context, handleApplicationMetaUpdate(context, pb))
}

private def timeoutDeadWorkers(): Unit = {
Expand Down Expand Up @@ -1132,6 +1135,23 @@ private[celeborn] class Master(
}
}

private def handleApplicationMetaUpdate(
context: RpcCallContext,
pb: PbApplicationMeta): Unit = {
statusSystem.handleApplicationMeta(ApplicationMeta(
pb.getAppId,
pb.getSecret,
Option(pb.getUserIdentifier).map(PbSerDeUtils.fromPbUserIdentifier).getOrElse(
UserIdentifier.UNKNOWN_USER_IDENTIFIER)))
val pbApplicationMetaUpdateResponse = PbApplicationMetaUpdateResponse.newBuilder()
.setStatus(true).build()
val transportMessage =
new TransportMessage(
MessageType.APPLICATION_META_UPDATE_RESPONSE,
pbApplicationMetaUpdateResponse.toByteArray)
context.reply(transportMessage)
}

override def getMasterGroupInfo: String = {
val sb = new StringBuilder
sb.append("====================== Master Group INFO ==============================\n")
Expand Down Expand Up @@ -1226,7 +1246,9 @@ private[celeborn] class Master(
val sb = new StringBuilder
sb.append("================= LifecycleManager Application List ======================\n")
statusSystem.appHeartbeatTime.asScala.toSeq.sortBy(_._2).foreach { case (appId, time) =>
sb.append(s"${appId.padTo(40, " ").mkString}${Utils.formatTimestamp(time)}\n")
val userIdentifier = Option(statusSystem.applicationMetas.get(appId)).map(_.userIdentifier)
.getOrElse(UserIdentifier.UNKNOWN_USER_IDENTIFIER)
sb.append(s"${appId.padTo(40, " ").mkString}${Utils.formatTimestamp(time).padTo(40, " ")}$userIdentifier\n")
}
sb.toString()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,5 +763,20 @@ public void testHandleApplicationMeta() {
statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2));
assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret());
assertEquals(2, statusSystem.applicationMetas.size());
assertTrue(
statusSystem.applicationMetas.get(appId2).userIdentifier()
== UserIdentifier.UNKNOWN_USER_IDENTIFIER());

UserIdentifier userIdentifier = new UserIdentifier("celeborn", "celeborn");
statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2, userIdentifier));
assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret());
assertEquals(2, statusSystem.applicationMetas.size());
assertTrue(statusSystem.applicationMetas.get(appId2).userIdentifier() == userIdentifier);

String appId3 = "app03";
statusSystem.handleApplicationMeta(new ApplicationMeta(appId3, "", userIdentifier));
assertEquals("", statusSystem.applicationMetas.get(appId3).secret());
assertEquals(3, statusSystem.applicationMetas.size());
assertTrue(statusSystem.applicationMetas.get(appId3).userIdentifier() == userIdentifier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.tests.client

import java.util

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.client.LifecycleManager
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.service.deploy.MiniClusterFeature

class LifecycleManagerApplicationMetaSuite extends CelebornFunSuite with MiniClusterFeature {
protected var celebornConf: CelebornConf = _

private val APP = "app-" + System.currentTimeMillis()
private val userIdentifier = UserIdentifier("test", "celeborn")

override protected def beforeAll(): Unit = {
super.beforeAll()
System.setProperty(CelebornConf.QUOTA_USER_SPECIFIC_TENANT.key, userIdentifier.tenantId)
System.setProperty(CelebornConf.QUOTA_USER_SPECIFIC_USERNAME.key, userIdentifier.name)
celebornConf = new CelebornConf()
val (master, _) = setupMiniClusterWithRandomPorts()
logInfo(s"master address is: ${master.conf.get(CelebornConf.MASTER_ENDPOINTS.key)}")
celebornConf.set(
CelebornConf.MASTER_ENDPOINTS.key,
master.conf.get(CelebornConf.MASTER_ENDPOINTS.key))
}

test("application meta") {
val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf)

val arrayList = new util.ArrayList[Integer]()
(0 to 10).foreach(i => {
arrayList.add(i)
})

lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)

assert(masterInfo._1.getApplicationList.contains(userIdentifier.toString))
}

override def afterAll(): Unit = {
logInfo("all test complete , stop celeborn mini cluster")
shutdownMiniCluster()
}
}
Loading