Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,32 @@

package org.apache.celeborn.service.deploy.master.http.api.v1

import javax.ws.rs.{BadRequestException, Consumes, Path, POST, Produces}
import javax.ws.rs.core.MediaType
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import javax.ws.rs.{BadRequestException, Consumes, GET, NotFoundException, Path, POST, Produces}
import javax.ws.rs.core.{MediaType, Response}

import scala.collection.JavaConverters._

import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.commons.io.IOUtils
import org.apache.ratis.proto.RaftProtos.{LogEntryProto, RaftConfigurationProto, RaftPeerProto, RaftPeerRole}
import org.apache.ratis.protocol.{LeaderElectionManagementRequest, RaftClientReply, RaftPeer, SetConfigurationRequest, SnapshotManagementRequest, TransferLeadershipRequest}
import org.apache.ratis.rpc.CallId
import org.apache.ratis.server.storage.RaftStorageDirectory
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.rest.v1.model.{HandleResponse, RatisElectionTransferRequest, RatisPeerAddRequest, RatisPeerRemoveRequest, RatisPeerSetPriorityRequest}
import org.apache.celeborn.rest.v1.model.{HandleResponse, RatisElectionTransferRequest, RatisLocalRaftMetaConfRequest, RatisPeerAddRequest, RatisPeerRemoveRequest, RatisPeerSetPriorityRequest}
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAMasterMetaManager, HARaftServer}
import org.apache.celeborn.service.deploy.master.http.api.MasterHttpResourceUtils.{ensureMasterHAEnabled, ensureMasterIsLeader}

@Tag(name = "Ratis")
@Produces(Array(MediaType.APPLICATION_JSON))
@Consumes(Array(MediaType.APPLICATION_JSON))
class RatisResource extends ApiRequestContext with Logging {
private def master = httpService.asInstanceOf[Master]
Expand All @@ -51,6 +56,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Transfer the group leader to the specified server.")
@POST
@Path("/election/transfer")
@Produces(Array(MediaType.APPLICATION_JSON))
def electionTransfer(request: RatisElectionTransferRequest): HandleResponse =
ensureMasterIsLeader(master) {
transferLeadership(request.getPeerAddress)
Expand All @@ -64,6 +70,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Make the group leader step down its leadership.")
@POST
@Path("/election/step_down")
@Produces(Array(MediaType.APPLICATION_JSON))
def electionStepDown(): HandleResponse = ensureMasterIsLeader(master) {
transferLeadership(null)
}
Expand All @@ -77,6 +84,7 @@ class RatisResource extends ApiRequestContext with Logging {
" Then, the current server would not start a leader election.")
@POST
@Path("/election/pause")
@Produces(Array(MediaType.APPLICATION_JSON))
def electionPause(): HandleResponse = ensureMasterHAEnabled(master) {
applyElectionOp(new LeaderElectionManagementRequest.Pause)
}
Expand All @@ -89,6 +97,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Resume leader election at the current server.")
@POST
@Path("/election/resume")
@Produces(Array(MediaType.APPLICATION_JSON))
def electionResume(): HandleResponse = ensureMasterHAEnabled(master) {
applyElectionOp(new LeaderElectionManagementRequest.Resume)
}
Expand All @@ -101,6 +110,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Add new peers to the raft group.")
@POST
@Path("/peer/add")
@Produces(Array(MediaType.APPLICATION_JSON))
def peerAdd(request: RatisPeerAddRequest): HandleResponse =
ensureLeaderElectionMemberMajorityAddEnabled(master) {
if (request.getPeers.isEmpty) {
Expand Down Expand Up @@ -146,6 +156,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Remove peers from the raft group.")
@POST
@Path("/peer/remove")
@Produces(Array(MediaType.APPLICATION_JSON))
def peerRemove(request: RatisPeerRemoveRequest): HandleResponse =
ensureLeaderElectionMemberMajorityAddEnabled(master) {
if (request.getPeers.isEmpty) {
Expand Down Expand Up @@ -183,6 +194,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Set the priority of the peers in the raft group.")
@POST
@Path("/peer/set_priority")
@Produces(Array(MediaType.APPLICATION_JSON))
def peerSetPriority(request: RatisPeerSetPriorityRequest): HandleResponse =
ensureLeaderElectionMemberMajorityAddEnabled(master) {
if (request.getAddressPriorities.isEmpty) {
Expand Down Expand Up @@ -218,6 +230,7 @@ class RatisResource extends ApiRequestContext with Logging {
description = "Trigger the current server to take snapshot.")
@POST
@Path("/snapshot/create")
@Produces(Array(MediaType.APPLICATION_JSON))
def createSnapshot(): HandleResponse = ensureMasterHAEnabled(master) {
val request = SnapshotManagementRequest.newCreate(
ratisServer.getClientId,
Expand All @@ -235,6 +248,75 @@ class RatisResource extends ApiRequestContext with Logging {
}
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_OCTET_STREAM,
schema = new Schema(implementation = classOf[Response]))),
description = "Get the raft-meta.conf file of the current server.")
@GET
@Path("/local/raft_meta_conf")
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
def getLocalRaftMetaConf(): Response = ensureMasterHAEnabled(master) {
val raftMetaConfFile = Paths.get(
master.conf.haMasterRatisStorageDir,
ratisServer.getGroupId.getUuid.toString,
RaftStorageDirectory.CURRENT_DIR_NAME,
"raft-meta.conf")

if (!raftMetaConfFile.toFile.exists()) {
throw new NotFoundException(s"File $raftMetaConfFile not found.")
}

Response.ok(IOUtils.toByteArray(raftMetaConfFile.toUri))
.header("Content-Disposition", "attachment; filename=\"raft-meta.conf\"")
.build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_OCTET_STREAM,
schema = new Schema(implementation = classOf[Response]))),
description = "Generate a new-raft-meta.conf file based on original raft-meta.conf" +
" and new peers, which is used to move a raft node to a new node.")
@POST
@Path("/local/raft_meta_conf")
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
Comment thread
turboFei marked this conversation as resolved.
def generateNewRaftMetaConf(request: RatisLocalRaftMetaConfRequest): Response =
ensureMasterHAEnabled(master) {
if (request.getPeers.isEmpty) {
throw new BadRequestException("No peers specified.")
}

val groupInfo = ratisServer.getGroupInfo

val existingPeers = getRaftPeers().map(_.getRaftPeerProto)
val newPeers = request.getPeers.asScala.map { peer =>
RaftPeerProto.newBuilder()
.setId(ByteString.copyFrom(peer.getId.getBytes(StandardCharsets.UTF_8)))
.setAddress(peer.getAddress)
.setStartupRole(RaftPeerRole.FOLLOWER)
.build()
}

val remainingPeers =
existingPeers.filterNot(p => newPeers.exists(_.getId.toStringUtf8 == p.getId.toStringUtf8))
val allPeers = remainingPeers ++ newPeers

val newIndex = groupInfo.getLogIndex + 1
logInfo(s"Generating new-raft-meta.conf with remaining peers:" +
s" $remainingPeers and new peers: $newPeers, index: $newIndex.")

val generateLogEntryProto = LogEntryProto.newBuilder()
.setConfigurationEntry(RaftConfigurationProto.newBuilder()
.addAllPeers(allPeers.asJava).build())
.setIndex(newIndex).build()
Response.ok(generateLogEntryProto.toByteArray)
.header("Content-Disposition", "attachment; filename=\"new-raft-meta.conf\"")
.build()
}

private def transferLeadership(peerAddress: String): HandleResponse = {
val newLeaderId = Option(peerAddress).map { addr =>
getRaftPeers().find(_.getAddress == addr).map(_.getId).getOrElse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.celeborn.rest.v1.master.invoker.Configuration;
import org.apache.celeborn.rest.v1.master.invoker.Pair;

import java.io.File;
import org.apache.celeborn.rest.v1.model.HandleResponse;
import org.apache.celeborn.rest.v1.model.RatisElectionTransferRequest;
import org.apache.celeborn.rest.v1.model.RatisLocalRaftMetaConfRequest;
import org.apache.celeborn.rest.v1.model.RatisPeerAddRequest;
import org.apache.celeborn.rest.v1.model.RatisPeerRemoveRequest;
import org.apache.celeborn.rest.v1.model.RatisPeerSetPriorityRequest;
Expand Down Expand Up @@ -186,6 +188,142 @@ public HandleResponse createRatisSnapshot(Map<String, String> additionalHeaders)
);
}

/**
*
* Generate a new-raft-meta.conf file based on original raft-meta.conf and new peers, which is used to move a raft node to a new node.
* @param ratisLocalRaftMetaConfRequest (optional)
* @return File
* @throws ApiException if fails to make API call
*/
public File generateNewRaftMetaConf(RatisLocalRaftMetaConfRequest ratisLocalRaftMetaConfRequest) throws ApiException {
return this.generateNewRaftMetaConf(ratisLocalRaftMetaConfRequest, Collections.emptyMap());
}


/**
*
* Generate a new-raft-meta.conf file based on original raft-meta.conf and new peers, which is used to move a raft node to a new node.
* @param ratisLocalRaftMetaConfRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return File
* @throws ApiException if fails to make API call
*/
public File generateNewRaftMetaConf(RatisLocalRaftMetaConfRequest ratisLocalRaftMetaConfRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = ratisLocalRaftMetaConfRequest;

// create path and map variables
String localVarPath = "/api/v1/ratis/local/raft_meta_conf";

StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();


localVarHeaderParams.putAll(additionalHeaders);



final String[] localVarAccepts = {
"application/octet-stream"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);

final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

String[] localVarAuthNames = new String[] { "basic" };

TypeReference<File> localVarReturnType = new TypeReference<File>() {};
return apiClient.invokeAPI(
localVarPath,
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}

/**
*
* Get the raft-meta.conf file of the current server.
* @return File
* @throws ApiException if fails to make API call
*/
public File getLocalRaftMetaConf() throws ApiException {
return this.getLocalRaftMetaConf(Collections.emptyMap());
}


/**
*
* Get the raft-meta.conf file of the current server.
* @param additionalHeaders additionalHeaders for this call
* @return File
* @throws ApiException if fails to make API call
*/
public File getLocalRaftMetaConf(Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = null;

// create path and map variables
String localVarPath = "/api/v1/ratis/local/raft_meta_conf";

StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();


localVarHeaderParams.putAll(additionalHeaders);



final String[] localVarAccepts = {
"application/octet-stream"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);

final String[] localVarContentTypes = {

};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

String[] localVarAuthNames = new String[] { "basic" };

TypeReference<File> localVarReturnType = new TypeReference<File>() {};
return apiClient.invokeAPI(
localVarPath,
"GET",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}

/**
*
* Pause leader election at the current server. Then, the current server would not start a leader election.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
ContainerInfo.JSON_PROPERTY_CONTAINER_CLUSTER,
ContainerInfo.JSON_PROPERTY_CONTAINER_TAGS
})
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.7.0")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
public class ContainerInfo {
public static final String JSON_PROPERTY_CONTAINER_NAME = "containerName";
private String containerName;
Expand Down
Loading