Refactor HoodieTableFileSystemView using FileGroups & FileSlices #201

Merged (3 commits) on Jun 23, 2017
@@ -75,10 +75,10 @@ class DedupeSparkJob (basePath: String,
val dedupeTblName = s"${tmpTableName}_dupeKeys"

val metadata = new HoodieTableMetaClient(fs, basePath)
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants())

val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"${basePath}/${duplicatedPartitionPath}"))
val latestFiles:java.util.List[HoodieDataFile] = fsView.getLatestVersions(allFiles).collect(Collectors.toList[HoodieDataFile]())
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
val latestFiles:java.util.List[HoodieDataFile] = fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())
val filteredStatuses = latestFiles.map(f => f.getPath)
LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")

@@ -126,10 +126,11 @@ class DedupeSparkJob (basePath: String,

def fixDuplicates(dryRun: Boolean = true) = {
val metadata = new HoodieTableMetaClient(fs, basePath)
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants())

val allFiles = fs.listStatus(new Path(s"${basePath}/${duplicatedPartitionPath}"))
val latestFiles:java.util.List[HoodieDataFile] = fsView.getLatestVersions(allFiles).collect(Collectors.toList[HoodieDataFile]())
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)

val latestFiles:java.util.List[HoodieDataFile] = fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())

val fileNameToPathMap = latestFiles.map(f => (f.getFileId, new Path(f.getPath))).toMap
val dupeFixPlan = planDuplicateFix()
@@ -26,6 +26,7 @@
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
@@ -167,7 +168,6 @@ public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism)
public Dataset<Row> read(String... paths) {
assertSqlContext();
List<String> filteredPaths = new ArrayList<>();
TableFileSystemView fileSystemView = hoodieTable.getFileSystemView();

try {
for (String path : paths) {
@@ -177,7 +177,9 @@ public Dataset<Row> read(String... paths) {
+ hoodieTable.getMetaClient().getBasePath());
}

List<HoodieDataFile> latestFiles = fileSystemView.getLatestVersions(fs.globStatus(new Path(path))).collect(
TableFileSystemView.ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(),
hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path)));
List<HoodieDataFile> latestFiles = fileSystemView.getLatestDataFiles().collect(
Collectors.toList());
for (HoodieDataFile file : latestFiles) {
filteredPaths.add(file.getPath());
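The read(String... paths) hunk above replaces the old fileSystemView.getLatestVersions(FileStatus[]) call with a per-glob HoodieTableFileSystemView queried through the ReadOptimizedView interface. Below is a minimal sketch of the new usage pattern; the helper class, the glob path, and the surrounding setup (`fs`, `metaClient`) are illustrative assumptions, and only the view construction and the getLatestDataFiles() call are taken from this diff.

```java
// Sketch: resolving the latest data files for a path glob with the refactored view API.
// `fs`, `metaClient` and `globPath` are assumed to be provided by the caller.
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class LatestFilesSketch {

  public static List<String> latestFilePaths(FileSystem fs, HoodieTableMetaClient metaClient,
      String globPath) throws IOException {
    // List the physical files once, then hand them to the view, instead of calling
    // getLatestVersions(allFiles) on a view built without any file listing.
    FileStatus[] allFiles = fs.globStatus(new Path(globPath));
    TableFileSystemView.ReadOptimizedView view = new HoodieTableFileSystemView(
        metaClient,
        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
        allFiles);
    // getLatestDataFiles() streams the newest data file of every file group it was given.
    return view.getLatestDataFiles()
        .map(HoodieDataFile::getPath)
        .collect(Collectors.toList());
  }
}
```

Building the view from an explicit FileStatus[] listing scopes it to the files of interest rather than the whole table, which is the same pattern the DedupeSparkJob change above adopts.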
50 changes: 20 additions & 30 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -30,7 +30,6 @@
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -260,17 +259,18 @@ private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRD
}

private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition,
JavaRDD<HoodieRecord<T>> records, int parallelism) {
JavaRDD<HoodieRecord<T>> records,
int parallelism) {
if(condition) {
return deduplicateRecords(records, parallelism);
}
return records;
}

private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords,
String commitTime,
HoodieTable<T> hoodieTable,
final boolean isUpsert) {
String commitTime,
HoodieTable<T> hoodieTable,
final boolean isUpsert) {

// Cache the tagged records, so we don't end up computing both
preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
@@ -318,10 +318,10 @@ private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> w

private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords
.mapToPair((PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>) record ->
.mapToPair(record ->
new Tuple2<>(new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record))
.partitionBy(partitioner)
.map((Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>) tuple -> tuple._2());
.map(tuple -> tuple._2());
}

/**
@@ -347,7 +347,7 @@ public boolean commit(String commitTime,

List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses
.mapToPair((PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus ->
new Tuple2<String, HoodieWriteStat>(writeStatus.getPartitionPath(), writeStatus.getStat()))
new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()))
.collect();

HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -479,9 +479,9 @@ public boolean savepoint(String commitTime, String user, String comment) {
.mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
// Scan all partitions files with this commit time
logger.info("Collecting latest files in partition path " + partitionPath);
TableFileSystemView view = table.getFileSystemView();
TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView();
List<String> latestFiles =
view.getLatestVersionInPartition(partitionPath, commitTime)
view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
.map(HoodieDataFile::getFileName).collect(Collectors.toList());
return new Tuple2<>(partitionPath, latestFiles);
}).collectAsMap();
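The savepoint hunk above swaps getFileSystemView().getLatestVersionInPartition(partitionPath, commitTime) for getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath, commitTime). A hedged sketch of that call in isolation follows; the wrapper class and parameters are placeholders, and only the view accessor and method names come from this diff.

```java
// Sketch: latest data files in one partition as of a commit, via the renamed RO-view API.
// `table`, `partitionPath` and `commitTime` stand in for the surrounding savepoint context.
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.table.HoodieTable;

import java.util.List;
import java.util.stream.Collectors;

class SavepointFilesSketch {

  static List<String> latestFileNames(HoodieTable<?> table, String partitionPath, String commitTime) {
    TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView();
    // Only the newest data file of each file group written at or before `commitTime`
    // is returned, which is what a savepoint needs to retain.
    return view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
        .map(HoodieDataFile::getFileName)
        .collect(Collectors.toList());
  }
}
```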
@@ -800,26 +800,16 @@ public static SparkConf registerClasses(SparkConf conf) {
* Deduplicate Hoodie records, using the given deduplication function.
*/
private JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
return records.mapToPair(new PairFunction<HoodieRecord<T>, HoodieKey, HoodieRecord<T>>() {
@Override
public Tuple2<HoodieKey, HoodieRecord<T>> call(HoodieRecord<T> record) {
return new Tuple2<>(record.getKey(), record);
}
}).reduceByKey(new Function2<HoodieRecord<T>, HoodieRecord<T>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(HoodieRecord<T> rec1, HoodieRecord<T> rec2) {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
// we cannot allow the user to change the key or partitionPath, since that will affect everything
// so pick it from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
}
}, parallelism).map(new Function<Tuple2<HoodieKey, HoodieRecord<T>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(Tuple2<HoodieKey, HoodieRecord<T>> recordTuple) {
return recordTuple._2();
}
});
return records
.mapToPair(record -> new Tuple2<>(record.getKey(), record))
.reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
// we cannot allow the user to change the key or partitionPath, since that will affect everything
// so pick it from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
}, parallelism)
.map(recordTuple -> recordTuple._2());
}

/**
@@ -30,19 +30,27 @@
*/
@Immutable
public class HoodieIndexConfig extends DefaultHoodieConfig {

public static final String INDEX_TYPE_PROP = "hoodie.index.type";
public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();

// ***** Bloom Index configs *****
public static final String BLOOM_FILTER_NUM_ENTRIES = "hoodie.index.bloom.num_entries";
public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000";
public static final String BLOOM_FILTER_FPP = "hoodie.index.bloom.fpp";
public static final String DEFAULT_BLOOM_FILTER_FPP = "0.000000001";
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism";
// Disable explicit bloom index parallelism setting by default - hoodie auto computes
public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0";

// ***** HBase Index Configs *****
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";

// ***** Bucketed Index Configs *****
public final static String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets";

private HoodieIndexConfig(Properties props) {
super(props);
}
@@ -104,6 +112,11 @@ public Builder bloomIndexParallelism(int parallelism) {
return this;
}

public Builder numBucketsPerPartition(int numBuckets) {
props.setProperty(BUCKETED_INDEX_NUM_BUCKETS_PROP, String.valueOf(numBuckets));
return this;
}

public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
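The builder hunk above exposes the new bucketed-index knob hoodie.index.bucketed.numbuckets via numBucketsPerPartition(int). A small configuration sketch follows, assuming HoodieIndexConfig follows the usual newBuilder() factory convention (the factory name is an assumption, not shown in this diff); the value is read back through the getNumBucketsPerPartition() accessor added to HoodieWriteConfig in the next hunk.

```java
// Sketch: setting the bucket count for the new BucketedIndex.
// newBuilder() is assumed to be the config factory, per the usual hoodie config pattern.
import com.uber.hoodie.config.HoodieIndexConfig;

public class BucketedIndexConfigSketch {

  public static HoodieIndexConfig withBuckets() {
    return HoodieIndexConfig.newBuilder()
        .numBucketsPerPartition(1024)   // hoodie.index.bucketed.numbuckets = 1024
        .build();
  }
}
```

No default constant for this property appears in the hunk above, so a bucketed-index deployment would presumably need to set it explicitly.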
@@ -203,6 +203,10 @@ public int getBloomIndexParallelism() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
}

public int getNumBucketsPerPartition() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP));
}

/**
* storage properties
**/
@@ -16,14 +16,13 @@

package com.uber.hoodie.func;

import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;

import com.uber.hoodie.io.HoodieIOHandle;
import com.uber.hoodie.io.HoodieInsertHandle;
import com.uber.hoodie.io.HoodieCreateHandle;
import com.uber.hoodie.table.HoodieTable;
import org.apache.spark.TaskContext;

@@ -43,7 +42,7 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> extends LazyItera
private final String commitTime;
private final HoodieTable<T> hoodieTable;
private Set<String> partitionsCleaned;
private HoodieInsertHandle handle;
private HoodieCreateHandle handle;

public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String commitTime, HoodieTable<T> hoodieTable) {
@@ -79,7 +78,7 @@ public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWrite
// lazily initialize the handle, for the first time
if (handle == null) {
handle =
new HoodieInsertHandle(hoodieConfig, commitTime, hoodieTable,
new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable,
record.getPartitionPath());
}

@@ -91,7 +90,7 @@ public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWrite
statuses.add(handle.close());
// Need to handle the rejected record & open new handle
handle =
new HoodieInsertHandle(hoodieConfig, commitTime, hoodieTable,
new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable,
record.getPartitionPath());
handle.write(record); // we should be able to write 1 record.
break;
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/

package com.uber.hoodie.index;

import com.google.common.base.Optional;

import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.table.HoodieTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
* A `stateless` index implementation that uses a deterministic mapping function to
* determine the fileID for a given record.
*
* Pros:
* - Fast
*
* Cons :
* - Need to tune the number of buckets per partition path manually (FIXME: Need to autotune this)
* - Could increase write amplification on copy-on-write storage since inserts always rewrite files
* - Not global.
*
*/
public class BucketedIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {

private static Logger logger = LogManager.getLogger(BucketedIndex.class);

public BucketedIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
}

private String getBucket(String recordKey) {
return String.valueOf(recordKey.hashCode() % config.getNumBucketsPerPartition());
}

@Override
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> table) {
return hoodieKeys.mapToPair(hk -> new Tuple2<>(hk, Optional.of(getBucket(hk.getRecordKey()))));
}

@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, HoodieTable<T> hoodieTable) throws HoodieIndexException {
return recordRDD.map(record -> {
String bucket = getBucket(record.getRecordKey());
//HACK(vc) a non-existent commit is provided here.
record.setCurrentLocation(new HoodieRecordLocation("000", bucket));
[Review comment, Contributor] Should we make commitTime an Optional in HoodieRecordLocation?
[Reply, Member (Author)] I am going to just make the location the fileGroupID going forward.

return record;
});
}

@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> hoodieTable) throws HoodieIndexException {
return writeStatusRDD;
}

@Override
public boolean rollbackCommit(String commitTime) {
// nothing to rollback in the index.
return true;
}
}
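getBucket() above derives the bucket as recordKey.hashCode() % numBuckets, which can return a negative value whenever the hash code is negative. The sketch below is not part of this PR; it only illustrates a non-negative variant of the same deterministic mapping, using Math.floorMod.

```java
// Illustrative only: deterministic, non-negative bucket assignment for a record key.
// Math.floorMod avoids the negative results that a plain % yields for negative hash codes.
public final class BucketingSketch {

  private BucketingSketch() {
  }

  public static String bucketFor(String recordKey, int numBucketsPerPartition) {
    int bucket = Math.floorMod(recordKey.hashCode(), numBucketsPerPartition);
    return String.valueOf(bucket);
  }
}
```

With numBucketsPerPartition = 4, for example, every key maps deterministically to one of the buckets "0" through "3", so re-inserting the same key always targets the same file group, which is what makes the index stateless.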
@@ -67,7 +67,7 @@ public HBaseIndex(HoodieWriteConfig config, JavaSparkContext jsc) {

@Override
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> hoodieTable) {
JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> table) {
throw new UnsupportedOperationException("HBase index does not implement check exist yet");
}

@@ -234,7 +234,8 @@ public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,

@Override
public boolean rollbackCommit(String commitTime) {
// TODO (weiy)
// Can't really rollback here. HBase only can let you go from recordKey to fileID,
[Review comment, Contributor] We could potentially take snapshots and restore HBase to a snapshot from before the commit started, but this has to be understood well before implementing.
[Reply, Member (Author)] But you can't snapshot every commit; it's pretty expensive. Anyway :)

// not the other way around
return true;
}
}
@@ -85,13 +85,13 @@ public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
}

public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> hoodieTable) {
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> table) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));

// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
lookupIndex(partitionRecordKeyPairRDD, table);

JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
@@ -103,7 +103,7 @@ public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
String fileName = keyPathTuple._2._2.get();
String partitionPath = keyPathTuple._2._1.getPartitionPath();
recordLocationPath = Optional.of(new Path(
new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath),
new Path(table.getMetaClient().getBasePath(), partitionPath),
fileName).toUri().getPath());
} else {
recordLocationPath = Optional.absent();
@@ -184,7 +184,7 @@ JavaPairRDD<String, String> loadInvolvedFiles(List<String> partitions,
List<Tuple2<String, String>> list = new ArrayList<>();
if (latestCommitTime.isPresent()) {
List<HoodieDataFile> filteredFiles =
hoodieTable.getFileSystemView().getLatestVersionInPartition(partitionPath,
hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath,
latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
for (HoodieDataFile file : filteredFiles) {
list.add(new Tuple2<>(partitionPath, file.getFileName()));