-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor HoodieTableFileSystemView using FileGroups & FileSlices #201
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
* | ||
*/ | ||
|
||
package com.uber.hoodie.index; | ||
|
||
import com.google.common.base.Optional; | ||
|
||
import com.uber.hoodie.WriteStatus; | ||
import com.uber.hoodie.common.model.HoodieKey; | ||
import com.uber.hoodie.common.model.HoodieRecord; | ||
import com.uber.hoodie.common.model.HoodieRecordLocation; | ||
import com.uber.hoodie.common.model.HoodieRecordPayload; | ||
import com.uber.hoodie.common.util.FSUtils; | ||
import com.uber.hoodie.config.HoodieWriteConfig; | ||
import com.uber.hoodie.exception.HoodieIndexException; | ||
import com.uber.hoodie.table.HoodieTable; | ||
|
||
import org.apache.log4j.LogManager; | ||
import org.apache.log4j.Logger; | ||
import org.apache.spark.api.java.JavaPairRDD; | ||
import org.apache.spark.api.java.JavaRDD; | ||
import org.apache.spark.api.java.JavaSparkContext; | ||
|
||
import scala.Tuple2; | ||
|
||
/** | ||
 * A `stateless` index implementation that uses a deterministic mapping function to | ||
* determine the fileID for a given record. | ||
* | ||
* Pros: | ||
* - Fast | ||
* | ||
* Cons : | ||
* - Need to tune the number of buckets per partition path manually (FIXME: Need to autotune this) | ||
* - Could increase write amplification on copy-on-write storage since inserts always rewrite files | ||
* - Not global. | ||
* | ||
*/ | ||
public class BucketedIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { | ||
|
||
private static Logger logger = LogManager.getLogger(BucketedIndex.class); | ||
|
||
public BucketedIndex(HoodieWriteConfig config, JavaSparkContext jsc) { | ||
super(config, jsc); | ||
} | ||
|
||
private String getBucket(String recordKey) { | ||
return String.valueOf(recordKey.hashCode() % config.getNumBucketsPerPartition()); | ||
} | ||
|
||
@Override | ||
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> table) { | ||
return hoodieKeys.mapToPair(hk -> new Tuple2<>(hk, Optional.of(getBucket(hk.getRecordKey())))); | ||
} | ||
|
||
@Override | ||
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, HoodieTable<T> hoodieTable) throws HoodieIndexException { | ||
return recordRDD.map(record -> { | ||
String bucket = getBucket(record.getRecordKey()); | ||
//HACK(vc) a non-existent commit is provided here. | ||
record.setCurrentLocation(new HoodieRecordLocation("000", bucket)); | ||
return record; | ||
}); | ||
} | ||
|
||
@Override | ||
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> hoodieTable) throws HoodieIndexException { | ||
return writeStatusRDD; | ||
} | ||
|
||
@Override | ||
public boolean rollbackCommit(String commitTime) { | ||
// nothing to rollback in the index. | ||
return true; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,7 +67,7 @@ public HBaseIndex(HoodieWriteConfig config, JavaSparkContext jsc) { | |
|
||
@Override | ||
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation( | ||
JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> hoodieTable) { | ||
JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> table) { | ||
throw new UnsupportedOperationException("HBase index does not implement check exist yet"); | ||
} | ||
|
||
|
@@ -234,7 +234,8 @@ public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, | |
|
||
@Override | ||
public boolean rollbackCommit(String commitTime) { | ||
// TODO (weiy) | ||
// Can't really rollback here. HBase only can let you go from recordKey to fileID, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could potentially take snapshots and restore hbase to snapshot before commit started. But this has to be understood well before implementing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but you cant snapshot every commit. its pretty expensive. anyways :) |
||
// not the other way around | ||
return true; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make commitTime an Optional in HoodieRecordLocation ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to just make the location the fileGroupID going forward.