Skip to content

Commit

Permalink
[HUDI-2547] Schedule Flink compaction in service (#4254)
Browse files Browse the repository at this point in the history
Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
  • Loading branch information
yuzhaojing and yuzhaojing committed Dec 22, 2021
1 parent f1286c2 commit 15eb7e8
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public class FlinkCompactionConfig extends Configuration {
+ "2). LIFO: execute the latest plan first, by default LIFO", required = false)
public String compactionSeq = SEQ_LIFO;

// Opt-in switch for service mode; stays false unless --service is passed on the command line.
@Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
public Boolean serviceMode = false;

// Interval of the async compaction service, expressed in seconds; the default of 600
// corresponds to the "10 minutes" mentioned in the option description.
@Parameter(names = {"--min-compaction-interval-seconds"},
description = "Min compaction interval of async compaction service, default 10 minutes")
public Integer minCompactionIntervalSeconds = 600;

/**
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,141 +18,287 @@

package org.apache.hudi.sink.compact;

import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

import com.beust.jcommander.JCommander;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Flink hudi compaction program that can be executed manually.
*/
public class HoodieFlinkCompactor {

protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);

/**
* Compaction service used to run compaction either once or continuously in service mode.
*/
private final AsyncCompactionService compactionScheduleService;

/**
 * Creates a compactor that delegates all the work to the given compaction service.
 *
 * @param service the service that {@link #start(boolean)} drives
 */
public HoodieFlinkCompactor(AsyncCompactionService service) {
  compactionScheduleService = service;
}

/**
 * Command line entry point: parses the arguments, derives the Flink configuration,
 * builds the compaction service and starts the compactor in the requested mode.
 *
 * @param args the command line arguments, parsed by {@link #getFlinkCompactionConfig(String[])}
 * @throws Exception if the service cannot be created or the compaction fails
 */
public static void main(String[] args) throws Exception {
  final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

  final FlinkCompactionConfig compactionConfig = getFlinkCompactionConfig(args);
  final Configuration flinkConf = FlinkCompactionConfig.toFlinkConfig(compactionConfig);

  final HoodieFlinkCompactor compactor =
      new HoodieFlinkCompactor(new AsyncCompactionService(compactionConfig, flinkConf, execEnv));
  compactor.start(compactionConfig.serviceMode);
}

/**
 * Starts the compaction, either as a long running service or as a single round.
 *
 * <p>In service mode the compaction service is started and this method blocks until
 * the service has shut down; the service releases its own resources when its loop ends.
 * In single round mode the compaction is executed once on the calling thread and the
 * service resources (executor and write client) are released before returning.
 *
 * @param serviceMode {@code true} to keep compacting until shutdown, {@code false} to compact once
 * @throws HoodieException if waiting for the service shutdown fails (service mode)
 * @throws Exception if the single compaction round fails
 */
public void start(boolean serviceMode) throws Exception {
  if (serviceMode) {
    compactionScheduleService.start(null);
    try {
      compactionScheduleService.waitForShutdown();
    } catch (Exception e) {
      throw new HoodieException(e.getMessage(), e);
    } finally {
      LOG.info("Shut down hoodie flink compactor");
    }
  } else {
    LOG.info("Hoodie Flink Compactor running only single round");
    boolean failed = false;
    try {
      compactionScheduleService.compact();
    } catch (Exception e) {
      failed = true;
      LOG.error("Got error running compaction once. Shutting down", e);
      throw e;
    } finally {
      // Nothing else closes the write client or the executor in single round mode,
      // so release them here regardless of the outcome.
      compactionScheduleService.shutdownAsyncService(failed);
      LOG.info("Shut down hoodie flink compactor");
    }
  }
}

/**
 * Parses the command line arguments into a {@link FlinkCompactionConfig}.
 *
 * <p>Prints the usage and terminates the JVM with exit code 1 when help is requested
 * or when no arguments are given.
 *
 * @param args the raw command line arguments
 * @return the populated compaction config
 */
public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) {
  FlinkCompactionConfig cfg = new FlinkCompactionConfig();
  JCommander cmd = new JCommander(cfg, null, args);
  if (cfg.help || args.length == 0) {
    cmd.usage();
    System.exit(1);
  }
  return cfg;
}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------

/**
* Schedules compaction in service.
*/
public static class AsyncCompactionService extends HoodieAsyncService {
private static final long serialVersionUID = 1L;

/**
* Flink Compaction Config.
*/
private final FlinkCompactionConfig cfg;

/**
* Flink Config.
*/
private final Configuration conf;

/**
* Meta Client.
*/
private final HoodieTableMetaClient metaClient;

/**
* Write Client.
*/
private final HoodieFlinkWriteClient<?> writeClient;

/**
* The hoodie table.
*/
private final HoodieFlinkTable<?> table;

/**
* Flink Execution Environment.
*/
private final StreamExecutionEnvironment env;

/**
* Executor Service.
*/
private final ExecutorService executor;

/**
 * Creates the compaction service and prepares everything a compaction round needs.
 *
 * <p>Besides storing the arguments, the constructor creates the meta client, completes
 * the given {@code conf} in place (table name, table schema, changelog mode) and then
 * creates the write client and its table from that completed configuration. Each of
 * these resources is created exactly once and kept in a field so that
 * {@link #shutdownAsyncService(boolean)} closes the same write client that is used
 * for compaction.
 *
 * @param cfg the parsed compaction options
 * @param conf the Flink configuration derived from {@code cfg}; modified by this constructor
 * @param env the execution environment the compaction pipeline is built on
 * @throws Exception if the meta client, the schema or the write client cannot be set up
 */
public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
  this.cfg = cfg;
  this.conf = conf;
  this.env = env;
  // a single thread is enough: the service runs one compaction loop
  this.executor = Executors.newFixedThreadPool(1);

  // create metaClient
  this.metaClient = StreamerUtil.createMetaClient(conf);

  // get the table name
  conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());

  // set table schema
  CompactionUtil.setAvroSchema(conf, metaClient);

  // infer changelog mode
  CompactionUtil.inferChangelogMode(conf, metaClient);

  // the write client must be created after conf has been completed above
  this.writeClient = StreamerUtil.createWriteClient(conf);
  this.table = writeClient.getHoodieTable();
}

/**
 * Submits the compaction loop to the service executor.
 *
 * <p>The loop runs one compaction round, then sleeps for the configured minimum
 * compaction interval, until a shutdown is requested. Any exception ends the loop and
 * is rethrown as a {@link HoodieException}; in every case the service resources are
 * released through {@link #shutdownAsyncService(boolean)} when the loop exits.
 *
 * @return the future of the compaction loop paired with the executor running it
 */
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
  return Pair.of(CompletableFuture.supplyAsync(() -> {
    boolean error = false;

    try {
      while (!isShutdownRequested()) {
        try {
          compact();
          // Scale with a long factor: the int product overflows for intervals above
          // Integer.MAX_VALUE / 1000 seconds and would hand Thread.sleep a wrong,
          // possibly negative, value.
          Thread.sleep(cfg.minCompactionIntervalSeconds * 1000L);
        } catch (Exception e) {
          LOG.error("Shutting down compaction service due to exception", e);
          error = true;
          throw new HoodieException(e.getMessage(), e);
        }
      }
    } finally {
      shutdownAsyncService(error);
    }
    return true;
  }, executor), executor);
}

private void compact() throws Exception {
table.getMetaClient().reloadActiveTimeline();

// judge whether have operation
// to compute the compaction instant time and do compaction.
if (cfg.schedule) {
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTimeOption.isPresent()) {
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
// checks the compaction plan and do compaction.
if (cfg.schedule) {
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTimeOption.isPresent()) {
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
}
table.getMetaClient().reloadActiveTimeline();
}
}

// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
if (!requested.isPresent()) {
// do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
return;
}

String compactionInstantTime = requested.get().getTimestamp();

HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}
}

// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
if (!requested.isPresent()) {
// do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
return;
}
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);

String compactionInstantTime = requested.get().getTimestamp();
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// No compaction plan, do nothing and return.
LOG.info("No compaction plan for instant " + compactionInstantTime);
return;
}

HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (!pendingCompactionTimeline.containsInstant(instant)) {
// this means that the compaction plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie), this usually happens when the job crush
// exceptionally.

// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
// clean the compaction plan in auxiliary path and cancels the compaction.

if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// No compaction plan, do nothing and return.
LOG.info("No compaction plan for instant " + compactionInstantTime);
return;
}
LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the compaction plan in auxiliary path and cancels the compaction");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
}

HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (!pendingCompactionTimeline.containsInstant(instant)) {
// this means that the compaction plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie), this usually happens when the job crush
// exceptionally.
// get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);

// clean the compaction plan in auxiliary path and cancels the compaction.
LOG.info("Start to compaction for instant " + compactionInstantTime);

LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the compaction plan in auxiliary path and cancels the compaction");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();

env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
.setParallelism(1);

env.execute("flink_hudi_compaction_" + compactionInstantTime);
}

// get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);

// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);

env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
.setParallelism(1);

env.execute("flink_hudi_compaction");
writeClient.close();
/**
 * Stops this compaction service: logs the shutdown, shuts down the executor and
 * closes the write client.
 *
 * @param error whether the shutdown follows a failure; only reported in the log message
 */
public void shutdownAsyncService(boolean error) {
  final String shutdownMessage = "Gracefully shutting down compactor. Error ?" + error;
  LOG.info(shutdownMessage);
  executor.shutdown();
  writeClient.close();
}

/**
 * Shuts the service down and reports it as a regular, non-error shutdown.
 */
@VisibleForTesting
public void shutDown() {
  final boolean error = false;
  shutdownAsyncService(error);
}
}
}
Loading

0 comments on commit 15eb7e8

Please sign in to comment.