Skip to content

Commit

Permalink
add query lookback to check queries that have ended for cases where d…
Browse files Browse the repository at this point in the history
…ata updates after completion
  • Loading branch information
menishmueli committed Mar 25, 2024
1 parent 84b0444 commit c43c870
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import org.apache.spark.util.kvstore.KVStore

class DataflintStore(val store: KVStore) {
def icebergCommits(offset: Int, length: Int): Seq[IcebergCommitInfo] = {
KVUtils.mapToSeq(store.view(classOf[IcebergCommitWrapper]).skip(offset).max(length))(_.info).sortBy(_.executionId)
KVUtils.mapToSeq(store.view(classOf[IcebergCommitWrapper]))(_.info).filter(_.executionId >= offset).take(length).sortBy(_.executionId)
}
}
2 changes: 1 addition & 1 deletion spark-ui/src/reducers/SqlReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ export function calculateSqlStore(
// case 1: SQL does not exist, we add it
if (currentSql === undefined) {
updatedSqls.push(calculateSql(newSql, plan, icebergCommit));
// From here currentSql must not be null, and currentSql can't be COMPLETED as it would not be requested by API
// From here currentSql must not be null
// case 2: plan status changed from running to completed, so we need to update the SQL
} else if (
newSql.status === SqlStatus.Completed.valueOf() ||
Expand Down
33 changes: 29 additions & 4 deletions spark-ui/src/services/SparkApi.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ import {
updateDuration,
} from "../reducers/SparkSlice";
import { AppDispatch } from "../Store";
import { timeStrToEpocTime } from "../utils/FormatUtils";
import { IS_HISTORY_SERVER_MODE } from "../utils/UrlConsts";
import { isDataFlintSaaSUI } from "../utils/UrlUtils";
import { MixpanelService } from "./MixpanelService";

const POLL_TIME = 1000;
const SQL_QUERY_LENGTH = 1000;
const SQL_LOOKBACK_QUERY_TIME = 5000;

type SQLFinishTime = {
id: number;
finishTime: number;
};

class SparkAPI {
basePath: string;
Expand All @@ -39,10 +46,23 @@ class SparkAPI {
applicationsPath: string;
dispatch: AppDispatch;
lastCompletedSqlId: number = -1;
sqlIdToFinishTime: Record<number, SQLFinishTime> = {};
pollingStopped: boolean = false;
historyServerMode: boolean = false;
icebergEnabled: boolean = false;

private findSqlIdToQueryFrom(): number {
const currentTime = Date.now();
const ids = Object.keys(this.sqlIdToFinishTime).map(id => parseInt(id));
const lookbackSqls = ids.filter(id => this.sqlIdToFinishTime[id] !== null && this.sqlIdToFinishTime[id].finishTime + SQL_LOOKBACK_QUERY_TIME > currentTime);

if (lookbackSqls.length > 0) {
return Math.min(...lookbackSqls);
}

return this.lastCompletedSqlId + 1;
}

private get applicationPath(): string {
return (
`${this.apiPath}/applications/${this.appId}` +
Expand Down Expand Up @@ -241,15 +261,16 @@ class SparkAPI {
const sparkJobs: SparkJobs = await this.queryData(this.jobsPath);
this.dispatch(setSparkJobs({ value: sparkJobs }));

const sqlIdToQueryFrom = this.findSqlIdToQueryFrom();
const sparkSQLs: SparkSQLs = await this.queryData(
this.buildSqlPath(this.lastCompletedSqlId + 1),
this.buildSqlPath(sqlIdToQueryFrom),
);
const sparkPlans: SQLPlans = await this.queryData(
this.buildSqlPlanPath(this.lastCompletedSqlId + 1),
this.buildSqlPlanPath(sqlIdToQueryFrom),
);
let icebergInfo: IcebergInfo = { commitsInfo: [] };
if (this.icebergEnabled) {
icebergInfo = await this.queryData(this.buildIcebergPath(this.lastCompletedSqlId + 1));
icebergInfo = await this.queryData(this.buildIcebergPath(sqlIdToQueryFrom));
}

if (sparkSQLs.length !== 0) {
Expand All @@ -264,9 +285,13 @@ class SparkAPI {
if (finishedSqls.length > 0) {
// in cases of SQLs out of order, like id 2 is running and 3 is completed, we will try to ask from id 2 again
finishedSqls.forEach((sql) => {
if (parseInt(sql.id) === this.lastCompletedSqlId + 1) {
const idAsNumber = parseInt(sql.id)
if (idAsNumber === this.lastCompletedSqlId + 1) {
this.lastCompletedSqlId += 1;
}
if (this.sqlIdToFinishTime[idAsNumber] === undefined) {
this.sqlIdToFinishTime[idAsNumber] = { id: idAsNumber, finishTime: timeStrToEpocTime(sql.submissionTime) + sql.duration };
}
});
}

Expand Down

0 comments on commit c43c870

Please sign in to comment.