(WIP) Small file corrections with inserts log files #402

Closed
@@ -64,6 +64,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
private final HoodieTimeline commitTimeline;
private HoodieTable hoodieTable;
private transient Optional<SQLContext> sqlContextOpt;
private String basePath;

/**
* @param basePath path to Hoodie dataset
@@ -94,10 +95,12 @@ public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
this.jsc = jsc;
this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
this.hoodieTable = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true),
clientConfig);
this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants();
.getHoodieTable(metaClient,
clientConfig, jsc);
this.basePath = basePath;
this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
this.index = HoodieIndex.createIndex(clientConfig, jsc);
this.sqlContextOpt = Optional.absent();
}
@@ -128,7 +131,7 @@ public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism) throws

assertSqlContext();
JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = index
.fetchRecordLocation(hoodieKeys, hoodieTable);
.fetchRecordLocation(hoodieKeys, jsc, hoodieTable.getMetaClient());
List<String> paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent())
.map(keyFileTuple -> keyFileTuple._2().get()).collect();

@@ -156,7 +159,7 @@ public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism) throws
* file
*/
public JavaPairRDD<HoodieKey, Optional<String>> checkExists(JavaRDD<HoodieKey> hoodieKeys) {
return index.fetchRecordLocation(hoodieKeys, hoodieTable);
return index.fetchRecordLocation(hoodieKeys, jsc, hoodieTable.getMetaClient());
}

/**
@@ -180,6 +183,6 @@ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieReco
*/
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecords)
throws HoodieIndexException {
return index.tagLocation(hoodieRecords, hoodieTable);
return index.tagLocation(hoodieRecords, jsc, this.hoodieTable.getMetaClient());
}
}
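To see the reworked call path in action, here is a minimal read-side sketch. The base path, table name, and payload class are illustrative assumptions rather than values from this diff, and the builder calls follow the usual HoodieWriteConfig pattern.

import com.google.common.base.Optional;
import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadClientSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("hoodie-read-sketch").setMaster("local[2]"));
    // Hypothetical dataset location and table name.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")
        .forTable("sample_table")
        .build();
    HoodieReadClient<HoodieAvroPayload> readClient = new HoodieReadClient<>(jsc, config);

    // checkExists now delegates to index.fetchRecordLocation(keys, jsc, metaClient).
    JavaRDD<HoodieKey> keys = jsc.parallelize(Arrays.asList(new HoodieKey("uuid-1", "2016/03/15")));
    JavaPairRDD<HoodieKey, Optional<String>> keyToFile = readClient.checkExists(keys);
    keyToFile.collect().forEach(entry -> System.out.println(entry._1() + " -> " + entry._2()));
    jsc.stop();
  }
}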
137 changes: 92 additions & 45 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
@@ -53,10 +53,10 @@ public class WriteStatus implements Serializable {
* aggregate metrics. This method is not meant to cache passed arguments, since WriteStatus
* objects are collected in Spark Driver.
*
* @param record deflated {@code HoodieRecord} containing information that uniquely identifies
* it.
* @param record deflated {@code HoodieRecord} containing information that uniquely identifies
* it.
* @param optionalRecordMetadata optional metadata related to data contained in {@link
* HoodieRecord} before deflation.
* HoodieRecord} before deflation.
*/
public void markSuccess(HoodieRecord record,
Optional<Map<String, String>> optionalRecordMetadata) {
@@ -69,10 +69,10 @@ public void markSuccess(HoodieRecord record,
* aggregate metrics. This method is not meant to cache passed arguments, since WriteStatus
* objects are collected in Spark Driver.
*
* @param record deflated {@code HoodieRecord} containing information that uniquely identifies
* it.
* @param record deflated {@code HoodieRecord} containing information that uniquely identifies
* it.
* @param optionalRecordMetadata optional metadata related to data contained in {@link
* HoodieRecord} before deflation.
* HoodieRecord} before deflation.
*/
public void markFailure(HoodieRecord record, Throwable t,
Optional<Map<String, String>> optionalRecordMetadata) {
@@ -44,6 +44,9 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
public static final String PARQUET_COMPRESSION_RATIO = "hoodie.parquet.compression.ratio";
// Default compression ratio for parquet
public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
// Default compression ratio for log file to parquet, typically ~3x
public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);

private HoodieStorageConfig(Properties props) {
super(props);
@@ -102,6 +105,11 @@ public Builder parquetCompressionRatio(double parquetCompressionRatio) {
return this;
}

public Builder logFileToParquetCompressionRatio(double logFileToParquetCompressionRatio) {
props.setProperty(LOGFILE_TO_PARQUET_COMPRESSION_RATIO, String.valueOf(logFileToParquetCompressionRatio));
return this;
}

public HoodieStorageConfig build() {
HoodieStorageConfig config = new HoodieStorageConfig(props);
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES),
@@ -116,6 +124,8 @@ public HoodieStorageConfig build() {
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
setDefaultOnCondition(props, !props.containsKey(PARQUET_COMPRESSION_RATIO),
PARQUET_COMPRESSION_RATIO, DEFAULT_STREAM_COMPRESSION_RATIO);
setDefaultOnCondition(props, !props.containsKey(LOGFILE_TO_PARQUET_COMPRESSION_RATIO),
LOGFILE_TO_PARQUET_COMPRESSION_RATIO, DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO);
return config;
}
}
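A quick sketch of setting and reading back the new knob, assuming the usual builder chain (the path and ratio value are illustrative):

import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;

public class StorageConfigSketch {
  public static void main(String[] args) {
    // 0.35 encodes the "roughly 3x" expectation: ~100MB of log data is assumed to
    // compact down to ~35MB of parquet, which lets file-sizing logic budget log files.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")
        .withStorageConfig(HoodieStorageConfig.newBuilder()
            .logFileToParquetCompressionRatio(0.35)
            .build())
        .build();
    System.out.println(config.getLogFileToParquetCompressionRatio()); // prints 0.35
  }
}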
@@ -322,6 +322,10 @@ public double getParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
}

public double getLogFileToParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
}

/**
* metrics properties
**/
@@ -345,7 +349,7 @@ public int getGraphiteServerPort() {
public String getGraphiteMetricPrefix() {
return props.getProperty(HoodieMetricsConfig.GRAPHITE_METRIC_PREFIX);
}

/**
* memory configs
*/
@@ -46,6 +46,6 @@ public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> sortedRecordItr) throws Exception {
return new LazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable);
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable);
}
}
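For orientation, this function is applied once per Spark partition of records already sorted by partition path. In the sketch below, the constructor's trailing parameter (cut off in this view) is assumed to be the HoodieTable, and the import path for BulkInsertMapFunction is assumed to follow the existing func package layout.

import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.func.BulkInsertMapFunction;
import com.uber.hoodie.table.HoodieTable;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

public class BulkInsertSketch {
  // Each Spark partition of sorted records gets its own CopyOnWriteLazyInsertIterable.
  static <T extends HoodieRecordPayload> JavaRDD<List<WriteStatus>> bulkInsert(
      JavaRDD<HoodieRecord<T>> sortedRecords, String commitTime,
      HoodieWriteConfig config, HoodieTable<T> hoodieTable) {
    return sortedRecords.mapPartitionsWithIndex(
        new BulkInsertMapFunction<>(commitTime, config, hoodieTable), true);
  }
}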
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -43,15 +44,15 @@
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
* files.
*/
public class LazyInsertIterable<T extends HoodieRecordPayload> extends
public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extends
LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {

private final HoodieWriteConfig hoodieConfig;
private final String commitTime;
private final HoodieTable<T> hoodieTable;
private Set<String> partitionsCleaned;
protected final HoodieWriteConfig hoodieConfig;
protected final String commitTime;
protected final HoodieTable<T> hoodieTable;
protected Set<String> partitionsCleaned;

public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String commitTime, HoodieTable<T> hoodieTable) {
super(sortedRecordItr);
this.partitionsCleaned = new HashSet<>();
@@ -89,7 +90,7 @@ protected List<WriteStatus> computeNext() {
final Schema schema = HoodieIOHandle.createHoodieWriteSchema(hoodieConfig);
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr,
new InsertHandler(), getTransformFunction(schema));
getInsertHandler(), getTransformFunction(schema));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
@@ -107,15 +108,19 @@ protected void end() {

}

protected InsertHandler getInsertHandler() {
return new InsertHandler();
}

/**
* Consumes stream of hoodie records from in-memory queue and
* writes to one or more create-handles
*/
private class InsertHandler extends
protected class InsertHandler extends
BoundedInMemoryQueueConsumer<Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>, List<WriteStatus>> {

private final List<WriteStatus> statuses = new ArrayList<>();
private HoodieCreateHandle handle;
protected final List<WriteStatus> statuses = new ArrayList<>();
protected HoodieIOHandle handle;

@Override
protected void consumeOneRecord(Tuple2<HoodieRecord<T>, Optional<IndexedRecord>> payload) {
@@ -132,7 +137,8 @@ protected void consumeOneRecord(Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>

// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath());
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
.randomUUID().toString());
}

if (handle.canWrite(payload._1())) {
@@ -142,7 +148,8 @@ protected void consumeOneRecord(Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath());
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
.randomUUID().toString());
handle.write(insertPayload, payload._2()); // we should be able to write 1 payload.
}
}
@@ -161,4 +168,4 @@ protected List<WriteStatus> getResult() {
return statuses;
}
}
}
}
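The handler above implements a small size-bounded rollover protocol: lazily open a create handle keyed by a fresh random file id, write until canWrite rejects a record, then close the handle and roll to a new one, with the rejected record going into the fresh file. A self-contained illustration of that protocol using made-up types rather than the Hoodie classes:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;

final class BoundedHandleDemo {
  // Stand-in for a HoodieCreateHandle/HoodieAppendHandle: accepts records until full.
  interface Handle<R, S> {
    boolean canWrite(R record);
    void write(R record);
    S close();
  }

  static <R, S> List<S> writeAll(Iterator<R> records, Function<String, Handle<R, S>> handleFactory) {
    List<S> statuses = new ArrayList<>();
    Handle<R, S> handle = null;
    while (records.hasNext()) {
      R record = records.next();
      if (handle == null) {
        handle = handleFactory.apply(UUID.randomUUID().toString()); // new file id per handle
      }
      if (handle.canWrite(record)) {
        handle.write(record);
      } else {
        statuses.add(handle.close());                               // current file is full
        handle = handleFactory.apply(UUID.randomUUID().toString()); // roll to a new file
        handle.write(record);                                       // rejected record fits in the fresh handle
      }
    }
    if (handle != null) {
      statuses.add(handle.close());                                 // flush the last open handle
    }
    return statuses;
  }
}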
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.func;

import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.HoodieAppendHandle;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.avro.generic.IndexedRecord;
import scala.Tuple2;

/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
* log files.
*/
public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends
CopyOnWriteLazyInsertIterable<T> {

public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String commitTime, HoodieTable<T> hoodieTable) {
super(sortedRecordItr, config, commitTime, hoodieTable);
}

@Override
protected InsertHandler getInsertHandler() {
return new MergeOnReadInsertHandler();
}

protected class MergeOnReadInsertHandler extends InsertHandler {

@Override
protected void consumeOneRecord(Tuple2<HoodieRecord<T>, Optional<IndexedRecord>> payload) {
final HoodieRecord insertPayload = payload._1();
List<WriteStatus> statuses = new ArrayList<>();
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
handle.write(insertPayload, payload._2);
} else {
// handle is full.
handle.close();
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
handle.write(insertPayload, payload._2); // we should be able to write 1 payload.
}
}
}

}
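The append-based handler reuses the same rollover protocol but writes into log files, whose effective size is meant to be judged through the new logfile-to-parquet ratio. The arithmetic below is an illustration of that intent, not code from this diff:

public class LogSizingSketch {
  // With a 0.35 ratio, ~365MB of log bytes are treated as roughly equivalent to a
  // 128MB parquet file, since log data is expected to shrink ~3x when compacted.
  static long estimatedParquetBytes(long logFileBytes, double logFileToParquetCompressionRatio) {
    return (long) (logFileBytes * logFileToParquetCompressionRatio);
  }

  static long logBytesBudgetFor(long parquetMaxFileBytes, double logFileToParquetCompressionRatio) {
    return (long) (parquetMaxFileBytes / logFileToParquetCompressionRatio);
  }

  public static void main(String[] args) {
    System.out.println(estimatedParquetBytes(365L * 1024 * 1024, 0.35)); // ≈ 128MB
    System.out.println(logBytesBudgetFor(128L * 1024 * 1024, 0.35));     // ≈ 365MB
  }
}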
@@ -21,12 +21,12 @@
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.index.bucketed.BucketedIndex;
import com.uber.hoodie.index.hbase.HBaseIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.Serializable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -38,13 +38,12 @@
public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Serializable {

protected final HoodieWriteConfig config;
protected transient JavaSparkContext jsc = null;

protected HoodieIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
this.config = config;
this.jsc = jsc;
}


public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config,
JavaSparkContext jsc) throws HoodieIndexException {
switch (config.getIndexType()) {
@@ -68,22 +67,24 @@ public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieW
* file
*/
public abstract JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> table);
JavaRDD<HoodieKey> hoodieKeys, final JavaSparkContext jsc,
HoodieTableMetaClient metaClient);

/**
* Looks up the index and tags each incoming record with a location of a file that contains the
* row (if it is actually present)
*/
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
HoodieTable<T> hoodieTable) throws HoodieIndexException;
JavaSparkContext jsc, HoodieTableMetaClient metaClient) throws HoodieIndexException;

/**
* Extracts the location of written records, and updates the index.
* <p>
* TODO(vc): We may need to propagate the record as well in a WriteStatus class
*/
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTable<T> hoodieTable) throws HoodieIndexException;
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTableMetaClient metaClient)
throws HoodieIndexException;

/**
* Rollback the effects of the commit made at commitTime.
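To make the reshaped contract concrete, here is a hedged skeleton of an index written against the new (jsc, metaClient) signatures. It is left abstract because HoodieIndex's remaining methods (rollback and friends) are not shown in this diff, and the Optional here is assumed to be the Guava one used elsewhere in these files.

import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public abstract class NoOpIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {

  protected NoOpIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
    super(config, jsc);
  }

  @Override
  public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
      JavaRDD<HoodieKey> hoodieKeys, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
    // Report every key as absent; a real index would consult its backing store.
    return hoodieKeys.mapToPair(key -> new Tuple2<>(key, Optional.<String>absent()));
  }

  @Override
  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
      JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
    return recordRDD; // no locations tagged
  }

  @Override
  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
      JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
    return writeStatusRDD; // nothing to update
  }
}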
@@ -22,8 +22,8 @@
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -50,19 +50,19 @@ public InMemoryHashIndex(HoodieWriteConfig config, JavaSparkContext jsc) {

@Override
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
final HoodieTable<T> table) {
JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
throw new UnsupportedOperationException("InMemory index does not implement check exist yet");
}

@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
HoodieTable<T> hoodieTable) {
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTableMetaClient metaClient) {
return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(), true);
}

@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTable<T> hoodieTable) {
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTableMetaClient metaClient) {
return writeStatusRDD.map(new Function<WriteStatus, WriteStatus>() {
@Override
public WriteStatus call(WriteStatus writeStatus) {