Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim
return timeline;
}

@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
final Reporter reporter) throws IOException {
void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
// Hive on Spark invokes multiple getRecordReaders from different threads in the same Spark task (and hence the
// same JVM), unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, are at
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
Expand All @@ -101,22 +99,27 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSpli
// To fix this, hoodie columns are appended late, at the time the record reader gets built, instead of at
// construction time.
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);

if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
}
this.conf = jobConf;
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
}
}
}
}

LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));

@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
final Reporter reporter) throws IOException {
// sanity check
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);

return new HoodieRealtimeRecordReader((RealtimeSplit) split, jobConf,
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
addProjectionToJobConf(realtimeSplit, jobConf);
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
super.getRecordReader(split, jobConf, reporter));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
if (!result) {
// if the result is false, then there are no more records
return false;
} else {
}
if (!deltaRecordMap.isEmpty()) {
// TODO(VC): Right now, we assume all records in the log have a matching base record (which
// would be true until we have a way to index logs too).
// Return from the delta records map if we have some match.
Expand Down Expand Up @@ -134,8 +135,8 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
throw new RuntimeException(errMsg, re);
}
}
return true;
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -58,6 +60,7 @@
import java.util.stream.Collectors;

public class HoodieRealtimeRecordReaderUtils {
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);

/**
* Reads the schema from the base file.
Expand Down Expand Up @@ -246,10 +249,10 @@ public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
// Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] : fieldOrderCsv.split(",");
Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : Arrays.stream(fieldNameCsv.split(","))
.filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
// Hive does not provide ids for partitioning fields, so check for lengths excluding that.
Expand Down