[KYUUBI #1681] Extend Flink ExecuteStatement to support more operations

### _Why are the changes needed?_

The Flink engine's ExecuteStatement currently supports only QueryOperation. We can extend it to cover the other operations that the Flink Executor already supports, e.g. ShowTableOperation, CreateTableOperation, and ExplainOperation.

This is a sub-task of KPIP-2 (#1322).
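
For illustration, here is a minimal sketch of the kind of statements this change enables through the Flink engine, assuming a Kyuubi server on the default port 10009 with the Hive JDBC driver on the classpath; the URL, credentials, and table definition are placeholders, not part of this patch:

```scala
import java.sql.DriverManager

object FlinkNonQueryExample {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details; adjust to the actual Kyuubi deployment.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/", "anonymous", "")
    val stmt = conn.createStatement()
    try {
      // Previously only SELECT queries were accepted by the Flink engine;
      // DDL and metadata statements like these now also return result sets.
      stmt.execute("CREATE TABLE t (id INT, msg STRING) WITH ('connector' = 'datagen')")
      val rs = stmt.executeQuery("SHOW TABLES")
      while (rs.next()) {
        println(rs.getString(1))
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```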

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1693 from link3280/feature/extend_flink_operation.

Closes #1681

520d1f4 [Paul Lin] [KYUUBI #1681] Extend Flink ExecuteStatement to support more operations

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: yanghua <yanghua1127@gmail.com>
link3280 authored and yanghua committed Jan 10, 2022
1 parent 6f4427b commit e2fb7d6
Showing 7 changed files with 169 additions and 165 deletions.

This file was deleted.

@@ -20,6 +20,10 @@

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;

@@ -40,9 +44,14 @@ public static ResultSet stringListToResultSet(List<String> strings, String colum
}
}

DataType dataType = DataTypes.VARCHAR(maxLength);
if (!isNullable) {
dataType = dataType.notNull();
}

return ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(ColumnInfo.create(columnName, new VarCharType(isNullable, maxLength)))
.columns(Column.physical(columnName, dataType))
.data(data.toArray(new Row[0]))
.build();
}
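
As a side note, the move from the engine's internal ColumnInfo/VarCharType pair to Flink's public Column and DataTypes API roughly corresponds to the sketch below; the column name and length are illustrative only:

```scala
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.Column

object ColumnMigrationSketch {
  // Nullable VARCHAR column, the common case for engine-generated string results.
  val nullableColumn: Column = Column.physical("result", DataTypes.VARCHAR(256))

  // Non-nullable variant: DataType is immutable, so keep the value notNull() returns.
  val requiredColumn: Column = Column.physical("result", DataTypes.VARCHAR(256).notNull())
}
```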

This file was deleted.

@@ -19,24 +19,29 @@
package org.apache.kyuubi.engine.flink.result;

import com.google.common.collect.Iterators;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.kyuubi.operation.ArrayFetchIterator;
import org.apache.kyuubi.operation.FetchIterator;

/**
* A set of one statement execution result containing result kind, column infos, rows of data and
* change flags for streaming mode.
* A set of one statement execution result containing result kind, columns, rows of data and change
* flags for streaming mode.
*/
public class ResultSet {

private final ResultKind resultKind;
private final List<ColumnInfo> columns;
private final List<Column> columns;
private final FetchIterator<Row> data;

// null in batch mode
@@ -47,7 +52,7 @@ public class ResultSet {

private ResultSet(
ResultKind resultKind,
List<ColumnInfo> columns,
List<Column> columns,
FetchIterator<Row> data,
@Nullable List<Boolean> changeFlags) {
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind must not be null");
@@ -61,7 +66,7 @@ private ResultSet(
}
}

public List<ColumnInfo> getColumns() {
public List<Column> getColumns() {
return columns;
}

@@ -103,14 +108,27 @@ public String toString() {
+ '}';
}

public static ResultSet fromTableResult(TableResult tableResult) {
ResolvedSchema schema = tableResult.getResolvedSchema();
// collect all rows from table result as list
// this is ok as TableResult contains limited rows
List<Row> rows = new ArrayList<>();
tableResult.collect().forEachRemaining(rows::add);
return builder()
.resultKind(tableResult.getResultKind())
.columns(schema.getColumns())
.data(rows.toArray(new Row[0]))
.build();
}

public static Builder builder() {
return new Builder();
}

/** Builder for {@link ResultSet}. */
public static class Builder {
private ResultKind resultKind = null;
private List<ColumnInfo> columns = null;
private List<Column> columns = null;
private FetchIterator<Row> data = null;
private List<Boolean> changeFlags = null;

@@ -122,14 +140,14 @@ public Builder resultKind(ResultKind resultKind) {
return this;
}

/** Set {@link ColumnInfo}s. */
public Builder columns(ColumnInfo... columns) {
/** Set columns. */
public Builder columns(Column... columns) {
this.columns = Arrays.asList(columns);
return this;
}

/** Set {@link ColumnInfo}s. */
public Builder columns(List<ColumnInfo> columns) {
/** Set columns. */
public Builder columns(List<Column> columns) {
this.columns = columns;
return this;
}
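
For reference, a minimal sketch of the two construction paths the updated ResultSet offers, using placeholder column names and rows: the builder path mirrors how runQueryOperation assembles its result, while fromTableResult wraps results handed back by the Flink executor:

```scala
import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row

import org.apache.kyuubi.engine.flink.result.ResultSet

object ResultSetSketch {
  // Path 1: build a result explicitly from columns and rows.
  val built: ResultSet = ResultSet.builder
    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
    .columns(Column.physical("table name", DataTypes.STRING()))
    .data(Array[Row](Row.of("orders"), Row.of("users")))
    .build

  // Path 2: wrap a TableResult returned by the Flink executor, e.g. for DDL statements.
  def wrap(tableResult: TableResult): ResultSet = ResultSet.fromTableResult(tableResult)
}
```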
@@ -17,19 +17,19 @@

package org.apache.kyuubi.engine.flink.operation

import java.util
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import com.google.common.annotations.VisibleForTesting
import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
import org.apache.flink.table.operations.{Operation, QueryOperation}
import org.apache.flink.types.Row

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -45,10 +45,6 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)

private var resultDescriptor: ResultDescriptor = _

private var columnInfos: util.List[ColumnInfo] = _

private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -101,46 +97,55 @@ class ExecuteStatement(
try {
setState(OperationState.RUNNING)

columnInfos = new util.ArrayList[ColumnInfo]

val operation = executor.parseStatement(sessionId, statement)
resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
operation match {
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
case operation: Operation => runOperation(operation)
}

val resultID = resultDescriptor.getResultId

val rows = new ArrayBuffer[Row]()
var loop = true
while (loop) {
Thread.sleep(50) // slow the processing down

val result = executor.snapshotResult(sessionId, resultID, 2)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
rows.clear()
(1 to result.getPayload).foreach { page =>
rows ++= executor.retrieveResultPage(resultID, page).asScala
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
}
}

resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos)
.data(rows.toArray[Row])
.build
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
statementTimeoutCleaner.foreach(_.shutdown())
}
}

private def runQueryOperation(operation: QueryOperation): Unit = {
val resultDescriptor = executor.executeQuery(sessionId, operation)

val resultID = resultDescriptor.getResultId

val rows = new ArrayBuffer[Row]()
var loop = true
while (loop) {
Thread.sleep(50) // slow the processing down

val result = executor.snapshotResult(sessionId, resultID, 2)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
rows.clear()
(1 to result.getPayload).foreach { page =>
rows ++= executor.retrieveResultPage(resultID, page).asScala
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
}
}

resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(resultDescriptor.getResultSchema.getColumns)
.data(rows.toArray[Row])
.build
setState(OperationState.FINISHED)
}

private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
result.await()
resultSet = ResultSet.fromTableResult(result)
setState(OperationState.FINISHED)
}

private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
