Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35197][table] Support the execution of suspend & resume of materialized table in continuous refresh mode #24765

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
Expand All @@ -46,17 +49,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.flink.configuration.DeploymentOptions.TARGET;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.configuration.PipelineOptions.NAME;
import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
Expand All @@ -78,6 +87,12 @@ public static ResultFetcher callMaterializedTableOperation(
} else if (op instanceof AlterMaterializedTableRefreshOperation) {
return callAlterMaterializedTableRefreshOperation(
operationExecutor, handle, (AlterMaterializedTableRefreshOperation) op);
} else if (op instanceof AlterMaterializedTableSuspendOperation) {
return callAlterMaterializedTableSuspend(
operationExecutor, handle, (AlterMaterializedTableSuspendOperation) op);
} else if (op instanceof AlterMaterializedTableResumeOperation) {
return callAlterMaterializedTableResume(
operationExecutor, handle, (AlterMaterializedTableResumeOperation) op);
}

throw new SqlExecutionException(
Expand Down Expand Up @@ -115,6 +130,105 @@ private static void createMaterializedInContinuousMode(
CatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();

try {
executeContinuousRefreshJob(
operationExecutor,
handle,
catalogMaterializedTable,
materializedTableIdentifier,
Collections.emptyMap(),
Optional.empty());
} catch (Exception e) {
// drop materialized table while submit flink streaming job occur exception. Thus, weak
// atomicity is guaranteed
LOG.warn(
"Submit continuous refresh job occur exception, drop materialized table {}.",
materializedTableIdentifier,
e);
operationExecutor.callExecutableOperation(
handle,
new DropMaterializedTableOperation(materializedTableIdentifier, true, false));
throw e;
hackergin marked this conversation as resolved.
Show resolved Hide resolved
hackergin marked this conversation as resolved.
Show resolved Hide resolved
}
}

private static ResultFetcher callAlterMaterializedTableSuspend(
        OperationExecutor operationExecutor,
        OperationHandle handle,
        AlterMaterializedTableSuspendOperation op) {
    // Suspending a continuously-refreshed materialized table means: stop the backing
    // Flink streaming job with a savepoint, then persist the savepoint path and the
    // SUSPENDED status back into the catalog so a later RESUME can restore from it.
    ObjectIdentifier tableIdentifier = op.getTableIdentifier();
    CatalogMaterializedTable table =
            getCatalogMaterializedTable(operationExecutor, tableIdentifier);

    // Only the continuous refresh mode is backed by a long-running job to suspend.
    if (table.getRefreshMode() != CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
        throw new SqlExecutionException(
                "Only support suspend continuous refresh job currently.");
    }

    ContinuousRefreshHandler currentHandler =
            deserializeContinuousHandler(
                    table.getSerializedRefreshHandler(),
                    operationExecutor.getSessionContext().getUserClassloader());

    // Stop with savepoint so the job's state survives the suspension.
    String savepointPath = stopJobWithSavepoint(operationExecutor, handle, currentHandler);

    // Re-create the handler carrying the savepoint path as the restore location.
    ContinuousRefreshHandler suspendedHandler =
            new ContinuousRefreshHandler(
                    currentHandler.getExecutionTarget(),
                    currentHandler.getJobId(),
                    savepointPath);

    // Write the SUSPENDED status plus the updated handler back to the catalog.
    CatalogMaterializedTable suspendedTable =
            table.copy(
                    CatalogMaterializedTable.RefreshStatus.SUSPENDED,
                    table.getRefreshHandlerDescription().orElse(null),
                    serializeContinuousHandler(suspendedHandler));
    operationExecutor.callExecutableOperation(
            handle,
            new AlterMaterializedTableChangeOperation(
                    tableIdentifier, Collections.emptyList(), suspendedTable));

    return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private static ResultFetcher callAlterMaterializedTableResume(
        OperationExecutor operationExecutor,
        OperationHandle handle,
        AlterMaterializedTableResumeOperation op) {
    // Resuming re-submits the continuous refresh job, restoring from the savepoint
    // recorded in the refresh handler at suspend time (if any).
    ObjectIdentifier tableIdentifier = op.getTableIdentifier();
    CatalogMaterializedTable table =
            getCatalogMaterializedTable(operationExecutor, tableIdentifier);

    // Only the continuous refresh mode has a job to resume.
    if (table.getRefreshMode() != CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
        throw new SqlExecutionException(
                "Only support resume continuous refresh job currently.");
    }

    ContinuousRefreshHandler handler =
            deserializeContinuousHandler(
                    table.getSerializedRefreshHandler(),
                    operationExecutor.getSessionContext().getUserClassloader());

    // Restore path was stored by the preceding SUSPEND; may be absent.
    executeContinuousRefreshJob(
            operationExecutor,
            handle,
            table,
            tableIdentifier,
            op.getDynamicOptions(),
            handler.getRestorePath());

    return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private static void executeContinuousRefreshJob(
OperationExecutor operationExecutor,
OperationHandle handle,
CatalogMaterializedTable catalogMaterializedTable,
ObjectIdentifier materializedTableIdentifier,
Map<String, String> dynamicOptions,
Optional<String> restorePath) {
// Set job name, runtime mode, checkpoint interval
// TODO: Set minibatch related optimization options.
Configuration customConfig = new Configuration();
Expand All @@ -125,11 +239,13 @@ private static void createMaterializedInContinuousMode(
customConfig.set(NAME, jobName);
customConfig.set(RUNTIME_MODE, STREAMING);
customConfig.set(CHECKPOINTING_INTERVAL, catalogMaterializedTable.getFreshness());
restorePath.ifPresent(s -> customConfig.set(SAVEPOINT_PATH, s));

String insertStatement =
String.format(
"INSERT INTO %s %s",
materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery());
getInsertStatement(
materializedTableIdentifier,
catalogMaterializedTable.getDefinitionQuery(),
dynamicOptions);
try {
// submit flink streaming job
ResultFetcher resultFetcher =
Expand All @@ -143,8 +259,7 @@ private static void createMaterializedInContinuousMode(
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
ContinuousRefreshHandler continuousRefreshHandler =
new ContinuousRefreshHandler(executeTarget, jobId);
byte[] serializedBytes =
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
byte[] serializedBytes = serializeContinuousHandler(continuousRefreshHandler);

// update RefreshHandler to Catalog
CatalogMaterializedTable updatedMaterializedTable =
Expand Down Expand Up @@ -190,21 +305,13 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation(
AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
ObjectIdentifier materializedTableIdentifier =
alterMaterializedTableRefreshOperation.getTableIdentifier();
ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier);
if (MATERIALIZED_TABLE != table.getTableKind()) {
throw new ValidationException(
String.format(
"The table '%s' is not a materialized table.",
materializedTableIdentifier));
}

ResolvedCatalogMaterializedTable materializedTable =
(ResolvedCatalogMaterializedTable) table;
getCatalogMaterializedTable(operationExecutor, materializedTableIdentifier);

Map<String, String> partitionSpec =
alterMaterializedTableRefreshOperation.getPartitionSpec();

validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
validatePartitionSpec(partitionSpec, materializedTable);

// Set job name, runtime mode
Configuration customConfig = new Configuration();
Expand Down Expand Up @@ -307,6 +414,84 @@ protected static String getManuallyRefreshStatement(
return insertStatement.toString();
}

private static String stopJobWithSavepoint(
OperationExecutor executor,
OperationHandle handle,
ContinuousRefreshHandler refreshHandler) {
// check savepoint dir is configured
Optional<String> savepointDir =
executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY);
if (!savepointDir.isPresent()) {
throw new ValidationException(
"Savepoint directory is not configured, can't stop job with savepoint.");
}
ResultFetcher resultFetcher =
executor.callStopJobOperation(
hackergin marked this conversation as resolved.
Show resolved Hide resolved
executor.getTableEnvironment(),
handle,
new StopJobOperation(refreshHandler.getJobId(), true, false));
List<RowData> results = fetchAllResults(resultFetcher);
return results.get(0).getString(0).toString();
}

/**
 * Deserializes the refresh handler bytes stored in the catalog back into a
 * {@link ContinuousRefreshHandler}, using the session's user classloader.
 *
 * @throws SqlExecutionException if the bytes cannot be deserialized
 */
private static ContinuousRefreshHandler deserializeContinuousHandler(
        byte[] serializedRefreshHandler, ClassLoader classLoader) {
    try {
        return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
                serializedRefreshHandler, classLoader);
    } catch (IOException | ClassNotFoundException e) {
        // Fix: grammatical error message ("occur exception") replaced with a clear one;
        // the cause is preserved so callers can see the underlying failure.
        throw new SqlExecutionException(
                "Failed to deserialize ContinuousRefreshHandler.", e);
    }
}

/**
 * Serializes a {@link ContinuousRefreshHandler} into the byte form stored in the catalog.
 *
 * @throws SqlExecutionException if serialization fails
 */
private static byte[] serializeContinuousHandler(ContinuousRefreshHandler refreshHandler) {
    try {
        return ContinuousRefreshHandlerSerializer.INSTANCE.serialize(refreshHandler);
    } catch (IOException e) {
        // Fix: grammatical error message ("occur exception") replaced with a clear one;
        // the cause is preserved so callers can see the underlying failure.
        throw new SqlExecutionException(
                "Failed to serialize ContinuousRefreshHandler.", e);
    }
}

/**
 * Looks up {@code tableIdentifier} and returns it as a resolved materialized table.
 *
 * @throws ValidationException if the identifier resolves to a table that is not a
 *     materialized table
 */
private static ResolvedCatalogMaterializedTable getCatalogMaterializedTable(
        OperationExecutor operationExecutor, ObjectIdentifier tableIdentifier) {
    ResolvedCatalogBaseTable<?> baseTable = operationExecutor.getTable(tableIdentifier);
    // Materialized-table operations are only valid on MATERIALIZED_TABLE kind.
    if (baseTable.getTableKind() != MATERIALIZED_TABLE) {
        throw new ValidationException(
                String.format(
                        "Table %s is not a materialized table, does not support materialized table related operation.",
                        tableIdentifier));
    }
    return (ResolvedCatalogMaterializedTable) baseTable;
}

/**
 * Builds the INSERT INTO statement that backs the materialized table refresh job.
 *
 * <p>Dynamic options, when present, are rendered as a {@code /*+ OPTIONS(...) *&#47;} hint
 * on the target table; the definition query follows on the next line.
 */
@VisibleForTesting
protected static String getInsertStatement(
        ObjectIdentifier materializedTableIdentifier,
        String definitionQuery,
        Map<String, String> dynamicOptions) {
    // Render the dynamic options as a table hint, or nothing when there are none.
    String hintClause = "";
    if (!dynamicOptions.isEmpty()) {
        hintClause =
                dynamicOptions.entrySet().stream()
                        .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                        .collect(Collectors.joining(", ", " /*+ OPTIONS(", ") */"));
    }
    return String.format(
            "INSERT INTO %s%s\n%s",
            materializedTableIdentifier.asSerializableString(), hintClause, definitionQuery);
}

private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
Long token = 0L;
List<RowData> results = new ArrayList<>();
Expand Down
Loading