Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1258] Separate application auth and general metadata and add UserIdentifier to the master application meta #2420

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,20 @@ private void initDataClientFactoryIfNeeded() {
logger.info("Initializing data client factory for {}.", appUniqueId);
dataClientFactory = context.createClientFactory();
} else if (lifecycleManagerRef != null) {
PbApplicationMetaRequest pbApplicationMetaRequest =
PbApplicationMetaRequest.newBuilder().setAppId(appUniqueId).build();
PbApplicationMeta pbApplicationMeta =
PbApplicationAuthMetaRequest pbApplicationAuthMetaRequest =
PbApplicationAuthMetaRequest.newBuilder().setAppId(appUniqueId).build();
PbApplicationAuthMeta pbApplicationAuthMeta =
lifecycleManagerRef.askSync(
pbApplicationMetaRequest,
pbApplicationAuthMetaRequest,
conf.clientRpcRegisterShuffleRpcAskTimeout(),
ClassTag$.MODULE$.apply(PbApplicationMeta.class));
ClassTag$.MODULE$.apply(PbApplicationAuthMeta.class));
logger.info("Initializing data client factory for secured {}.", appUniqueId);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
bootstraps.add(
new SaslClientBootstrap(
dataTransportConf,
appUniqueId,
new SaslCredentials(appUniqueId, pbApplicationMeta.getSecret())));
new SaslCredentials(appUniqueId, pbApplicationAuthMeta.getSecret())));
dataClientFactory = context.createClientFactory(bootstraps);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo}
import org.apache.celeborn.common.meta.{ApplicationAuthMeta, ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo}
import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
Expand Down Expand Up @@ -113,7 +113,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends

private val mockDestroyFailure = conf.testMockDestroySlotsFailure
private val authEnabled = conf.authEnabledOnClient
private var applicationMeta: ApplicationMeta = _
private var applicationAuthMeta: ApplicationAuthMeta = _
private val applicationMeta = ApplicationMeta(appUniqueId, userIdentifier)
@VisibleForTesting
def workerSnapshots(shuffleId: Int): util.Map[WorkerInfo, ShufflePartitionLocationInfo] =
shuffleAllocatedWorkers.get(shuffleId)
Expand Down Expand Up @@ -173,7 +174,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
if (authEnabled) {
logInfo(s"Authentication is enabled; setting up master and worker RPC environments")
val appSecret = createSecret()
applicationMeta = ApplicationMeta(appUniqueId, appSecret)
applicationAuthMeta = ApplicationAuthMeta(appUniqueId, appSecret)
val registrationInfo = new RegistrationInfo()
masterRpcEnvInUse =
RpcEnv.create(
Expand Down Expand Up @@ -207,13 +208,20 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)

private def updateApplicationMeta(): Unit = {
Utils.tryLogNonFatalError(masterClient.askSync[PbApplicationMetaUpdateResponse](
PbSerDeUtils.toPbApplicationMeta(applicationMeta),
classOf[PbApplicationMetaUpdateResponse]))
}

// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is null when
// `masterClient` is called. Therefore, it's necessary to uniformly execute the initialization
// method at the end of the construction of the class to perform the initialization operations.
private def initialize(): Unit = {
// noinspection ConvertExpressionToSAM
updateApplicationMeta()
commitManager.start()
heartbeater.start()
changePartitionManager.start()
Expand Down Expand Up @@ -414,13 +422,13 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
logDebug(s"Received ReportShuffleFetchFailure request, appShuffleId $appShuffleId shuffleId $shuffleId")
handleReportShuffleFetchFailure(context, appShuffleId, shuffleId)

case pb: PbApplicationMetaRequest =>
case pb: PbApplicationAuthMetaRequest =>
logDebug(s"Received request for meta info ${pb.getAppId}.")
if (applicationMeta == null) {
if (applicationAuthMeta == null) {
context.sendFailure(
new IllegalArgumentException("Application meta is not initialized for this app."))
new IllegalArgumentException("Application auth meta is not initialized for this app."))
} else {
context.reply(PbSerDeUtils.toPbApplicationMeta(applicationMeta))
context.reply(PbSerDeUtils.toPbApplicationAuthMeta(applicationAuthMeta))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ public <T extends GeneratedMessageV3> T getParsedPayload() throws InvalidProtoco
return (T) PbRegisterApplicationRequest.parseFrom(payload);
case REGISTER_APPLICATION_RESPONSE_VALUE:
return (T) PbRegisterApplicationResponse.parseFrom(payload);
case APPLICATION_META_VALUE:
return (T) PbApplicationMeta.parseFrom(payload);
case APPLICATION_META_REQUEST_VALUE:
return (T) PbApplicationMetaRequest.parseFrom(payload);
case APPLICATION_AUTH_META_VALUE:
return (T) PbApplicationAuthMeta.parseFrom(payload);
case APPLICATION_AUTH_META_REQUEST_VALUE:
return (T) PbApplicationAuthMetaRequest.parseFrom(payload);
case BATCH_OPEN_STREAM_VALUE:
return (T) PbOpenStreamList.parseFrom(payload);
case BATCH_OPEN_STREAM_RESPONSE_VALUE:
Expand Down
22 changes: 17 additions & 5 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ enum MessageType {
REGISTER_APPLICATION_RESPONSE = 75;
WORKER_EVENT_REQUEST = 76;
WORKER_EVENT_RESPONSE = 77;
APPLICATION_META = 78;
APPLICATION_META_REQUEST = 79;
APPLICATION_AUTH_META = 78;
APPLICATION_AUTH_META_REQUEST = 79;
BATCH_OPEN_STREAM = 80;
BATCH_OPEN_STREAM_RESPONSE = 81;
APPLICATION_META = 82;
APPLICATION_META_UPDATE_RESPONSE = 83;
}

enum StreamType {
Expand Down Expand Up @@ -618,7 +620,8 @@ message PbSnapshotMetaInfo {
repeated PbWorkerInfo shutdownWorkers = 13;
repeated PbWorkerInfo manuallyExcludedWorkers = 14;
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
map<string, PbApplicationAuthMeta> applicationAuthMetas = 16;
map<string, PbApplicationMeta> applicationMetas = 17;
}

message PbOpenStream {
Expand Down Expand Up @@ -750,11 +753,20 @@ message PbRegisterApplicationResponse {
bool status = 1;
}

message PbApplicationMeta {
message PbApplicationAuthMeta {
string appId = 1;
string secret = 2;
}

message PbApplicationMetaRequest {
message PbApplicationAuthMetaRequest {
string appId = 1;
}

message PbApplicationMeta {
string appId = 1;
PbUserIdentifier userIdentifier = 2;
}

message PbApplicationMetaUpdateResponse {
bool status = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
authEnabled && internalPortEnabled
}

def masterSendApplicationMetaThreads: Int = get(MASTER_SEND_APPLICATION_META_THREADS)
def masterSendApplicationAuthMetaThreads: Int = get(MASTER_SEND_APPLICATION_AUTH_META_THREADS)

def authEnabledOnClient: Boolean = {
get(AUTH_ENABLED)
Expand Down Expand Up @@ -4697,10 +4697,10 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)

val MASTER_SEND_APPLICATION_META_THREADS: ConfigEntry[Int] =
buildConf("celeborn.master.send.applicationMeta.threads")
val MASTER_SEND_APPLICATION_AUTH_META_THREADS: ConfigEntry[Int] =
buildConf("celeborn.master.send.applicationAuthMeta.threads")
.categories("master")
.doc("Number of threads used by the Master to send ApplicationMeta to Workers.")
.doc("Number of threads used by the Master to send ApplicationAuthMeta to Workers.")
Copy link
Member Author

@turboFei turboFei Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced all the case sensitive keywords in IDEA.

ApplicationMeta -> ApplicationAuthMeta
applicationMeta -> applicationAuthMeta
APPLICATION_META -> APPLICATION_AUTH_META

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.version("0.5.0")
.intConf
.createWithDefault(8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ case class UserIdentifier(tenantId: String, name: String) {
object UserIdentifier extends Logging {
val USER_IDENTIFIER = "^\\`(.+)\\`\\.\\`(.+)\\`$".r

val UNKNOWN_USER_IDENTIFIER =
UserIdentifier(IdentityProvider.DEFAULT_USERNAME, IdentityProvider.DEFAULT_TENANT_ID)

def apply(userIdentifier: String): UserIdentifier = {
if (USER_IDENTIFIER.findPrefixOf(userIdentifier).isDefined) {
val USER_IDENTIFIER(tenantId, name) = userIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package org.apache.celeborn.common.meta

import org.apache.celeborn.common.identity.UserIdentifier

/**
* Application auth meta
*/
case class ApplicationAuthMeta(appId: String, secret: String)

/**
* Application meta
* Application general meta
*/
case class ApplicationMeta(appId: String, secret: String)
case class ApplicationMeta(appId: String, userIdentifier: UserIdentifier)
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,12 @@ object ControlMessages extends Logging {

case pb: PbCheckWorkersAvailableResponse =>
new TransportMessage(MessageType.CHECK_WORKERS_AVAILABLE_RESPONSE, pb.toByteArray)

case pb: PbApplicationMeta =>
new TransportMessage(MessageType.APPLICATION_META, pb.toByteArray)

case pb: PbApplicationMetaUpdateResponse =>
new TransportMessage(MessageType.APPLICATION_META_UPDATE_RESPONSE, pb.toByteArray)
}

// TODO change return type to GeneratedMessageV3
Expand Down Expand Up @@ -1238,6 +1244,12 @@ object ControlMessages extends Logging {

case CHECK_WORKERS_AVAILABLE_RESPONSE_VALUE =>
PbCheckWorkersAvailableResponse.parseFrom(message.getPayload)

case APPLICATION_META_VALUE =>
PbApplicationMeta.parseFrom(message.getPayload)

case APPLICATION_META_UPDATE_RESPONSE_VALUE =>
PbApplicationMetaUpdateResponse.parseFrom(message.getPayload)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.InvalidProtocolBufferException

import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationMeta, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationAuthMeta, ApplicationMeta, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
Expand Down Expand Up @@ -419,6 +419,7 @@ object PbSerDeUtils {
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
shutdownWorkers: java.util.Set[WorkerInfo],
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
applicationAuthMetas: ConcurrentHashMap[String, ApplicationAuthMeta],
applicationMetas: ConcurrentHashMap[String, ApplicationMeta]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
Expand Down Expand Up @@ -448,6 +449,12 @@ object PbSerDeUtils {
builder.setCurrentAppDiskUsageMetricsSnapshot(
toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot))
}
val pbApplicationAuthMetas = applicationAuthMetas.asScala.map {
case (appId, applicationAuthMeta) => (appId, toPbApplicationAuthMeta(applicationAuthMeta))
}.asJava
if (localCollectionUtils.isNotEmpty(pbApplicationAuthMetas)) {
builder.putAllApplicationAuthMetas(pbApplicationAuthMetas)
}
val pbApplicationMetas = applicationMetas.asScala.map {
case (appId, applicationMeta) => (appId, toPbApplicationMeta(applicationMeta))
}.asJava
Expand All @@ -457,14 +464,28 @@ object PbSerDeUtils {
builder.build()
}

def toPbApplicationAuthMeta(meta: ApplicationAuthMeta): PbApplicationAuthMeta = {
PbApplicationAuthMeta.newBuilder()
.setAppId(meta.appId)
.setSecret(meta.secret).build()
}

def fromPbApplicationAuthMeta(pbApplicationAuthMeta: PbApplicationAuthMeta)
: ApplicationAuthMeta = {
new ApplicationAuthMeta(pbApplicationAuthMeta.getAppId, pbApplicationAuthMeta.getSecret)
}

def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = {
PbApplicationMeta.newBuilder()
.setAppId(meta.appId)
.setSecret(meta.secret).build()
.setUserIdentifier(Option(meta.userIdentifier).map(toPbUserIdentifier).orNull).build()
}

def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = {
new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
new ApplicationMeta(
pbApplicationMeta.getAppId,
Option(pbApplicationMeta.getUserIdentifier).map(fromPbUserIdentifier).getOrElse(
UserIdentifier.UNKNOWN_USER_IDENTIFIER))
}

def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{ApplicationAuthMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
import org.apache.celeborn.common.protocol.PartitionLocation
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
Expand Down Expand Up @@ -283,11 +283,11 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredWorkerStatus.equals(workerStatus))
}

test("fromAndToPbApplicationMeta") {
val applicationMeta = new ApplicationMeta("app1", "secret1")
val pbApplicationMeta = PbSerDeUtils.toPbApplicationMeta(applicationMeta)
val restoredApplicationMeta = PbSerDeUtils.fromPbApplicationMeta(pbApplicationMeta)
test("fromAndToPbApplicationAuthMeta") {
val applicationAuthMeta = new ApplicationAuthMeta("app1", "secret1")
val pbApplicationAuthMeta = PbSerDeUtils.toPbApplicationAuthMeta(applicationAuthMeta)
val restoredApplicationAuthMeta = PbSerDeUtils.fromPbApplicationAuthMeta(pbApplicationAuthMeta)

assert(restoredApplicationMeta.equals(applicationMeta))
assert(restoredApplicationAuthMeta.equals(applicationAuthMeta))
}
}
2 changes: 1 addition & 1 deletion docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ license: |
| celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | |
| celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationMeta to Workers. | 0.5.0 | |
| celeborn.master.send.applicationAuthMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationAuthMeta to Workers. | 0.5.0 | |
| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots |
| celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.ApplicationAuthMeta;
import org.apache.celeborn.common.network.sasl.SecretRegistry;
import org.apache.celeborn.common.network.sasl.SecretRegistryImpl;
import org.apache.celeborn.service.deploy.master.clustermeta.IMetadataHandler;
Expand All @@ -40,7 +40,7 @@ public void register(String appId, String secret) {
super.register(appId, secret);
if (metadataHandler != null) {
LOG.info("Persisting metadata for appId: {}", appId);
metadataHandler.handleApplicationMeta(new ApplicationMeta(appId, secret));
metadataHandler.handleApplicationAuthMeta(new ApplicationAuthMeta(appId, secret));
}
}

Expand Down
Loading
Loading