diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala index 0aca308..121be8a 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala @@ -5,6 +5,6 @@ import org.apache.spark.util.kvstore.KVStore class DataflintStore(val store: KVStore) { def icebergCommits(offset: Int, length: Int): Seq[IcebergCommitInfo] = { - KVUtils.mapToSeq(store.view(classOf[IcebergCommitWrapper]).skip(offset).max(length))(_.info).sortBy(_.executionId) + KVUtils.mapToSeq(store.view(classOf[IcebergCommitWrapper]))(_.info).filter(_.executionId >= offset).take(length).sortBy(_.executionId) } } diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts index dc7304f..c02b1a3 100644 --- a/spark-ui/src/reducers/SqlReducer.ts +++ b/spark-ui/src/reducers/SqlReducer.ts @@ -340,7 +340,7 @@ export function calculateSqlStore( // case 1: SQL does not exist, we add it if (currentSql === undefined) { updatedSqls.push(calculateSql(newSql, plan, icebergCommit)); - // From here currentSql must not be null, and currentSql can't be COMPLETED as it would not be requested by API + // From here currentSql must not be null // case 2: plan status changed from running to completed, so we need to update the SQL } else if ( newSql.status === SqlStatus.Completed.valueOf() || diff --git a/spark-ui/src/services/SparkApi.tsx b/spark-ui/src/services/SparkApi.tsx index 66cde10..069bc9d 100644 --- a/spark-ui/src/services/SparkApi.tsx +++ b/spark-ui/src/services/SparkApi.tsx @@ -21,12 +21,19 @@ import { updateDuration, } from "../reducers/SparkSlice"; import { AppDispatch } from "../Store"; +import { timeStrToEpocTime } from "../utils/FormatUtils"; import { IS_HISTORY_SERVER_MODE } from "../utils/UrlConsts"; import 
{ isDataFlintSaaSUI } from "../utils/UrlUtils"; import { MixpanelService } from "./MixpanelService"; const POLL_TIME = 1000; const SQL_QUERY_LENGTH = 1000; +const SQL_LOOKBACK_QUERY_TIME = 5000; + +type SQLFinishTime = { id: number; finishTime: number; }; class SparkAPI { basePath: string; @@ -39,10 +46,23 @@ class SparkAPI { applicationsPath: string; dispatch: AppDispatch; lastCompletedSqlId: number = -1; + sqlIdToFinishTime: Record<number, SQLFinishTime> = {}; pollingStopped: boolean = false; historyServerMode: boolean = false; icebergEnabled: boolean = false; + private findSqlIdToQueryFrom(): number { + const currentTime = Date.now(); + const ids = Object.keys(this.sqlIdToFinishTime).map(id => parseInt(id)); + const lookbackSqls = ids.filter(id => this.sqlIdToFinishTime[id] !== null && this.sqlIdToFinishTime[id].finishTime + SQL_LOOKBACK_QUERY_TIME > currentTime); + + if (lookbackSqls.length > 0) { + return Math.min(...lookbackSqls); + } + + return this.lastCompletedSqlId + 1; + } + private get applicationPath(): string { return ( `${this.apiPath}/applications/${this.appId}` + @@ -241,15 +261,16 @@ class SparkAPI { const sparkJobs: SparkJobs = await this.queryData(this.jobsPath); this.dispatch(setSparkJobs({ value: sparkJobs })); + const sqlIdToQueryFrom = this.findSqlIdToQueryFrom(); const sparkSQLs: SparkSQLs = await this.queryData( - this.buildSqlPath(this.lastCompletedSqlId + 1), + this.buildSqlPath(sqlIdToQueryFrom), ); const sparkPlans: SQLPlans = await this.queryData( - this.buildSqlPlanPath(this.lastCompletedSqlId + 1), + this.buildSqlPlanPath(sqlIdToQueryFrom), ); let icebergInfo: IcebergInfo = { commitsInfo: [] }; if (this.icebergEnabled) { - icebergInfo = await this.queryData(this.buildIcebergPath(this.lastCompletedSqlId + 1)); + icebergInfo = await this.queryData(this.buildIcebergPath(sqlIdToQueryFrom)); } if (sparkSQLs.length !== 0) { @@ -264,9 +285,13 @@ class SparkAPI { if (finishedSqls.length > 0) { // in cases of SQLs out of order, like id 2 is running and 3 is 
completed, we will try to ask from id 2 again finishedSqls.forEach((sql) => { - if (parseInt(sql.id) === this.lastCompletedSqlId + 1) { + const idAsNumber = parseInt(sql.id) + if (idAsNumber === this.lastCompletedSqlId + 1) { this.lastCompletedSqlId += 1; } + if (this.sqlIdToFinishTime[idAsNumber] === undefined) { + this.sqlIdToFinishTime[idAsNumber] = { id: idAsNumber, finishTime: timeStrToEpocTime(sql.submissionTime) + sql.duration }; + } }); }