Merged
28 changes: 28 additions & 0 deletions LICENSE
@@ -293,3 +293,31 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

-------------------------------------------------------------------------------

This product includes code from org.apache.hadoop.

* org.apache.hudi.common.bloom.filter.InternalDynamicBloomFilter.java adapted from org.apache.hadoop.util.bloom.DynamicBloomFilter.java

* org.apache.hudi.common.bloom.filter.InternalFilter copied from classes in org.apache.hadoop.util.bloom package

with the following license:

Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-------------------------------------------------------------------------------
10 changes: 4 additions & 6 deletions hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -17,21 +17,18 @@

package org.apache.hudi.cli

import java.util
import java.util.Map

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.common.{BloomFilter, HoodieJsonPayload}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieParquetConfig, HoodieParquetWriter}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}

@@ -44,7 +41,8 @@ object SparkHelpers {
def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble,
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
@@ -18,6 +18,7 @@

package org.apache.hudi.config;

import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.index.HoodieIndex;

import javax.annotation.concurrent.Immutable;
@@ -27,7 +28,6 @@
import java.io.IOException;
import java.util.Properties;


/**
* Indexing related config.
*/
@@ -54,6 +54,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
// TODO: On by default. Once stable, we will remove the other mode.
public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = "hoodie.bloom.index.bucketized.checking";
public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = "true";
public static final String BLOOM_INDEX_FILTER_TYPE = "hoodie.bloom.index.filter.type";
public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000";

// 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
// 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket";
@@ -194,6 +199,10 @@ public HoodieIndexConfig build() {
BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP),
BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_FILTER_TYPE),
BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE);
setDefaultOnCondition(props, !props.containsKey(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
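Taken together, the two new properties select the filter implementation and bound the dynamic variant's growth. A minimal usage sketch, assuming the builder exposes the usual `fromProperties` hook and that the dynamic variant's enum constant is named `DYNAMIC_V0` (this diff only shows the `SIMPLE` default and a `DYNAMIC` type-code prefix):

```java
import java.util.Properties;

import org.apache.hudi.config.HoodieIndexConfig;

public class BloomFilterConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Opt into the dynamic bloom filter instead of the SIMPLE default.
    // "DYNAMIC_V0" is an assumed constant name, not confirmed by this diff.
    props.setProperty(HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE, "DYNAMIC_V0");
    // Cap the number of entries the dynamic filter may track (default above is 100000).
    props.setProperty(HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, "500000");
    HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder().fromProperties(props).build();
  }
}
```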
@@ -94,6 +94,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;


private ConsistencyGuardConfig consistencyGuardConfig;

// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -363,6 +364,14 @@ public int getHBaseIndexDesiredPutsTime() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
}

public String getBloomFilterType() {
return props.getProperty(HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE);
}

public int getDynamicBloomFilterMaxNumEntries() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES));
}

/**
* Fraction of the global share of QPS that should be allocated to this job. Let's say there are 3 jobs which have
* input size in terms of number of rows required for HbaseIndexing as x, 2x, 3x respectively. Then this fraction for
@@ -18,7 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
@@ -55,7 +55,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
private long totalKeysChecked;

public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTable,
Pair<String, String> partitionPathFilePair) {
Pair<String, String> partitionPathFilePair) {
super(config, null, hoodieTable, partitionPathFilePair);
this.tableType = hoodieTable.getMetaClient().getTableType();
this.candidateRecordKeys = new ArrayList<>();
@@ -70,7 +70,7 @@ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTabl
* Given a list of row keys and one file, return only row keys existing in that file.
*/
public static List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
Path filePath) throws HoodieIndexException {
Path filePath) throws HoodieIndexException {
List<String> foundRecordKeys = new ArrayList<>();
try {
// Load all rowKeys from the file, to double-confirm
@@ -134,7 +134,7 @@ public static class KeyLookupResult {
private final String partitionPath;

public KeyLookupResult(String fileId, String partitionPath, String baseInstantTime,
List<String> matchingRecordKeys) {
List<String> matchingRecordKeys) {
this.fileId = fileId;
this.partitionPath = partitionPath;
this.baseInstantTime = baseInstantTime;
@@ -19,7 +19,8 @@
package org.apache.hudi.io.storage;

import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -51,7 +52,10 @@ public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieSto
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
String commitTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable)
throws IOException {
BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP());
BloomFilter filter = BloomFilterFactory
.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
config.getBloomFilterType());
HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);

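The writer no longer constructs a `BloomFilter` directly; the factory picks an implementation from the configured type code. A sketch of the likely dispatch, under stated assumptions: only `HoodieDynamicBoundedBloomFilter` is named in this diff, so `SimpleBloomFilter` and the `DYNAMIC_V0` constant are assumed names.

```java
package org.apache.hudi.common.bloom.filter;

// Sketch of the factory's dispatch on the type code. SimpleBloomFilter and the
// DYNAMIC_V0 constant are assumed names; only HoodieDynamicBoundedBloomFilter
// appears elsewhere in this diff.
public class BloomFilterFactory {

  public static BloomFilter createBloomFilter(int numEntries, double errorRate,
      int maxNumberOfEntries, String bloomFilterTypeCode) {
    if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
      // maxNumberOfEntries is ignored for the simple filter (callers pass -1).
      return new SimpleBloomFilter(numEntries, errorRate);
    } else if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.DYNAMIC_V0.name())) {
      return new HoodieDynamicBoundedBloomFilter(numEntries, errorRate, maxNumberOfEntries);
    } else {
      throw new IllegalArgumentException("Bloom filter type code not recognized: " + bloomFilterTypeCode);
    }
  }
}
```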
@@ -21,6 +21,9 @@
import org.apache.hudi.HoodieReadClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieKey;
@@ -147,7 +150,7 @@ public static SparkConf getSparkConfForTest(String appName) {
}

public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
List<HoodieInstant> commitsToReturn) throws IOException {
List<HoodieInstant> commitsToReturn) throws IOException {
HashMap<String, String> fileIdToFullPath = new HashMap<>();
for (HoodieInstant commit : commitsToReturn) {
HoodieCommitMetadata metadata =
@@ -158,7 +161,7 @@ public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath
}

public static Dataset<Row> readCommit(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
String commitTime) {
String commitTime) {
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
if (!commitTimeline.containsInstant(commitInstant)) {
throw new HoodieException("No commit exists at " + commitTime);
@@ -178,7 +181,7 @@ public static Dataset<Row> readCommit(String basePath, SQLContext sqlContext, Ho
* Obtain all new data written into the Hoodie dataset since the given timestamp.
*/
public static Dataset<Row> readSince(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
String lastCommitTime) {
String lastCommitTime) {
List<HoodieInstant> commitsToReturn =
commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList());
try {
@@ -195,7 +198,7 @@ public static Dataset<Row> readSince(String basePath, SQLContext sqlContext, Hoo
* Reads the paths under a hoodie dataset out as a DataFrame.
*/
public static Dataset<Row> read(JavaSparkContext jsc, String basePath, SQLContext sqlContext, FileSystem fs,
String... paths) {
String... paths) {
List<String> filteredPaths = new ArrayList<>();
try {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
@@ -214,10 +217,11 @@ public static Dataset<Row> read(JavaSparkContext jsc, String basePath, SQLContex
}

public static String writeParquetFile(String basePath, String partitionPath, String filename,
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {

if (filter == null) {
filter = new BloomFilter(10000, 0.0000001);
filter = BloomFilterFactory
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
}
HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
@@ -245,7 +249,7 @@ public static String writeParquetFile(String basePath, String partitionPath, Str
}

public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records,
Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException {
Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException {
Thread.sleep(1000);
String commitTime = HoodieTestUtils.makeNewCommitTime();
String fileId = UUID.randomUUID().toString();
@@ -19,9 +19,11 @@
package org.apache.hudi.index.bloom;

import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -248,7 +250,7 @@ public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedExcept

// We write record1, record2 to a parquet file, but the bloom filter contains (record1,
// record2, record3).
BloomFilter filter = new BloomFilter(10000, 0.0000001);
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
filter.add(record3.getRecordKey());
String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2),
schema, filter, true);
@@ -451,7 +453,8 @@ public void testBloomFilterFalseError() throws IOException, InterruptedException
HoodieRecord record2 =
new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);

BloomFilter filter = new BloomFilter(10000, 0.0000001);
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1,
BloomFilterTypeCode.SIMPLE.name());
filter.add(record2.getRecordKey());
String filename =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, filter, true);
@@ -20,11 +20,11 @@

import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -18,7 +18,8 @@

package org.apache.hudi.avro;

import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.HoodieDynamicBoundedBloomFilter;

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroWriteSupport;
@@ -40,6 +41,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";

public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) {
super(schema, avroSchema);
@@ -55,6 +57,9 @@ public WriteSupport.FinalizedWriteContext finalizeWrite() {
extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
}
return new WriteSupport.FinalizedWriteContext(extraMetaData);
}
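The extra footer key is only written for dynamic filters, so a reader can default to `SIMPLE` when it is absent. A hypothetical read-side counterpart (the Parquet footer calls are standard; `BloomFilterFactory.fromString` is an assumed rebuild hook, not shown in this diff):

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;

public class BloomFilterFooterReaderSketch {

  static BloomFilter readBloomFilter(Configuration conf, Path parquetFile) throws IOException {
    Map<String, String> footer = ParquetFileReader
        .readFooter(conf, parquetFile, ParquetMetadataConverter.NO_FILTER)
        .getFileMetaData().getKeyValueMetaData();
    String serialized = footer.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
    // Fall back to SIMPLE when the type-code footer was never written.
    String typeCode = footer.getOrDefault(
        HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE, BloomFilterTypeCode.SIMPLE.name());
    // fromString(...) is an assumed factory hook that deserializes the filter.
    return BloomFilterFactory.fromString(serialized, typeCode);
  }
}
```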
@@ -16,34 +16,35 @@
* limitations under the License.
*/

package org.apache.hudi.common;

import org.junit.Test;

import java.io.IOException;
package org.apache.hudi.common.bloom.filter;

/**
* Tests bloom filter {@link BloomFilter}.
* A Bloom filter interface.
*/
public class TestBloomFilter {
public interface BloomFilter {

/**
* Add a key to the {@link BloomFilter}.
*
* @param key the key to be added to the {@link BloomFilter}
*/
void add(String key);

@Test
public void testAddKey() {
BloomFilter filter = new BloomFilter(100, 0.0000001);
filter.add("key1");
assert (filter.mightContain("key1"));
}
/**
* Tests for key membership.
*
* @param key the key to be checked for membership
* @return {@code true} if the key may be present, {@code false} if the key is definitely not present.
*/
boolean mightContain(String key);

@Test
public void testSerialize() throws IOException, ClassNotFoundException {
BloomFilter filter = new BloomFilter(1000, 0.0000001);
filter.add("key1");
filter.add("key2");
String filterStr = filter.serializeToString();
/**
* Serialize the bloom filter as a string.
*/
String serializeToString();

// Rebuild
BloomFilter newFilter = new BloomFilter(filterStr);
assert (newFilter.mightContain("key1"));
assert (newFilter.mightContain("key2"));
}
/**
* @return the bloom filter type code.
*/
BloomFilterTypeCode getBloomFilterTypeCode();
}
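The removed `TestBloomFilter` exercised the add/serialize/rebuild cycle on the old concrete class; an equivalent round-trip against this interface might look like the following sketch, assuming the factory shown earlier and its `fromString` rebuild hook (`-1` mirrors the `SIMPLE` call sites in the tests above):

```java
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;

public class BloomFilterRoundTripSketch {
  public static void main(String[] args) {
    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0000001, -1,
        BloomFilterTypeCode.SIMPLE.name());
    filter.add("key1");
    filter.add("key2");

    // Serialize, rebuild through the assumed factory hook, and re-check membership.
    String serialized = filter.serializeToString();
    BloomFilter rebuilt = BloomFilterFactory.fromString(serialized,
        BloomFilterTypeCode.SIMPLE.name());
    assert rebuilt.mightContain("key1");
    assert rebuilt.mightContain("key2");
  }
}
```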