Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;

import org.jline.keymap.KeyMap;
import org.jline.utils.AttributedString;
Expand Down Expand Up @@ -108,7 +108,7 @@ protected void display() {
@Override
protected void refresh() {
// retrieve change record
final TypedResult<List<Row>> result;
final TypedResult<List<RowData>> result;
try {
result =
client.getExecutor()
Expand All @@ -129,9 +129,9 @@ protected void refresh() {
stopRetrieval(false);
break;
default:
List<Row> changes = result.getPayload();
List<RowData> changes = result.getPayload();

for (Row change : changes) {
for (RowData change : changes) {
// convert row
final String[] row =
PrintUtils.rowToString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.ResultMode;
import org.apache.flink.table.client.config.SqlClientOptions;
Expand Down Expand Up @@ -622,15 +623,15 @@ private void callEndStatementSet() {
}

private void executeOperation(Operation operation) {
TableResult result = executor.executeOperation(sessionId, operation);
TableResultInternal result = executor.executeOperation(sessionId, operation);
if (TABLE_RESULT_OK == result) {
// print more meaningful message than tableau OK result
printInfo(MESSAGE_EXECUTE_STATEMENT);
} else {
// print tableau if result has content
PrintUtils.printAsTableauForm(
result.getResolvedSchema(),
result.collect(),
result.collectInternal(),
terminal.writer(),
Integer.MAX_VALUE,
"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;

import org.jline.keymap.KeyMap;
import org.jline.utils.AttributedString;
Expand Down Expand Up @@ -296,7 +296,7 @@ protected List<AttributedString> computeFooterLines() {
private void updatePage() {
// retrieve page
final int retrievalPage = page == LAST_PAGE ? pageCount : page;
final List<Row> rows;
final List<RowData> rows;
try {
rows =
client.getExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.jline.terminal.Terminal;
Expand Down Expand Up @@ -124,7 +124,7 @@ private void checkAndCleanUpQuery(boolean cleanUpQuery) {
}

private void printBatchResults(AtomicInteger receivedRowCount) {
final List<Row> resultRows = waitBatchResults();
final List<RowData> resultRows = waitBatchResults();
receivedRowCount.addAndGet(resultRows.size());
PrintUtils.printAsTableauForm(
resultDescriptor.getResultSchema(),
Expand Down Expand Up @@ -157,7 +157,7 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
terminal.flush();

while (true) {
final TypedResult<List<Row>> result =
final TypedResult<List<RowData>> result =
sqlExecutor.retrieveResultChanges(sessionId, resultDescriptor.getResultId());

switch (result.getType()) {
Expand All @@ -184,8 +184,8 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
terminal.flush();
return;
case PAYLOAD:
List<Row> changes = result.getPayload();
for (Row change : changes) {
List<RowData> changes = result.getPayload();
for (RowData change : changes) {
final String[] row =
PrintUtils.rowToString(
change,
Expand All @@ -203,15 +203,15 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
}
}

private List<Row> waitBatchResults() {
List<Row> resultRows = new ArrayList<>();
private List<RowData> waitBatchResults() {
List<RowData> resultRows = new ArrayList<>();
do {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
TypedResult<List<Row>> result =
TypedResult<List<RowData>> result =
sqlExecutor.retrieveResultChanges(sessionId, resultDescriptor.getResultId());

if (result.getType() == TypedResult.ResultType.EOS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -106,19 +107,19 @@ void setSessionProperty(String sessionId, String key, String value)
List<String> completeStatement(String sessionId, String statement, int position);

/** Executes an operation, and return {@link TableResult} as execution result. */
TableResult executeOperation(String sessionId, Operation operation)
TableResultInternal executeOperation(String sessionId, Operation operation)
throws SqlExecutionException;

/** Executes modify operations, and return {@link TableResult} as execution result. */
TableResult executeModifyOperations(String sessionId, List<ModifyOperation> operations)
TableResultInternal executeModifyOperations(String sessionId, List<ModifyOperation> operations)
throws SqlExecutionException;

/** Submits a Flink SQL query job (detached) and returns the result descriptor. */
ResultDescriptor executeQuery(String sessionId, QueryOperation query)
throws SqlExecutionException;

/** Asks for the next changelog results (non-blocking). */
TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId)
TypedResult<List<RowData>> retrieveResultChanges(String sessionId, String resultId)
throws SqlExecutionException;

/**
Expand All @@ -132,7 +133,7 @@ TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageS
* Returns the rows that are part of the current page or throws an exception if the snapshot has
* been expired.
*/
List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;
List<RowData> retrieveResultPage(String resultId, int page) throws SqlExecutionException;

/**
* Cancels a table program and stops the result retrieval. Blocking until cancellation command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
Expand All @@ -33,11 +33,11 @@
import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.types.Row;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -200,7 +200,7 @@ public List<String> completeStatement(String sessionId, String statement, int po
}

@Override
public TableResult executeOperation(String sessionId, Operation operation)
public TableResultInternal executeOperation(String sessionId, Operation operation)
throws SqlExecutionException {
final ExecutionContext context = getExecutionContext(sessionId);
final TableEnvironmentInternal tEnv =
Expand All @@ -213,8 +213,8 @@ public TableResult executeOperation(String sessionId, Operation operation)
}

@Override
public TableResult executeModifyOperations(String sessionId, List<ModifyOperation> operations)
throws SqlExecutionException {
public TableResultInternal executeModifyOperations(
String sessionId, List<ModifyOperation> operations) throws SqlExecutionException {
final ExecutionContext context = getExecutionContext(sessionId);
final TableEnvironmentInternal tEnv =
(TableEnvironmentInternal) context.getTableEnvironment();
Expand All @@ -228,7 +228,7 @@ public TableResult executeModifyOperations(String sessionId, List<ModifyOperatio
@Override
public ResultDescriptor executeQuery(String sessionId, QueryOperation query)
throws SqlExecutionException {
final TableResult tableResult = executeOperation(sessionId, query);
final TableResultInternal tableResult = executeOperation(sessionId, query);
final SessionContext context = getSessionContext(sessionId);
final ReadableConfig config = context.getReadableConfig();
final DynamicResult result = resultStore.createResult(config, tableResult);
Expand All @@ -241,7 +241,7 @@ public ResultDescriptor executeQuery(String sessionId, QueryOperation query)
}

@Override
public TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId)
public TypedResult<List<RowData>> retrieveResultChanges(String sessionId, String resultId)
throws SqlExecutionException {
final DynamicResult result = resultStore.getResult(resultId);
if (result == null) {
Expand Down Expand Up @@ -269,7 +269,8 @@ public TypedResult<Integer> snapshotResult(String sessionId, String resultId, in
}

@Override
public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
public List<RowData> retrieveResultPage(String resultId, int page)
throws SqlExecutionException {
final DynamicResult result = resultStore.getResult(resultId);
if (result == null) {
throw new SqlExecutionException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.result.ChangelogCollectResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
Expand Down Expand Up @@ -50,7 +50,7 @@ public ResultStore() {
* Creates a result. Might start threads or opens sockets so every created result must be
* closed.
*/
public DynamicResult createResult(ReadableConfig config, TableResult tableResult) {
public DynamicResult createResult(ReadableConfig config, TableResultInternal tableResult) {
// validate
if (config.get(EXECUTION_RESULT_MODE).equals(CHANGELOG)
&& config.get(RUNTIME_MODE).equals(RuntimeExecutionMode.BATCH)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@
package org.apache.flink.table.client.gateway.local.result;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.table.data.RowData;

import java.util.ArrayList;
import java.util.List;

/** Collects results and returns them as a changelog. */
public class ChangelogCollectResult extends CollectResultBase implements ChangelogResult {

private final List<Row> changeRecordBuffer;
private final List<RowData> changeRecordBuffer;
@VisibleForTesting protected static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;

public ChangelogCollectResult(TableResult tableResult) {
public ChangelogCollectResult(TableResultInternal tableResult) {
super(tableResult);
// prepare for changelog
changeRecordBuffer = new ArrayList<>();
Expand All @@ -45,23 +45,23 @@ public boolean isMaterialized() {
}

@Override
public TypedResult<List<Row>> retrieveChanges() {
public TypedResult<List<RowData>> retrieveChanges() {
synchronized (resultLock) {
// retrieval thread is alive return a record if available
// but the program must not have failed
if (isRetrieving() && executionException.get() == null) {
if (changeRecordBuffer.isEmpty()) {
return TypedResult.empty();
} else {
final List<Row> change = new ArrayList<>(changeRecordBuffer);
final List<RowData> change = new ArrayList<>(changeRecordBuffer);
changeRecordBuffer.clear();
resultLock.notify();
return TypedResult.payload(change);
}
}
// retrieval thread is dead but there is still a record to be delivered
else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
final List<Row> change = new ArrayList<>(changeRecordBuffer);
final List<RowData> change = new ArrayList<>(changeRecordBuffer);
changeRecordBuffer.clear();
return TypedResult.payload(change);
}
Expand All @@ -75,7 +75,7 @@ else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
// --------------------------------------------------------------------------------------------

@Override
protected void processRecord(Row row) {
protected void processRecord(RowData row) {
synchronized (resultLock) {
// wait if the buffer is full
if (changeRecordBuffer.size() >= CHANGE_RECORD_BUFFER_SIZE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.table.client.gateway.local.result;

import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.table.data.RowData;

import java.util.List;

/** A result that is represented as a changelog consisting of insert and delete records. */
public interface ChangelogResult extends DynamicResult {

/** Retrieves the available result records. */
TypedResult<List<Row>> retrieveChanges();
TypedResult<List<RowData>> retrieveChanges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@
package org.apache.flink.table.client.gateway.local.result;

import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;

import java.util.concurrent.atomic.AtomicReference;

/** A result that works through {@link TableResult#collect()}. */
public abstract class CollectResultBase implements DynamicResult {
private final CloseableIterator<Row> result;
private final CloseableIterator<RowData> result;

protected final Object resultLock;
protected AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();
protected final ResultRetrievalThread retrievalThread;

public CollectResultBase(TableResult tableResult) {
result = tableResult.collect();
public CollectResultBase(TableResultInternal tableResult) {
result = tableResult.collectInternal();
resultLock = new Object();
retrievalThread = new ResultRetrievalThread();
}
Expand All @@ -56,7 +57,7 @@ protected <T> TypedResult<T> handleMissingResult() {
return TypedResult.endOfStream();
}

protected abstract void processRecord(Row row);
protected abstract void processRecord(RowData row);

protected boolean isRetrieving() {
return retrievalThread.isRunning;
Expand Down
Loading