Skip to content

Commit

Permalink
[FLINK-35194][table] Support describe job with job id
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyangzhong committed Apr 28, 2024
1 parent f886687 commit 83116a8
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
Expand Down Expand Up @@ -481,6 +482,8 @@ private ResultFetcher executeOperation(
return callStopJobOperation(tableEnv, handle, (StopJobOperation) op);
} else if (op instanceof ShowJobsOperation) {
return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) op);
} else if (op instanceof DescribeJobOperation) {
return callDescribeJobOperation(tableEnv, handle, (DescribeJobOperation) op);
} else if (op instanceof RemoveJarOperation) {
return callRemoveJar(handle, ((RemoveJarOperation) op).getPath());
} else if (op instanceof AddJarOperation
Expand Down Expand Up @@ -774,6 +777,53 @@ public ResultFetcher callShowJobsOperation(
resultRows);
}

public ResultFetcher callDescribeJobOperation(
TableEnvironmentInternal tableEnv,
OperationHandle operationHandle,
DescribeJobOperation describeJobOperation)
throws SqlExecutionException {
Configuration configuration = tableEnv.getConfig().getConfiguration();
Duration clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
String jobId = describeJobOperation.getJobId();
Optional<JobStatusMessage> jobStatusOp =
runClusterAction(
configuration,
operationHandle,
clusterClient -> {
try {
JobID expectedJobId = JobID.fromHexString(jobId);
return clusterClient.listJobs()
.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS)
.stream()
.filter(job -> expectedJobId.equals(job.getJobId()))
.findFirst();
} catch (Exception e) {
throw new SqlExecutionException(
"Failed to get jobs in the cluster.", e);
}
});

if (!jobStatusOp.isPresent()) {
throw new SqlExecutionException("The job described by " + jobId + " does not exist.");
}
JobStatusMessage job = jobStatusOp.get();

RowData resultRow =
GenericRowData.of(
StringData.fromString(jobId),
StringData.fromString(job.getJobName()),
StringData.fromString(job.getJobState().toString()),
DateTimeUtils.toTimestampData(job.getStartTime(), 3));
return ResultFetcher.fromResults(
operationHandle,
ResolvedSchema.of(
Column.physical(JOB_ID, DataTypes.STRING()),
Column.physical(JOB_NAME, DataTypes.STRING()),
Column.physical(STATUS, DataTypes.STRING()),
Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
Collections.singletonList(resultRow));
}

/**
* Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
* against it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,57 @@ void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restCluster
.isBetween(timeOpStart, timeOpSucceed);
}

@Test
void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
throws Exception {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());

String pipelineName = "test-describe-job";
configuration.set(PipelineOptions.NAME, pipelineName);

// running jobs
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
String insertSql = "INSERT INTO sink SELECT * FROM source;";

service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
service.executeStatement(sessionHandle, sinkDdl, -1, configuration);

long timeOpStart = System.currentTimeMillis();
OperationHandle insertsOperationHandle =
service.executeStatement(sessionHandle, insertSql, -1, configuration);
String jobId =
fetchAllResults(sessionHandle, insertsOperationHandle)
.get(0)
.getString(0)
.toString();

TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
long timeOpSucceed = System.currentTimeMillis();

OperationHandle describeJobOperationHandle =
service.executeStatement(
sessionHandle,
String.format("DESCRIBE JOB '%s'", jobId),
-1,
configuration);

List<RowData> result = fetchAllResults(sessionHandle, describeJobOperationHandle);
RowData jobRow =
result.stream()
.filter(row -> jobId.equals(row.getString(0).toString()))
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Test job " + jobId + " not found."));
assertThat(jobRow.getString(1)).hasToString(pipelineName);
assertThat(jobRow.getString(2)).hasToString("RUNNING");
assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
.isBetween(timeOpStart, timeOpSucceed);
}

// --------------------------------------------------------------------------------------------
// Catalog API tests
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
"org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
"org.apache.flink.sql.parser.ddl.SqlStopJob"
"org.apache.flink.sql.parser.dql.SqlShowJobs"
"org.apache.flink.sql.parser.dql.SqlDescribeJob"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
Expand Down Expand Up @@ -554,6 +555,7 @@
# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
# Note: move SqlRichDescribeTable at last, otherwise all DESCRIBE syntax will fall into this method
statementParserMethods: [
"RichSqlInsert()"
"SqlBeginStatementSet()"
Expand All @@ -573,7 +575,6 @@
"SqlShowColumns()"
"SqlShowCreate()"
"SqlReplaceTable()"
"SqlRichDescribeTable()"
"SqlAlterTable()"
"SqlAlterView()"
"SqlShowModules()"
Expand All @@ -596,6 +597,8 @@
"SqlStopJob()"
"SqlShowJobs()"
"SqlTruncateTable()"
"SqlDescribeJob()"
"SqlRichDescribeTable()"
]

# List of methods for parsing custom literals.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,24 @@ SqlShowJobs SqlShowJobs() :
}
}

/**
* Parse a "DESCRIBE JOB" statement:
* DESCRIBE | DESC JOB <JOB_ID>
*/
SqlDescribeJob SqlDescribeJob() :
{
SqlCharStringLiteral jobId;
SqlParserPos pos;
}
{
( <DESCRIBE> | <DESC> ) <JOB> <QUOTED_STRING>
{
String id = SqlParserUtil.parseString(token.image);
jobId = SqlLiteral.createCharString(id, getPos());
return new SqlDescribeJob(getPos(), jobId);
}
}

/**
* Parses a STOP JOB statement:
* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.NlsString;

import java.util.Collections;
import java.util.List;

/** DESCRIBE | DESC &lt;JOB_ID&gt; sql call. */
public class SqlDescribeJob extends SqlCall {

public static final SqlOperator OPERATOR =
new SqlSpecialOperator("DESCRIBE JOB", SqlKind.OTHER);

private final SqlCharStringLiteral jobId;

public SqlDescribeJob(SqlParserPos pos, SqlCharStringLiteral jobId) {
super(pos);
this.jobId = jobId;
}

public String getJobId() {
return jobId.getValueAs(NlsString.class).getValue();
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.singletonList(jobId);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DESCRIBE JOB");
jobId.unparse(writer, leftPrec, rightPrec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2953,6 +2953,12 @@ void testStopJob() {
.fails("WITH DRAIN could only be used after WITH SAVEPOINT.");
}

@Test
void testDescribeJob() {
sql("DESCRIBE JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
sql("DESC JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
}

@Test
void testTruncateTable() {
sql("truncate table t1").ok("TRUNCATE TABLE `T1`");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.Operation;

/** Operation to describe a DESCRIBE JOB statement. */
@Internal
public class DescribeJobOperation implements Operation, ExecutableOperation {

private final String jobId;

public DescribeJobOperation(String jobId) {
this.jobId = jobId;
}

public String getJobId() {
return jobId;
}

@Override
public String asSummaryString() {
return String.format("DESCRIBE JOB '%s'", jobId);
}

@Override
public TableResultInternal execute(Context ctx) {
// TODO: We may need to migrate the execution for ShowJobsOperation from SQL Gateway
// OperationExecutor to here.
throw new UnsupportedOperationException(
"DescribeJobOperation does not support ExecutableOperation yet.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.flink.sql.parser.dml.SqlExecute;
import org.apache.flink.sql.parser.dml.SqlExecutePlan;
import org.apache.flink.sql.parser.dml.SqlStatementSet;
import org.apache.flink.sql.parser.dql.SqlDescribeJob;
import org.apache.flink.sql.parser.dql.SqlLoadModule;
import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
import org.apache.flink.sql.parser.dql.SqlRichExplain;
Expand Down Expand Up @@ -141,6 +142,7 @@
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
Expand Down Expand Up @@ -345,6 +347,8 @@ private static Optional<Operation> convertValidatedSqlNode(
return Optional.of(converter.convertShowJars((SqlShowJars) validated));
} else if (validated instanceof SqlShowJobs) {
return Optional.of(converter.convertShowJobs((SqlShowJobs) validated));
} else if (validated instanceof SqlDescribeJob) {
return Optional.of(converter.convertDescribeJob((SqlDescribeJob) validated));
} else if (validated instanceof RichSqlInsert) {
return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
} else if (validated instanceof SqlBeginStatementSet) {
Expand Down Expand Up @@ -1273,6 +1277,10 @@ private Operation convertShowJobs(SqlShowJobs sqlStopJob) {
return new ShowJobsOperation();
}

private Operation convertDescribeJob(SqlDescribeJob sqlDescribeJob) {
return new DescribeJobOperation(sqlDescribeJob.getJobId());
}

private Operation convertStopJob(SqlStopJob sqlStopJob) {
return new StopJobOperation(
sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet, SqlUseModules}
import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlCompileAndExecutePlan, SqlEndStatementSet, SqlExecute, SqlExecutePlan, SqlStatementSet, SqlTruncateTable}
import org.apache.flink.sql.parser.dml._
import org.apache.flink.sql.parser.dql._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.hint.FlinkHints
Expand Down Expand Up @@ -146,6 +146,7 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowPartitions]
|| sqlNode.isInstanceOf[SqlShowProcedures]
|| sqlNode.isInstanceOf[SqlShowJobs]
|| sqlNode.isInstanceOf[SqlDescribeJob]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]
|| sqlNode.isInstanceOf[SqlUnloadModule]
|| sqlNode.isInstanceOf[SqlUseModules]
Expand Down

0 comments on commit 83116a8

Please sign in to comment.