Skip to content

Commit

Permalink
YARN-11478. [Federation] SQLFederationStateStore Support Store Applic…
Browse files Browse the repository at this point in the history
…ationSubmitData. (apache#5663)
  • Loading branch information
slfan1989 committed May 24, 2023
1 parent e6b54f7 commit b977065
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

-- Script to generate all the stored procedures for the Federation StateStore in MySQL

USE FederationStateStore
USE FederationStateStore;

DELIMITER //

Expand Down Expand Up @@ -89,11 +89,12 @@ END //

CREATE PROCEDURE sp_addApplicationHomeSubCluster(
IN applicationId_IN varchar(64), IN homeSubCluster_IN varchar(256),
IN applicationContext_IN BLOB,
OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)
BEGIN
INSERT INTO applicationsHomeSubCluster
(applicationId,homeSubCluster)
(SELECT applicationId_IN, homeSubCluster_IN
(applicationId, homeSubCluster, createTime, applicationContext)
(SELECT applicationId_IN, homeSubCluster_IN, NOW(), applicationContext_IN
FROM applicationsHomeSubCluster
WHERE applicationId = applicationId_IN
HAVING COUNT(*) = 0 );
Expand All @@ -105,19 +106,23 @@ END //

CREATE PROCEDURE sp_updateApplicationHomeSubCluster(
IN applicationId_IN varchar(64),
IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)
IN homeSubCluster_IN varchar(256), IN applicationContext_IN BLOB, OUT rowCount_OUT int)
BEGIN
UPDATE applicationsHomeSubCluster
SET homeSubCluster = homeSubCluster_IN
SET homeSubCluster = homeSubCluster_IN,
applicationContext = applicationContext_IN
WHERE applicationId = applicationId_IN;
SELECT ROW_COUNT() INTO rowCount_OUT;
END //

CREATE PROCEDURE sp_getApplicationHomeSubCluster(
IN applicationId_IN varchar(64),
OUT homeSubCluster_OUT varchar(256))
OUT homeSubCluster_OUT varchar(256),
OUT createTime_OUT datetime,
OUT applicationContext_OUT BLOB)
BEGIN
SELECT homeSubCluster INTO homeSubCluster_OUT
SELECT homeSubCluster, applicationContext, createTime
INTO homeSubCluster_OUT, applicationContext_OUT, createTime_OUT
FROM applicationsHomeSubCluster
WHERE applicationId = applicationID_IN;
END //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

-- Script to generate all the tables for the Federation StateStore in MySQL

USE FederationStateStore
USE FederationStateStore;

CREATE TABLE applicationsHomeSubCluster(
applicationId varchar(64) NOT NULL,
homeSubCluster varchar(256) NOT NULL,
createTime datetime NOT NULL,
applicationContext BLOB NULL,
CONSTRAINT pk_applicationId PRIMARY KEY (applicationId)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ GO
CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster]
@applicationId_IN VARCHAR(64),
@homeSubCluster_IN VARCHAR(256),
@applicationContext_IN VARBINARY(MAX),
@storedHomeSubCluster_OUT VARCHAR(256) OUTPUT,
@rowCount_OUT int OUTPUT
AS BEGIN
Expand All @@ -41,10 +42,14 @@ AS BEGIN

INSERT INTO [dbo].[applicationsHomeSubCluster] (
[applicationId],
[homeSubCluster])
[homeSubCluster],
[createTime],
[applicationContext])
VALUES (
@applicationId_IN,
@homeSubCluster_IN);
@homeSubCluster_IN,
GETUTCDATE(),
@applicationContext_IN);
-- End of the IF block

SELECT @rowCount_OUT = @@ROWCOUNT;
Expand Down Expand Up @@ -77,6 +82,7 @@ GO
CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster]
@applicationId_IN VARCHAR(64),
@homeSubCluster_IN VARCHAR(256),
@applicationContext_IN VARBINARY(MAX),
@rowCount_OUT int OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
Expand All @@ -85,7 +91,8 @@ AS BEGIN
BEGIN TRAN

UPDATE [dbo].[applicationsHomeSubCluster]
SET [homeSubCluster] = @homeSubCluster_IN
SET [homeSubCluster] = @homeSubCluster_IN,
[applicationContext] = @applicationContext_IN
WHERE [applicationId] = @applicationId_IN;
SELECT @rowCount_OUT = @@ROWCOUNT;

Expand Down Expand Up @@ -151,13 +158,17 @@ GO

CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster]
@applicationId_IN VARCHAR(64),
@homeSubCluster_OUT VARCHAR(256) OUTPUT
@homeSubCluster_OUT VARCHAR(256) OUTPUT,
@createTime_OUT datetime OUT,
@applicationContext_OUT VARBINARY(MAX) OUTPUT
AS BEGIN
DECLARE @errorMessage nvarchar(4000)

BEGIN TRY

SELECT @homeSubCluster_OUT = [homeSubCluster]
SELECT @homeSubCluster_OUT = [homeSubCluster],
@createTime_OUT = [createTime],
@applicationContext_OUT = [applicationContext]
FROM [dbo].[applicationsHomeSubCluster]
WHERE [applicationId] = @applicationId_IN;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL,
homeSubCluster VARCHAR(256) NOT NULL,
createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(),

applicationContext VARBINARY(MAX) NULL,
CONSTRAINT [pk_applicationId] PRIMARY KEY
(
[applicationId]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

package org.apache.hadoop.yarn.server.federation.store.impl;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Blob;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
Expand All @@ -35,10 +38,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
Expand Down Expand Up @@ -145,16 +151,16 @@ public class SQLFederationStateStore implements FederationStateStore {
"{call sp_subClusterHeartbeat(?, ?, ?, ?)}";

private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER =
"{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}";
"{call sp_addApplicationHomeSubCluster(?, ?, ?, ?, ?)}";

private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER =
"{call sp_updateApplicationHomeSubCluster(?, ?, ?)}";
"{call sp_updateApplicationHomeSubCluster(?, ?, ?, ?)}";

private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER =
"{call sp_deleteApplicationHomeSubCluster(?, ?)}";

private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER =
"{call sp_getApplicationHomeSubCluster(?, ?)}";
"{call sp_getApplicationHomeSubCluster(?, ?, ?, ?)}";

private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
"{call sp_getApplicationsHomeSubCluster(?, ?)}";
Expand Down Expand Up @@ -610,17 +616,25 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
CallableStatement cstmt = null;

String subClusterHome = null;
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
SubClusterId subClusterId =
request.getApplicationHomeSubCluster().getHomeSubCluster();
ApplicationHomeSubCluster applicationHomeSubCluster =
request.getApplicationHomeSubCluster();
ApplicationId appId = applicationHomeSubCluster.getApplicationId();
SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();
ApplicationSubmissionContext appSubmissionContext =
applicationHomeSubCluster.getApplicationSubmissionContext();

try {
cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);

// Set the parameters for the stored procedure
cstmt.setString("applicationId_IN", appId.toString());
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
if (appSubmissionContext != null) {
cstmt.setBlob("applicationContext_IN", new ByteArrayInputStream(
((ApplicationSubmissionContextPBImpl) appSubmissionContext).getProto().toByteArray()));
} else {
cstmt.setNull("applicationContext_IN", Types.BLOB);
}
cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR);
cstmt.registerOutParameter("rowCount_OUT", INTEGER);

Expand Down Expand Up @@ -687,17 +701,25 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(

CallableStatement cstmt = null;

ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
SubClusterId subClusterId =
request.getApplicationHomeSubCluster().getHomeSubCluster();
ApplicationHomeSubCluster applicationHomeSubCluster =
request.getApplicationHomeSubCluster();
ApplicationId appId = applicationHomeSubCluster.getApplicationId();
SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();
ApplicationSubmissionContext appSubmissionContext =
applicationHomeSubCluster.getApplicationSubmissionContext();

try {
cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);

// Set the parameters for the stored procedure
cstmt.setString("applicationId_IN", appId.toString());
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
if (appSubmissionContext != null) {
cstmt.setBlob("applicationContext_IN", new ByteArrayInputStream(
((ApplicationSubmissionContextPBImpl) appSubmissionContext).getProto().toByteArray()));
} else {
cstmt.setNull("applicationContext_IN", Types.BLOB);
}
cstmt.registerOutParameter("rowCount_OUT", INTEGER);

// Execute the query
Expand Down Expand Up @@ -742,15 +764,18 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
CallableStatement cstmt = null;

SubClusterId homeRM = null;

Long createTime = 0L;
ApplicationId applicationId = request.getApplicationId();
ApplicationSubmissionContext appSubmissionContext = null;

try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);

// Set the parameters for the stored procedure
cstmt.setString("applicationId_IN", applicationId.toString());
cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR);
cstmt.registerOutParameter("createTime_OUT", java.sql.Types.TIMESTAMP);
cstmt.registerOutParameter("applicationContext_OUT", Types.BLOB);

// Execute the query
long startTime = clock.getTime();
Expand All @@ -765,6 +790,15 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
"Application %s does not exist.", applicationId);
}

Timestamp createTimeStamp = cstmt.getTimestamp("createTime_OUT", utcCalendar);
createTime = createTimeStamp != null ? createTimeStamp.getTime() : 0;

Blob blobAppContextData = cstmt.getBlob("applicationContext_OUT");
if (blobAppContextData != null && request.getContainsAppSubmissionContext()) {
appSubmissionContext = new ApplicationSubmissionContextPBImpl(
ApplicationSubmissionContextProto.parseFrom(blobAppContextData.getBinaryStream()));
}

LOG.debug("Got the information about the specified application {}."
+ " The AM is running in {}", applicationId, homeRM);

Expand All @@ -775,11 +809,17 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the application information for the specified application %s.",
applicationId);
} catch (IOException e) {
FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
"Unable to obtain the application information for the specified application %s.",
applicationId);
} finally {
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetApplicationHomeSubClusterResponse.newInstance(request.getApplicationId(), homeRM);
return GetApplicationHomeSubClusterResponse.newInstance(applicationId, homeRM,
createTime, appSubmissionContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ public void setCreateTime(long time) {
@Override
public void setApplicationSubmissionContext(ApplicationSubmissionContext context) {
maybeInitBuilder();
if (applicationSubmissionContext == null) {
if (context == null) {
builder.clearAppSubmitContext();
return;
}
this.applicationSubmissionContext = context;
builder.setAppSubmitContext(convertToProtoFormat(context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
Expand Down Expand Up @@ -1086,4 +1088,28 @@ public void testLoadVersion() throws Exception {
public void testCheckVersion() throws Exception {
stateStore.checkVersion();
}

@Test
public void testGetApplicationHomeSubClusterWithContext() throws Exception {
FederationStateStore federationStateStore = this.getStateStore();

ApplicationId appId = ApplicationId.newInstance(1, 3);
SubClusterId subClusterId = SubClusterId.newInstance("SC");
ApplicationSubmissionContext context =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
addApplicationHomeSC(appId, subClusterId, context);

GetApplicationHomeSubClusterRequest getRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId, true);
GetApplicationHomeSubClusterResponse result =
federationStateStore.getApplicationHomeSubCluster(getRequest);

ApplicationHomeSubCluster applicationHomeSubCluster = result.getApplicationHomeSubCluster();

assertEquals(appId, applicationHomeSubCluster.getApplicationId());
assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext());
}
}
Loading

0 comments on commit b977065

Please sign in to comment.