Fixing small file handling, inline compaction defaults
 - Small file limit is now 100MB by default
 - Turned on inline compaction by default for MOR
 - Changes take effect on DataSource and DeltaStreamer
Vinoth Chandar authored and n3nash committed Apr 3, 2019
1 parent 51f4908 commit b34a204
Showing 5 changed files with 22 additions and 10 deletions.
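
Both new defaults can still be overridden per write. A minimal sketch, assuming the usual com.uber.hoodie.config package and taking the raw config keys ("hoodie.parquet.small.file.limit", "hoodie.compact.inline") and the withPath/withProps builder calls from the diffs below; other required builder calls (schema, table name, etc.) are omitted:

    import java.util.HashMap;
    import java.util.Map;
    import com.uber.hoodie.config.HoodieWriteConfig;

    // Sketch: restore the pre-commit behavior by overriding the new defaults.
    // A small-file limit of 0 disables small-file handling entirely, and
    // "hoodie.compact.inline" = false turns inline compaction back off.
    Map<String, String> overrides = new HashMap<>();
    overrides.put("hoodie.parquet.small.file.limit", "0");
    overrides.put("hoodie.compact.inline", "false");
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table") // hypothetical base path
        .withProps(overrides)
        .build();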
@@ -528,8 +528,7 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
           Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      // Save was a success
-      // Do a inline compaction if enabled
+      // Save was a success & Do a inline compaction if enabled
       if (config.isInlineCompaction()) {
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
         forceCompact(extraMetadata);
@@ -1103,7 +1102,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Optional<Map<Stri
         HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime,
             HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList());
     Preconditions.checkArgument(conflictingInstants.isEmpty(),
-        "Following instants have timestamps >= compactionInstant. Instants :"
+        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
             + conflictingInstants);
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
@@ -1343,8 +1342,7 @@ private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatus
   }

   /**
-   * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed
-   * asynchronously. Please always use this serially before or after an insert/upsert action.
+   * Performs a compaction operation on a dataset, serially before or after an insert/upsert action.
    */
   private Optional<String> forceCompact(Optional<Map<String, String>> extraMetadata) throws IOException {
     Optional<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
@@ -47,8 +47,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
-  // Turned off by default
-  public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(0);
+  // By default, treat any file <= 100MB as a small file.
+  public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600);
   /**
    * Configs related to specific table types
    **/
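
For reference, the new default is exactly 100 MiB expressed in bytes:

    // 104857600 = 100 * 1024 * 1024, i.e. files at or below 100 MiB are
    // considered small and become targets for compacting new inserts into.
    long smallFileLimitBytes = 100L * 1024 * 1024;
    assert String.valueOf(smallFileLimitBytes).equals("104857600");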
@@ -126,6 +126,12 @@ public static void checkRequiredProperties(TypedProperties props,

   public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr,
       String basePath, String tblName, Map<String, String> parameters) throws Exception {
+
+    // inline compaction is on by default for MOR
+    boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
+        && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions
+            .MOR_STORAGE_TYPE_OPT_VAL());
+
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true)
         .withPath(basePath).withAutoCommit(false)
         .withSchema(schemaStr).forTable(tblName).withIndexConfig(
@@ -134,6 +140,7 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String
             .withPayloadClass(parameters.get(
                 DataSourceWriteOptions
                     .PAYLOAD_CLASS_OPT_KEY()))
+            .withInlineCompaction(inlineCompact)
             .build())
         // override above with Hoodie configs specified as options.
         .withProps(parameters).build();
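
Note the ordering above: .withProps(parameters) is applied after .withInlineCompaction(inlineCompact), so an explicit user option still wins over the derived MOR default. A hypothetical option map that writes MOR but opts back out of inline compaction:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical parameters for createHoodieClient: MOR storage type,
    // with the explicit "hoodie.compact.inline" key overriding the
    // derived inlineCompact default via the final withProps(parameters).
    Map<String, String> parameters = new HashMap<>();
    parameters.put(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(),
        DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL());
    parameters.put("hoodie.compact.inline", "false");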
3 changes: 2 additions & 1 deletion hoodie-spark/src/test/scala/DataSourceTest.scala
@@ -29,9 +29,9 @@ import org.junit.{Before, Test}
 import org.scalatest.junit.AssertionsForJUnit

 import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global

 /**
  * Basic tests on the spark datasource
@@ -131,6 +131,7 @@ class DataSourceTest extends AssertionsForJUnit {
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("com.uber.hoodie")
       .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
       .mode(SaveMode.Overwrite)
@@ -36,6 +36,7 @@
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -323,17 +324,22 @@ private void registerAvroSchemas(SchemaProvider schemaProvider) {
     }
   }

-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) throws Exception {
+  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withPayloadClass(cfg.payloadClassName)
+                // turn on inline compaction by default, for MOR tables
+                .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ)
+                .build())
             .forTable(cfg.targetTableName)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withProps(props);
     if (null != schemaProvider) {
       builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
     }
+
     return builder.build();
   }
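
The DeltaStreamer path derives the same default from cfg.storageType. A standalone sketch of that decision (the class and method names here are illustrative, not part of the codebase):

    import com.uber.hoodie.common.model.HoodieTableType;

    final class InlineCompactionDefaults {
      // Inline compaction defaults to on only for merge-on-read tables.
      // HoodieTableType.valueOf throws IllegalArgumentException for names
      // outside the enum (COPY_ON_WRITE, MERGE_ON_READ), so an invalid
      // storage type fails fast here.
      static boolean forStorageType(String storageType) {
        return HoodieTableType.valueOf(storageType) == HoodieTableType.MERGE_ON_READ;
      }
    }

    // e.g. InlineCompactionDefaults.forStorageType("MERGE_ON_READ") == true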
