Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Sep 22, 2022
1 parent 0ea0766 commit 063f3cf
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public class HoodieMemoryConfig extends HoodieConfig {
+ "Default is 10%. If set to 100%, with lot of failures, this can cause memory pressure, cause OOMs and "
+ "mask actual data errors.");

// When enabled, the spillable map directory is derived from the table's own base path
// (see HoodieWriteConfig#getSpillableMapBasePath, which appends
// HoodieTableMetaClient.SPILLABLE_MAP_RELATIVE_PATH) instead of the value configured
// via SPILLABLE_MAP_BASE_PATH. Disabled by default to preserve existing behavior.
public static final ConfigProperty<Boolean> USE_TABLE_PATH_FOR_SPILLABLE_MAP = ConfigProperty
.key("hoodie.memory.use.table.path.for.spillable.map")
.defaultValue(false)
.withDocumentation("Use table's path for spillable map base path. If not, use Value set for " + SPILLABLE_MAP_BASE_PATH.key() + ". Default is set to false.");

/** @deprecated Use {@link #MAX_MEMORY_FRACTION_FOR_MERGE} and its methods instead */
@Deprecated
public static final String MAX_MEMORY_FRACTION_FOR_MERGE_PROP = MAX_MEMORY_FRACTION_FOR_MERGE.key();
Expand Down Expand Up @@ -176,6 +181,11 @@ public Builder withWriteStatusFailureFraction(double failureFraction) {
return this;
}

/**
 * Sets whether the spillable map base path should be derived from the table's base path
 * rather than from the standalone spillable-map path config.
 *
 * @param useTablePathForSpillableMap true to place spillable map files under the table path
 * @return this builder, for call chaining
 */
public Builder useTablePathForSpillableMap(boolean useTablePathForSpillableMap) {
  memoryConfig.setValue(USE_TABLE_PATH_FOR_SPILLABLE_MAP, Boolean.toString(useTablePathForSpillableMap));
  return this;
}

public HoodieMemoryConfig build() {
memoryConfig.setDefaults(HoodieMemoryConfig.class.getName());
return memoryConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -1865,7 +1866,12 @@ public int getMaxDFSStreamBufferSize() {
}

/**
 * Resolves the base path for spillable map files.
 *
 * <p>When {@code USE_TABLE_PATH_FOR_SPILLABLE_MAP} is enabled, the path is the table's
 * base path plus the fixed relative spillable-map folder; otherwise the explicitly
 * configured {@code SPILLABLE_MAP_BASE_PATH} value is returned.
 *
 * @return the directory path under which spillable map files should be created
 */
public String getSpillableMapBasePath() {
  return getBooleanOrDefault(HoodieMemoryConfig.USE_TABLE_PATH_FOR_SPILLABLE_MAP)
      ? getBasePath() + HoodieTableMetaClient.SPILLABLE_MAP_RELATIVE_PATH
      : getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH);
}

public double getWriteStatusFailureFraction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
import org.apache.hudi.index.HoodieIndex;
Expand Down Expand Up @@ -95,6 +96,16 @@ public void testDefaultIndexAccordingToEngineType() {
EngineType.JAVA, HoodieIndex.IndexType.INMEMORY));
}

@Test
public void testSpillableMapBasePath() {
  // By default the spillable map path comes straight from SPILLABLE_MAP_BASE_PATH.
  // Note: JUnit's assertEquals takes (expected, actual) — expected value first —
  // otherwise failure messages report the values swapped.
  HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("tmp").build();
  assertEquals(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue(), writeConfig.getSpillableMapBasePath());

  // With the flag enabled, the path is derived from the table base path plus the
  // fixed relative spillable-map folder.
  String tableBasePath = "base_path";
  writeConfig = HoodieWriteConfig.newBuilder().withPath(tableBasePath)
      .withMemoryConfig(HoodieMemoryConfig.newBuilder().useTablePathForSpillableMap(true).build()).build();
  assertEquals(tableBasePath + HoodieTableMetaClient.SPILLABLE_MAP_RELATIVE_PATH, writeConfig.getSpillableMapBasePath());
}

@Test
public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() {
testEngineSpecificConfig(HoodieWriteConfig::getClusteringPlanStrategyClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class HoodieTableMetaClient implements Serializable {
public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".temp";
public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux";
public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
public static final String SPILLABLE_MAP_RELATIVE_PATH = METAFOLDER_NAME + Path.SEPARATOR + ".spillable_map" + Path.SEPARATOR;
public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
public static final String HASHING_METADATA_FOLDER_NAME = ".bucket_index" + Path.SEPARATOR + "consistent_hashing_metadata";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
forceFullScan, partitionName, internalSchema);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, basePath + spillableMapBasePath, new DefaultSizeEstimator(),
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);

this.maxMemorySizeInBytes = maxMemorySizeInBytes;
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + basePath + spillableMapBasePath, e);
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}

if (forceFullScan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,7 @@ public static void checkWrittenDataMOR(
.lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
assertNotNull(latestInstant, "No completed commit under table path" + basePath);

File spillableMapBasePath = new File(config.getBasePath() + config.getSpillableMapBasePath());
File[] partitionDirs = baseFile.listFiles(file -> !(file.getName().startsWith(".") || file.equals(spillableMapBasePath))
&& file.isDirectory());
File[] partitionDirs = baseFile.listFiles(file -> !file.getName().startsWith(".") && file.isDirectory());
assertNotNull(partitionDirs);
assertThat("The partitions number should be: " + partitions, partitionDirs.length, is(partitions));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,10 @@ private object HoodieMergeOnReadRDD {
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(tableState.metadataConfig.getProps).enable(true).build()
val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
val spillableMapBasePath = tablePath + hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
val metadataTable = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(hadoopConf), metadataConfig, dataTableBasePath, spillableMapBasePath)
new HoodieLocalEngineContext(hadoopConf), metadataConfig,
dataTableBasePath,
hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))

// We have to force full-scan for the MT log record reader, to make sure
// we can iterate over all of the partitions, since by default some of the partitions (Column Stats,
Expand Down

0 comments on commit 063f3cf

Please sign in to comment.