Skip to content

Commit

Permalink
[FLINK-35197][table] Support the execution of suspend, resume materialized table in continuous refresh mode
Browse files Browse the repository at this point in the history

This closes #24765
  • Loading branch information
hackergin authored and lsyldliu committed May 13, 2024
1 parent 3b6e8db commit e4972c0
Show file tree
Hide file tree
Showing 4 changed files with 561 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
Expand All @@ -46,17 +49,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.flink.configuration.DeploymentOptions.TARGET;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.configuration.PipelineOptions.NAME;
import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
Expand All @@ -78,6 +87,12 @@ public static ResultFetcher callMaterializedTableOperation(
} else if (op instanceof AlterMaterializedTableRefreshOperation) {
return callAlterMaterializedTableRefreshOperation(
operationExecutor, handle, (AlterMaterializedTableRefreshOperation) op);
} else if (op instanceof AlterMaterializedTableSuspendOperation) {
return callAlterMaterializedTableSuspend(
operationExecutor, handle, (AlterMaterializedTableSuspendOperation) op);
} else if (op instanceof AlterMaterializedTableResumeOperation) {
return callAlterMaterializedTableResume(
operationExecutor, handle, (AlterMaterializedTableResumeOperation) op);
}

throw new SqlExecutionException(
Expand Down Expand Up @@ -115,6 +130,105 @@ private static void createMaterializedInContinuousMode(
CatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();

try {
executeContinuousRefreshJob(
operationExecutor,
handle,
catalogMaterializedTable,
materializedTableIdentifier,
Collections.emptyMap(),
Optional.empty());
} catch (Exception e) {
// drop materialized table while submit flink streaming job occur exception. Thus, weak
// atomicity is guaranteed
LOG.warn(
"Submit continuous refresh job occur exception, drop materialized table {}.",
materializedTableIdentifier,
e);
operationExecutor.callExecutableOperation(
handle,
new DropMaterializedTableOperation(materializedTableIdentifier, true, false));
throw e;
}
}

/**
 * Suspends the continuous refresh job backing a materialized table.
 *
 * <p>The running streaming job is stopped with a savepoint, the savepoint path is recorded in
 * the refresh handler, and the catalog entry is updated to {@code SUSPENDED} so the job can be
 * resumed from that savepoint later.
 *
 * @param operationExecutor executor used to run catalog and job operations
 * @param handle handle of the current operation
 * @param op the parsed ALTER MATERIALIZED TABLE ... SUSPEND operation
 * @return an OK result once the catalog entry has been updated
 * @throws SqlExecutionException if the table is not refreshed in continuous mode
 */
private static ResultFetcher callAlterMaterializedTableSuspend(
        OperationExecutor operationExecutor,
        OperationHandle handle,
        AlterMaterializedTableSuspendOperation op) {
    ObjectIdentifier identifier = op.getTableIdentifier();
    CatalogMaterializedTable table = getCatalogMaterializedTable(operationExecutor, identifier);

    // Suspension is only meaningful for a continuously running refresh job.
    if (table.getRefreshMode() != CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
        throw new SqlExecutionException(
                "Only support suspend continuous refresh job currently.");
    }

    ContinuousRefreshHandler handler =
            deserializeContinuousHandler(
                    table.getSerializedRefreshHandler(),
                    operationExecutor.getSessionContext().getUserClassloader());

    // Stop the job while taking a savepoint so the table can later resume from it.
    String savepointPath = stopJobWithSavepoint(operationExecutor, handle, handler);
    ContinuousRefreshHandler suspendedHandler =
            new ContinuousRefreshHandler(
                    handler.getExecutionTarget(), handler.getJobId(), savepointPath);

    // Persist the SUSPENDED status together with the savepoint-carrying handler.
    CatalogMaterializedTable suspendedTable =
            table.copy(
                    CatalogMaterializedTable.RefreshStatus.SUSPENDED,
                    table.getRefreshHandlerDescription().orElse(null),
                    serializeContinuousHandler(suspendedHandler));
    operationExecutor.callExecutableOperation(
            handle,
            new AlterMaterializedTableChangeOperation(
                    identifier, Collections.emptyList(), suspendedTable));

    return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

/**
 * Resumes a previously suspended continuous refresh job for a materialized table.
 *
 * <p>A new streaming refresh job is submitted, restoring from the savepoint path recorded in the
 * stored refresh handler (when present) and applying the dynamic options supplied in the
 * statement.
 *
 * @param operationExecutor executor used to run catalog and job operations
 * @param handle handle of the current operation
 * @param op the parsed ALTER MATERIALIZED TABLE ... RESUME operation
 * @return an OK result once the refresh job has been submitted
 * @throws SqlExecutionException if the table is not refreshed in continuous mode
 */
private static ResultFetcher callAlterMaterializedTableResume(
        OperationExecutor operationExecutor,
        OperationHandle handle,
        AlterMaterializedTableResumeOperation op) {
    ObjectIdentifier identifier = op.getTableIdentifier();
    CatalogMaterializedTable table = getCatalogMaterializedTable(operationExecutor, identifier);

    // Resumption is only supported for tables refreshed by a continuous streaming job.
    if (table.getRefreshMode() != CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
        throw new SqlExecutionException(
                "Only support resume continuous refresh job currently.");
    }

    ContinuousRefreshHandler handler =
            deserializeContinuousHandler(
                    table.getSerializedRefreshHandler(),
                    operationExecutor.getSessionContext().getUserClassloader());

    // Restart the refresh job, restoring from the recorded savepoint when one exists.
    executeContinuousRefreshJob(
            operationExecutor,
            handle,
            table,
            identifier,
            op.getDynamicOptions(),
            handler.getRestorePath());

    return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private static void executeContinuousRefreshJob(
OperationExecutor operationExecutor,
OperationHandle handle,
CatalogMaterializedTable catalogMaterializedTable,
ObjectIdentifier materializedTableIdentifier,
Map<String, String> dynamicOptions,
Optional<String> restorePath) {
// Set job name, runtime mode, checkpoint interval
// TODO: Set minibatch related optimization options.
Configuration customConfig = new Configuration();
Expand All @@ -125,11 +239,13 @@ private static void createMaterializedInContinuousMode(
customConfig.set(NAME, jobName);
customConfig.set(RUNTIME_MODE, STREAMING);
customConfig.set(CHECKPOINTING_INTERVAL, catalogMaterializedTable.getFreshness());
restorePath.ifPresent(s -> customConfig.set(SAVEPOINT_PATH, s));

String insertStatement =
String.format(
"INSERT INTO %s %s",
materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery());
getInsertStatement(
materializedTableIdentifier,
catalogMaterializedTable.getDefinitionQuery(),
dynamicOptions);
try {
// submit flink streaming job
ResultFetcher resultFetcher =
Expand All @@ -143,8 +259,7 @@ private static void createMaterializedInContinuousMode(
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
ContinuousRefreshHandler continuousRefreshHandler =
new ContinuousRefreshHandler(executeTarget, jobId);
byte[] serializedBytes =
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
byte[] serializedBytes = serializeContinuousHandler(continuousRefreshHandler);

// update RefreshHandler to Catalog
CatalogMaterializedTable updatedMaterializedTable =
Expand Down Expand Up @@ -190,21 +305,13 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation(
AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
ObjectIdentifier materializedTableIdentifier =
alterMaterializedTableRefreshOperation.getTableIdentifier();
ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier);
if (MATERIALIZED_TABLE != table.getTableKind()) {
throw new ValidationException(
String.format(
"The table '%s' is not a materialized table.",
materializedTableIdentifier));
}

ResolvedCatalogMaterializedTable materializedTable =
(ResolvedCatalogMaterializedTable) table;
getCatalogMaterializedTable(operationExecutor, materializedTableIdentifier);

Map<String, String> partitionSpec =
alterMaterializedTableRefreshOperation.getPartitionSpec();

validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
validatePartitionSpec(partitionSpec, materializedTable);

// Set job name, runtime mode
Configuration customConfig = new Configuration();
Expand Down Expand Up @@ -307,6 +414,84 @@ protected static String getManuallyRefreshStatement(
return insertStatement.toString();
}

/**
 * Stops the continuous refresh job identified by the given handler, taking a savepoint.
 *
 * @param executor executor used to run the stop-job operation
 * @param handle handle of the current operation
 * @param refreshHandler handler holding the job id of the running refresh job
 * @return the path of the savepoint taken while stopping the job
 * @throws ValidationException if no savepoint directory is configured in the session
 * @throws SqlExecutionException if the stop-job operation returns no savepoint path
 */
private static String stopJobWithSavepoint(
        OperationExecutor executor,
        OperationHandle handle,
        ContinuousRefreshHandler refreshHandler) {
    // A savepoint directory must be configured, otherwise the job state would be lost.
    Optional<String> savepointDir =
            executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY);
    if (!savepointDir.isPresent()) {
        throw new ValidationException(
                "Savepoint directory is not configured, can't stop job with savepoint.");
    }

    ResultFetcher resultFetcher =
            executor.callStopJobOperation(
                    executor.getTableEnvironment(),
                    handle,
                    new StopJobOperation(refreshHandler.getJobId(), true, false));
    List<RowData> results = fetchAllResults(resultFetcher);

    // Guard against a missing result row instead of failing with an opaque
    // IndexOutOfBoundsException / NullPointerException.
    if (results.isEmpty() || results.get(0).isNullAt(0)) {
        throw new SqlExecutionException(
                String.format(
                        "Stop job %s with savepoint did not return a savepoint path.",
                        refreshHandler.getJobId()));
    }
    return results.get(0).getString(0).toString();
}

/**
 * Deserializes the refresh handler bytes stored in the catalog back into a
 * {@link ContinuousRefreshHandler}, wrapping any failure in a {@link SqlExecutionException}.
 *
 * @param serializedBytes serialized handler as stored in the catalog
 * @param userClassLoader class loader used to resolve the handler class
 * @return the deserialized handler
 */
private static ContinuousRefreshHandler deserializeContinuousHandler(
        byte[] serializedBytes, ClassLoader userClassLoader) {
    try {
        ContinuousRefreshHandler handler =
                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
                        serializedBytes, userClassLoader);
        return handler;
    } catch (ClassNotFoundException | IOException e) {
        throw new SqlExecutionException(
                "Deserialize ContinuousRefreshHandler occur exception.", e);
    }
}

/**
 * Serializes a {@link ContinuousRefreshHandler} for storage in the catalog, wrapping any
 * I/O failure in a {@link SqlExecutionException}.
 *
 * @param handler the handler to serialize
 * @return the serialized handler bytes
 */
private static byte[] serializeContinuousHandler(ContinuousRefreshHandler handler) {
    try {
        byte[] serializedBytes = ContinuousRefreshHandlerSerializer.INSTANCE.serialize(handler);
        return serializedBytes;
    } catch (IOException e) {
        throw new SqlExecutionException(
                "Serialize ContinuousRefreshHandler occur exception.", e);
    }
}

/**
 * Looks up a table by identifier and verifies that it is a materialized table.
 *
 * @param operationExecutor executor used to resolve the table from the catalog
 * @param tableIdentifier identifier of the table to resolve
 * @return the resolved materialized table
 * @throws ValidationException if the identifier refers to a non-materialized table
 */
private static ResolvedCatalogMaterializedTable getCatalogMaterializedTable(
        OperationExecutor operationExecutor, ObjectIdentifier tableIdentifier) {
    ResolvedCatalogBaseTable<?> baseTable = operationExecutor.getTable(tableIdentifier);
    if (baseTable.getTableKind() != MATERIALIZED_TABLE) {
        throw new ValidationException(
                String.format(
                        "Table %s is not a materialized table, does not support materialized table related operation.",
                        tableIdentifier));
    }
    return (ResolvedCatalogMaterializedTable) baseTable;
}

/**
 * Generates the INSERT INTO statement that backs the materialized table refresh job.
 *
 * <p>Dynamic options, when present, are rendered as an {@code /*+ OPTIONS(...) *&#47;} table
 * hint. Single quotes inside option keys or values are escaped as {@code ''} per SQL
 * string-literal rules, so a quote in an option can no longer produce a syntactically broken
 * (or injectable) statement.
 *
 * @param materializedTableIdentifier identifier of the target materialized table
 * @param definitionQuery the table's definition query (the SELECT part)
 * @param dynamicOptions dynamic table options to attach as a hint; may be empty
 * @return the complete INSERT statement
 */
@VisibleForTesting
protected static String getInsertStatement(
        ObjectIdentifier materializedTableIdentifier,
        String definitionQuery,
        Map<String, String> dynamicOptions) {
    StringBuilder builder =
            new StringBuilder(
                    String.format(
                            "INSERT INTO %s",
                            materializedTableIdentifier.asSerializableString()));

    if (!dynamicOptions.isEmpty()) {
        String hints =
                dynamicOptions.entrySet().stream()
                        .map(
                                e ->
                                        String.format(
                                                "'%s'='%s'",
                                                escapeSingleQuotes(e.getKey()),
                                                escapeSingleQuotes(e.getValue())))
                        .collect(Collectors.joining(", "));
        builder.append(String.format(" /*+ OPTIONS(%s) */", hints));
    }

    builder.append("\n").append(definitionQuery);
    return builder.toString();
}

/** Escapes single quotes per SQL string-literal rules ({@code ''} denotes a literal quote). */
private static String escapeSingleQuotes(String value) {
    return value.replace("'", "''");
}

private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
Long token = 0L;
List<RowData> results = new ArrayList<>();
Expand Down
Loading

0 comments on commit e4972c0

Please sign in to comment.