HIVE-24266 #1576

Merged
merged 2 commits on Oct 14, 2020
38 changes: 32 additions & 6 deletions ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io.orc;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.common.NoDynamicValuesException;
import org.apache.hadoop.fs.PathFilter;
@@ -1156,13 +1157,36 @@ public List<OrcSplit> getSplits() throws IOException {
} else {
TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
if (entry.getKey() + entry.getValue().getLength() > logicalLen) {
long blockOffset = entry.getKey();
long blockLength = entry.getValue().getLength();
if(blockOffset > logicalLen) {
//don't create splits for anything past logical EOF
continue;
// the map is ordered by offset, so every subsequent entry also starts past logicalLen
break;
}
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
deltas, -1, logicalLen, dir, offsetAndBucket);
long splitLength = blockLength;

long blockEndOvershoot = (blockOffset + blockLength) - logicalLen;
if (blockEndOvershoot > 0) {
// if logicalLen falls within this block, this (last) split should cover only the part of the block up to it
// -> we should read less than the block end
splitLength -= blockEndOvershoot;
} else if (blockOffsets.lastKey() == blockOffset && blockEndOvershoot < 0) {
// This is the last block but it ends before logicalLen
// This can happen with HDFS if hflush was called and blocks are not persisted to disk yet, but content
// is otherwise available for readers, as DNs have these buffers in memory at this time.
// -> we should read more than (persisted) block end, but surely not more than the whole block
if (fileStatus instanceof HdfsLocatedFileStatus) {
HdfsLocatedFileStatus hdfsFileStatus = (HdfsLocatedFileStatus)fileStatus;
if (hdfsFileStatus.getLocatedBlocks().isUnderConstruction()) {
// blockEndOvershoot is negative here...
splitLength = Math.min(splitLength - blockEndOvershoot, hdfsFileStatus.getBlockSize());
}
}
}
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, blockOffset,
splitLength, entry.getValue().getHosts(), null, isOriginal, true,
deltas, -1, logicalLen, dir, offsetAndBucket);
splits.add(orcSplit);
}
}
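The new overshoot handling boils down to the following arithmetic. This is a standalone sketch for readability; computeSplitLength and its parameters are illustrative names, not part of the patch or of Hive's API:

static long computeSplitLength(long blockOffset, long blockLength, long logicalLen,
    boolean isLastBlock, boolean isUnderConstruction, long hdfsBlockSize) {
  long splitLength = blockLength;
  long blockEndOvershoot = (blockOffset + blockLength) - logicalLen;
  if (blockEndOvershoot > 0) {
    // the block runs past the logical EOF: trim the split so it stops at logicalLen
    splitLength -= blockEndOvershoot;
  } else if (isLastBlock && isUnderConstruction && blockEndOvershoot < 0) {
    // hflushed data beyond the last persisted block: grow the split up to logicalLen,
    // but never past a whole HDFS block (blockEndOvershoot is negative, so subtracting it adds)
    splitLength = Math.min(splitLength - blockEndOvershoot, hdfsBlockSize);
  }
  return splitLength;
}

For the streaming delta in the new test with 15 bytes on disk and a logical length of 95, the single data block yields min(15 + 80, 1000) = 95, which is the split length the test asserts.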
@@ -1431,6 +1455,7 @@ static final class SplitGenerator implements Callable<List<OrcSplit>> {
//this is the root of the partition in which the 'file' is located
private final Path rootDir;
OrcSplit.OffsetAndBucketProperty offsetAndBucket = null;
boolean isAcidTableScan;

public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1452,6 +1477,7 @@ public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
this.deltaSplits = splitInfo.getSplits();
this.allowSyntheticFileIds = allowSyntheticFileIds;
this.ppdResult = splitInfo.ppdResult;
this.isAcidTableScan = AcidUtils.isFullAcidScan(context.conf);
}

public boolean isBlocking() {
@@ -1550,7 +1576,7 @@ OrcSplit createSplit(long offset, long length, OrcTail orcTail) throws IOException
}

// scale the raw data size to split level based on the ratio of split length to file length
final long fileLen = file.getLen();
final long fileLen = isAcidTableScan ? AcidUtils.getLogicalLength(fs, file) : file.getLen();
final double splitRatio = (double) length / (double) fileLen;
final long scaledProjSize = projColsUncompressedSize > 0 ?
(long) (splitRatio * projColsUncompressedSize) : fileLen;
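This one-line change matters for streaming deltas: file.getLen() only reflects the bytes persisted on disk, which can be smaller than the logical length the splits are built against, so the ratio could exceed 1.0 and inflate the scaled projected size. Illustrative arithmetic (the numbers mirror the 15/95 delta used in the new test):

long splitLen = 95L;     // split covers the logical length
long rawLen = 15L;       // file.getLen(): bytes persisted on disk
long logicalLen = 95L;   // AcidUtils.getLogicalLength(fs, file)
double rawRatio = (double) splitLen / rawLen;          // ~6.3: would over-scale projColsUncompressedSize
double logicalRatio = (double) splitLen / logicalLen;  // 1.0: the intended split-to-file ratio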
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.DataInput;
import java.io.DataOutput;
@@ -36,6 +38,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -47,6 +50,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -1070,14 +1075,23 @@ public static class MockFile {
int length;
MockBlock[] blocks;
byte[] content;
// If true, simulates an HDFS file that was hflushed (i.e. it may have content beyond what is persisted in
// the blocks on disk) and is still being written to (i.e. isUnderConstruction is true)
boolean isHdfsHflushed = false;

public MockFile(String path, int blockSize, byte[] content,
MockBlock... blocks) {
this(path, blockSize, content, false, blocks);
}

public MockFile(String path, int blockSize, byte[] content, boolean isHdfsHflushed,
MockBlock... blocks) {
this.path = new Path(path);
this.blockSize = blockSize;
this.blocks = blocks;
this.content = content;
this.length = content.length;
this.isHdfsHflushed = isHdfsHflushed;
int offset = 0;
for(MockBlock block: blocks) {
block.offset = offset;
@@ -1148,6 +1162,11 @@ public int read() throws IOException {
}
return -1;
}

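// Report how many bytes of the mock file's content remain past the current read position.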
@Override
public int available() throws IOException {
return file.length - offset;
}
}

public static class MockPath extends Path {
@@ -1506,8 +1525,23 @@ private FileStatus createDirectory(Path dir) {

private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException {
FileStatus fileStatus = createStatus(file);
return new LocatedFileStatus(fileStatus,
getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
if (file.isHdfsHflushed) {
// Should behave the same way as the regular located status, except that it is an HdfsLocatedFileStatus
// and has the isUnderConstruction flag set to true
LocatedBlocks lb = new LocatedBlocks(fileStatus.getLen(), true, null, null, false, null, null);
HdfsLocatedFileStatus mockStatus = mock(HdfsLocatedFileStatus.class);
when(mockStatus.getLocatedBlocks()).thenReturn(lb);
when(mockStatus.getPath()).thenReturn(fileStatus.getPath());
when(mockStatus.getLen()).thenReturn(fileStatus.getLen());
when(mockStatus.isDirectory()).thenReturn(false);
when(mockStatus.isFile()).thenReturn(true);
when(mockStatus.getBlockSize()).thenReturn(fileStatus.getBlockSize());
when(mockStatus.getBlockLocations()).thenReturn(getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(),
false));
return mockStatus;
} else {
return new LocatedFileStatus(fileStatus, getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
}
}

private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
@@ -4221,4 +4255,94 @@ record += 1;

reader.close();
}


private static List<MockFile> mockDeltaWithSideFileForStreaming(String delta, int contentLength, int flush_length) {
final int blockSize = 1000;
boolean isDeltaHflushed = contentLength < flush_length;

List<MockFile> files = new LinkedList<>();

ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
bb.putLong(flush_length);
bb.array();

MockBlock[] blocks = new MockBlock[(contentLength / blockSize) + 1];
for (int i = 0; i < blocks.length; ++i) {
blocks[i] = new MockBlock("host1");
}

files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000", blockSize, new byte[contentLength], isDeltaHflushed,
blocks));
files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000_flush_length", blockSize, bb.array(), false,
new MockBlock("host1")));

return files;
}
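The bucket_00000_flush_length side file mocked here is what makes the logical length differ from the on-disk length: it holds the flushed length as an 8-byte big-endian long next to the bucket file. A minimal sketch of decoding such content, assuming that when several lengths have been appended the most recent one wins (readLastFlushLength is an illustrative helper, not Hive API):

static long readLastFlushLength(byte[] sideFileContent) {
  ByteBuffer bb = ByteBuffer.wrap(sideFileContent);
  long flushLength = -1;
  while (bb.remaining() >= Long.BYTES) {
    // each hflush may append another 8-byte length; the last one reflects the current logical EOF
    flushLength = bb.getLong();
  }
  return flushLength;
}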

private List<OrcSplit> splitsForStreamingAcidTable(List<MockFile> files) throws Exception {
try {
MockFileSystem fs = new MockFileSystem(conf);
files.forEach(f -> MockFileSystem.addGlobalFile(f));
conf.set("bucket_count", "1");
//set up props for read
conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
AcidUtils.setAcidOperationalProperties(conf, true, null);
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS,
TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnNamesProperty());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES,
TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnTypesProperty());
conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
MockPath mockPath = new MockPath(fs, "mock:/streaming");
conf.set("mapred.input.dir", mockPath.toString());
conf.set("fs.defaultFS", "mock:///");
conf.set("fs.mock.impl", MockFileSystem.class.getName());
conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
OrcInputFormat.Context context = new OrcInputFormat.Context(conf, 0);

OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
"mock:/streaming"),
false, null);
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
return ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();

} finally {
MockFileSystem.clearGlobalFiles();
}
}

@Test
public void testAcidTableStreamingBISplitGeneration() throws Exception {
List<OrcSplit> result = null;
List<MockFile> files = new LinkedList<>();

// 1 complete delta file + 1 incomplete where more rows were committed than written to disk
// (1000) + (15/95)
files.addAll(mockDeltaWithSideFileForStreaming("delta_0000001_0000010_0000", 1000, 1000));
files.addAll(mockDeltaWithSideFileForStreaming("delta_0000011_0000020_0000", 15, 95));
result = splitsForStreamingAcidTable(files);
files.clear();
assertEquals(1000, result.get(0).getLength());
assertEquals(95, result.get(1).getLength());

// 1 incomplete delta with 2 complete and 1 incomplete blocks: (1000 + 1000 + 500/800)
files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 2500, 2800));
result = splitsForStreamingAcidTable(files);
files.clear();
assertEquals(1000, result.get(0).getLength());
assertEquals(1000, result.get(1).getLength());
assertEquals(800, result.get(2).getLength());

// 1 complete delta but shorter flush_length - though I think this is almost impossible
files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 1000, 450));
result = splitsForStreamingAcidTable(files);
files.clear();
assertEquals(450, result.get(0).getLength());

}
}
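The 2500/2800 scenario above can be checked by hand: the mock delta has three blocks on disk (1000, 1000 and 500 bytes) and a logical length of 2800 from the side file. Plugging those numbers into the illustrative computeSplitLength sketch from earlier:

computeSplitLength(0L, 1000L, 2800L, false, true, 1000L);    // 1000: whole block, well before logical EOF
computeSplitLength(1000L, 1000L, 2800L, false, true, 1000L); // 1000: whole block, well before logical EOF
computeSplitLength(2000L, 500L, 2800L, true, true, 1000L);   //  800 = min(500 + 300, 1000), as the test asserts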