Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return app submission time for batch #4119

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ private[ctl] object Render {

private def buildBatchAppInfo(batch: Batch, showDiagnostic: Boolean = true): List[String] = {
val batchAppInfo = ListBuffer[String]()
if (batch.getAppSubmissionTime > 0) {
batchAppInfo += s"App Submission Time:" +
s" ${millisToDateString(batch.getAppSubmissionTime, "yyyy-MM-dd HH:mm:ss")}"
}
Option(batch.getAppId).foreach { _ =>
batchAppInfo += s"App Id: ${batch.getAppId}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class Batch {
private String user;
private String batchType;
private String name;
private long appSubmissionTime;
private String appId;
private String appUrl;
private String appState;
Expand All @@ -42,6 +43,7 @@ public Batch(
String user,
String batchType,
String name,
long appSubmissionTime,
String appId,
String appUrl,
String appState,
Expand All @@ -54,6 +56,7 @@ public Batch(
this.user = user;
this.batchType = batchType;
this.name = name;
this.appSubmissionTime = appSubmissionTime;
this.appId = appId;
this.appUrl = appUrl;
this.appState = appState;
Expand Down Expand Up @@ -152,6 +155,14 @@ public void setCreateTime(long createTime) {
this.createTime = createTime;
}

public long getAppSubmissionTime() {
return appSubmissionTime;
}

public void setAppSubmissionTime(long appSubmissionTime) {
this.appSubmissionTime = appSubmissionTime;
}

public long getEndTime() {
return endTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void testNoPasswordBasicClient() {
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getAppSubmissionTime(), expectedBatch.getAppSubmissionTime());
assertEquals(result.getAppId(), expectedBatch.getAppId());
assertEquals(result.getAppUrl(), expectedBatch.getAppUrl());
assertEquals(result.getAppState(), expectedBatch.getAppState());
Expand Down Expand Up @@ -145,6 +146,7 @@ public void testAnonymousBasicClient() {
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getAppSubmissionTime(), expectedBatch.getAppSubmissionTime());
assertEquals(result.getAppId(), expectedBatch.getAppId());
assertEquals(result.getAppUrl(), expectedBatch.getAppUrl());
assertEquals(result.getAppState(), expectedBatch.getAppState());
Expand Down Expand Up @@ -176,6 +178,7 @@ public void createBatchTest() {
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getAppSubmissionTime(), expectedBatch.getAppSubmissionTime());
assertEquals(result.getAppId(), expectedBatch.getAppId());
assertEquals(result.getAppUrl(), expectedBatch.getAppUrl());
assertEquals(result.getAppState(), expectedBatch.getAppState());
Expand Down Expand Up @@ -207,6 +210,7 @@ public void getBatchByIdTest() {
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getAppSubmissionTime(), expectedBatch.getAppSubmissionTime());
assertEquals(result.getAppId(), expectedBatch.getAppId());
assertEquals(result.getAppUrl(), expectedBatch.getAppUrl());
assertEquals(result.getAppState(), expectedBatch.getAppState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static Batch generateTestBatch(String id) {
TEST_USERNAME,
"spark",
"batch_name",
0,
id,
null,
"RUNNING",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE metadata ADD COLUMN engine_open_time bigint;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ CREATE TABLE metadata(
create_time BIGINT NOT NULL, -- the metadata create time
engine_type varchar(32) NOT NULL, -- the engine type
cluster_manager varchar(128), -- the engine cluster manager
engine_open_time bigint, -- the engine open time
engine_id varchar(128), -- the engine application id
engine_name clob, -- the engine application name
engine_url varchar(1024), -- the engine tracking url
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
RUN '001-KYUUBI-3967.derby.sql';
RUN '002-KYUUBI-4119.derby.sql';
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT '< KYUUBI-4119: Return app submission time for batch >' AS ' ';

ALTER TABLE metadata ADD COLUMN engine_open_time bigint COMMENT 'the engine open time';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ CREATE TABLE IF NOT EXISTS metadata(
create_time BIGINT NOT NULL COMMENT 'the metadata create time',
engine_type varchar(32) NOT NULL COMMENT 'the engine type',
cluster_manager varchar(128) COMMENT 'the engine cluster manager',
engine_open_time bigint COMMENT 'the engine open time',
engine_id varchar(128) COMMENT 'the engine application id',
engine_name mediumtext COMMENT 'the engine application name',
engine_url varchar(1024) COMMENT 'the engine tracking url',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SELECT '< Upgrading MetaStore schema from 1.6.0 to 1.7.0 >' AS ' ';
SOURCE 001-KYUUBI-3967.mysql.sql;
SOURCE 002-KYUUBI-4119.mysql.sql;
SELECT '< Finished upgrading MetaStore schema from 1.6.0 to 1.7.0 >' AS ' ';
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class BatchJobSubmission(
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage

@volatile private var _appSubmissionTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
def appSubmissionTime: Long = _appSubmissionTime

@VisibleForTesting
private[kyuubi] val builder: ProcBuilder = {
Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
Expand All @@ -96,7 +99,14 @@ class BatchJobSubmission(

override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
// only the ApplicationInfo with non-empty id is valid for the operation
applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
val applicationInfo =
applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
applicationInfo.foreach { _ =>
if (_appSubmissionTime <= 0) {
_appSubmissionTime = System.currentTimeMillis()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it's the appStartTime ? Submisstion time is the time before submiting succeed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we donot set it in submitAndMonitorBatchJob method ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we donot set it in submitAndMonitorBatchJob method ?

Because in the submitAndMonitorBatchJob method, there is interval to track the app state.
And for the getBatch api, it will also call currentApplicationState to build the batch report, to keep the state align, it is better to update the app submission time in currentApplicationState.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make state machine and the time trace consistent.

If the submitssion time means when the state changes to running, then we should update the time around this code

setStateIfNotCanceled(OperationState.RUNNING)

Otherwise the client may get the batch info with submisstion time = 0 but state is running or submisstion time != 0 but state is pending.

Copy link
Member Author

@turboFei turboFei Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for batch report, it is consistent with the appId/appUrl/appState now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can treat it as a filed of batch application info.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the pr for followup #4129

}
}
applicationInfo
}

private[kyuubi] def killBatchApplication(): KillResponse = {
Expand Down Expand Up @@ -127,6 +137,7 @@ class BatchJobSubmission(
val metadataToUpdate = Metadata(
identifier = batchId,
state = state.toString,
engineOpenTime = appSubmissionTime,
engineId = status.id,
engineName = status.name,
engineUrl = status.url.orNull,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
session.user,
batchOp.batchType,
name,
batchOp.appSubmissionTime,
appId,
appUrl,
appState,
Expand Down Expand Up @@ -130,6 +131,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
metadata.username,
metadata.engineType,
name,
metadata.engineOpenTime,
appId,
appUrl,
appState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ object MetadataManager extends Logging {
batchMetadata.username,
batchMetadata.engineType,
name,
batchMetadata.engineOpenTime,
batchMetadata.engineId,
batchMetadata.engineUrl,
batchMetadata.engineState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.kyuubi.session.SessionType.SessionType
* @param createTime the create time.
* @param engineType the engine type.
* @param clusterManager the engine cluster manager.
* @param engineOpenTime the engine open time
* @param engineId the engine id.
* @param engineName the engine name.
* @param engineUrl the engine tracking url.
Expand All @@ -65,6 +66,7 @@ case class Metadata(
createTime: Long = 0L,
engineType: String = null,
clusterManager: Option[String] = None,
engineOpenTime: Long = 0L,
engineId: String = null,
engineName: String = null,
engineUrl: String = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
setClauses += " end_time = ? "
params += metadata.endTime
}
if (metadata.engineOpenTime > 0) {
setClauses += " engine_open_time = ? "
params += metadata.engineOpenTime
}
Option(metadata.engineId).foreach { _ =>
setClauses += " engine_id = ? "
params += metadata.engineId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class KyuubiBatchSessionImpl(

private[kyuubi] def onEngineOpened(): Unit = {
if (sessionEvent.openedTime <= 0) {
sessionEvent.openedTime = System.currentTimeMillis()
sessionEvent.openedTime = batchJobSubmissionOp.appSubmissionTime
EventBus.post(sessionEvent)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
assert(batch.getName === sparkBatchTestAppName)
assert(batch.getCreateTime > 0)
assert(batch.getEndTime === 0)
if (batch.getAppId != null) {
assert(batch.getAppSubmissionTime > 0)
}

// invalid batchId
getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId")
Expand Down