[FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode
lsyldliu committed Apr 30, 2024
1 parent 9d52413 commit ebf15b9
Showing 23 changed files with 1,192 additions and 7 deletions.
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.service.materializedtable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
import org.apache.flink.table.refresh.ContinuousRefreshHandler;
import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.DeploymentOptions.TARGET;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.configuration.PipelineOptions.NAME;
import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;

/** Manager that is responsible for executing the {@link MaterializedTableOperation}. */
@Internal
public class MaterializedTableManager {

private final OperationExecutor operationExecutor;

public MaterializedTableManager(OperationExecutor operationExecutor) {
this.operationExecutor = operationExecutor;
}

public ResultFetcher callMaterializedTableOperation(
TableEnvironmentInternal tableEnv,
OperationHandle handle,
MaterializedTableOperation op,
String statement) {
if (op instanceof CreateMaterializedTableOperation) {
return callCreateMaterializedTableOperation(
handle, (CreateMaterializedTableOperation) op);
}
throw new SqlExecutionException(
String.format(
"Unsupported Operation %s for materialized table.", op.asSummaryString()));
}

private ResultFetcher callCreateMaterializedTableOperation(
OperationHandle handle,
CreateMaterializedTableOperation createMaterializedTableOperation) {
CatalogMaterializedTable materializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
createMaterializedInContinuousMode(handle, createMaterializedTableOperation);
} else {
throw new SqlExecutionException(
"Only support create materialized table in continuous refresh mode currently.");
}
// Just return OK to unify the refresh job info of continuous and full mode; users
// should get the refresh job info via DESCRIBE TABLE.
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private void createMaterializedInContinuousMode(
OperationHandle handle,
CreateMaterializedTableOperation createMaterializedTableOperation) {
// create materialized table first
operationExecutor.callExecutableOperation(handle, createMaterializedTableOperation);

ObjectIdentifier materializedTableIdentifier =
createMaterializedTableOperation.getTableIdentifier();
CatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
// Set job name
String jobName =
String.format(
"Materialized_table_%s_continuous_refresh_job",
materializedTableIdentifier.asSerializableString());
operationExecutor.getSessionContext().set(NAME.key(), jobName);
// Set execution mode in executionConfig, which only works for this execution
operationExecutor.getSessionContext().set(RUNTIME_MODE.key(), STREAMING.name());
// TODO Set checkpoint & minibatch related options.

// Generate insert into statement
String insertStatement =
String.format(
"INSERT INTO %s %s",
materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery());
// Submit the Flink streaming job
ResultFetcher resultFetcher = operationExecutor.executeStatement(handle, insertStatement);

// Get execution.target and jobId; YARN and Kubernetes are not supported yet, so the
// clusterId is not retrieved
List<RowData> results = fetchAllResults(resultFetcher);
String jobId = results.get(0).getString(0).toString();
String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
ContinuousRefreshHandler continuousRefreshHandler =
new ContinuousRefreshHandler(executeTarget, jobId);
byte[] serializedBytes;
try {
serializedBytes =
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
} catch (IOException e) {
throw new TableException("Serialize ContinuousRefreshHandler occur exception.", e);
}

// Update the RefreshHandler in the catalog
CatalogMaterializedTable updatedMaterializedTable =
catalogMaterializedTable.copy(
CatalogMaterializedTable.RefreshStatus.ACTIVATED,
continuousRefreshHandler.asSummaryString(),
serializedBytes);
List<TableChange.MaterializedTableChange> tableChanges = new ArrayList<>();
tableChanges.add(
TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
tableChanges.add(
TableChange.modifyRefreshHandler(
continuousRefreshHandler.asSummaryString(), serializedBytes));
AlterMaterializedTableChangeOperation alterDynamicTableChangeOperation =
new AlterMaterializedTableChangeOperation(
materializedTableIdentifier, tableChanges, updatedMaterializedTable);
operationExecutor.callExecutableOperation(handle, alterDynamicTableChangeOperation);
}

private List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
Long token = 0L;
List<RowData> results = new ArrayList<>();
while (token != null) {
ResultSet result = resultFetcher.fetchResults(token, Integer.MAX_VALUE);
results.addAll(result.getData());
token = result.getNextToken();
}
return results;
}
}
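
For illustration, a minimal sketch (not part of this commit) of the strings that createMaterializedInContinuousMode derives before submitting the streaming job, assuming a hypothetical materialized table my_catalog.my_db.orders_mv whose definition query is "SELECT * FROM orders":

import org.apache.flink.table.catalog.ObjectIdentifier;

public class ContinuousRefreshNamingSketch {
    public static void main(String[] args) {
        // Hypothetical identifier and definition query, for illustration only.
        ObjectIdentifier id = ObjectIdentifier.of("my_catalog", "my_db", "orders_mv");
        String definitionQuery = "SELECT * FROM orders";

        // Pipeline name used for the continuous refresh job, e.g.
        // Materialized_table_`my_catalog`.`my_db`.`orders_mv`_continuous_refresh_job
        String jobName =
                String.format(
                        "Materialized_table_%s_continuous_refresh_job",
                        id.asSerializableString());

        // Streaming INSERT derived from the definition query; the exact quoting of the
        // table name depends on ObjectIdentifier#toString.
        String insertStatement = String.format("INSERT INTO %s %s", id, definitionQuery);

        System.out.println(jobName);
        System.out.println(insertStatement);
    }
}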
@@ -65,6 +65,7 @@
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.environment.SqlGatewayStreamExecutionEnvironment;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
@@ -95,6 +96,7 @@
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.util.CollectionUtil;
@@ -142,11 +144,14 @@ public class OperationExecutor {

private final ClusterClientServiceLoader clusterClientServiceLoader;

private final MaterializedTableManager materializedTableManager;

@VisibleForTesting
public OperationExecutor(SessionContext context, Configuration executionConfig) {
this.sessionContext = context;
this.executionConfig = executionConfig;
this.clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
this.materializedTableManager = new MaterializedTableManager(this);
}

public ResultFetcher configureSession(OperationHandle handle, String statement) {
@@ -489,12 +494,15 @@ private ResultFetcher executeOperation(
|| op instanceof CreateCatalogFunctionOperation
|| op instanceof ShowFunctionsOperation) {
return callExecutableOperation(handle, (ExecutableOperation) op);
} else if (op instanceof MaterializedTableOperation) {
return materializedTableManager.callMaterializedTableOperation(
tableEnv, handle, (MaterializedTableOperation) op, statement);
} else {
return callOperation(tableEnv, handle, op);
}
}

private ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
public ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
TableResultInternal result =
op.execute(
new ExecutableOperationContextImpl(
@@ -518,6 +526,10 @@ private TableConfig tableConfig() {
return tableConfig;
}

public SessionContext getSessionContext() {
return sessionContext;
}

private ResultFetcher callSetOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
Expand Down Expand Up @@ -579,7 +591,7 @@ private ResultFetcher callEndStatementSetOperation(
}
}

private ResultFetcher callModifyOperations(
public ResultFetcher callModifyOperations(
TableEnvironmentInternal tableEnv,
OperationHandle handle,
List<ModifyOperation> modifyOperations) {
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.service;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND;

/**
* ITCase for materialized table related statements via {@link SqlGatewayServiceImpl}. A separate
* test class is used rather than adding test cases to {@link SqlGatewayServiceITCase}, both
* because the syntax related to materialized tables is relatively independent and to avoid
* conflicts with the code in {@link SqlGatewayServiceITCase}.
*/
public class MaterializedTableStatementITCase {

private static final String FILE_CATALOG_STORE = "file_store";
private static final String TEST_CATALOG = "test_catalog";
private static final String TEST_DEFAULT_DATABASE = "test_db";

@RegisterExtension
@Order(1)
static final MiniClusterExtension MINI_CLUSTER =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(2)
.build());

@RegisterExtension
@Order(2)
static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);

@RegisterExtension
@Order(3)
static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
new TestExecutorExtension<>(
() ->
Executors.newCachedThreadPool(
new ExecutorThreadFactory(
"SqlGatewayService Test Pool",
IgnoreExceptionHandler.INSTANCE)));

@TempDir static File tempFile;

private static SqlGatewayServiceImpl service;

private static SessionEnvironment defaultSessionEnvironment;
private static String fileSystemCatalogPath;

@BeforeAll
static void setUp() {
service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();

// initialize file catalog store path
File fileCatalogStore = new File(tempFile, FILE_CATALOG_STORE);
fileCatalogStore.mkdir();
Map<String, String> catalogStoreOptions = new HashMap<>();
catalogStoreOptions.put(TABLE_CATALOG_STORE_KIND.key(), "file");
catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString());
// initialize test-filesystem catalog path
File fileCatalogPath = new File(tempFile, TEST_CATALOG);
fileCatalogPath.mkdir();
File defaultDBPath = new File(fileCatalogPath, TEST_DEFAULT_DATABASE);
defaultDBPath.mkdir();
fileSystemCatalogPath = fileCatalogPath.getPath();

defaultSessionEnvironment =
SessionEnvironment.newBuilder()
.addSessionConfig(catalogStoreOptions)
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.build();
}

@Test
void testCreateMaterializedTableInContinuousMode() throws Exception {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
// create test-filesystem catalog
initializeTestFileSystemCatalog(sessionHandle);
}

private void initializeTestFileSystemCatalog(SessionHandle sessionHandle) {
String fileSystemCatalogDDL =
String.format(
"CREATE CATALOG %s\n"
+ "WITH (\n"
+ " 'type' = 'test-filesystem',\n"
+ " 'path' = '%s',\n"
+ " 'default-database' = '%s'\n"
+ " )",
TEST_CATALOG, fileSystemCatalogPath, TEST_DEFAULT_DATABASE);
service.executeStatement(sessionHandle, fileSystemCatalogDDL, -1, new Configuration());
}
}
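
As a rough illustration, a minimal sketch (not part of this commit) of how the test body could continue after initializing the catalog, assuming the CREATE MATERIALIZED TABLE syntax proposed in FLIP-435; the table name, freshness interval, and source table below are hypothetical:

// Inside testCreateMaterializedTableInContinuousMode, after initializeTestFileSystemCatalog:
// switch to the test catalog, then submit a hypothetical materialized table definition.
service.executeStatement(
        sessionHandle, "USE CATALOG " + TEST_CATALOG, -1, new Configuration());

String materializedTableDDL =
        "CREATE MATERIALIZED TABLE users_shops"
                + " PARTITIONED BY (ds)"
                + " FRESHNESS = INTERVAL '30' SECOND"
                + " AS SELECT ds, user_id, shop_id FROM json_source";
service.executeStatement(sessionHandle, materializedTableDDL, -1, new Configuration());

// The refresh status and the serialized ContinuousRefreshHandler could then be
// verified by reading the resulting CatalogMaterializedTable back from the catalog.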
@@ -89,7 +89,7 @@ public static void validateAndChangeColumnNullability(
}

/** Check table constraint. */
private static void validate(SqlTableConstraint constraint) throws SqlValidateException {
public static void validate(SqlTableConstraint constraint) throws SqlValidateException {
if (constraint.isUnique()) {
throw new SqlValidateException(
constraint.getParserPosition(), "UNIQUE constraint is not supported yet");
@@ -132,7 +132,6 @@ public SqlIntervalLiteral getFreshness() {
return freshness;
}

@Nullable
public Optional<SqlLiteral> getRefreshMode() {
return Optional.ofNullable(refreshMode);
}