From ecab5c25d027944c445f1a1886ca33ec9294ec56 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 2 Dec 2015 17:56:10 +0900 Subject: [PATCH] TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. --- .../java/org/apache/tajo/storage/RawFile.java | 5 +- .../storage/rawfile/DirectRawFileScanner.java | 46 ++++++++----------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 26bd135e6e..f31b85cef8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -453,10 +453,11 @@ public float getProgress() { return 1.0f; } - if (filePosition - startOffset == 0) { + long readBytes = filePosition - startOffset; + if (readBytes == 0) { return 0.0f; } else { - return Math.min(1.0f, ((float) filePosition / endOffset)); + return Math.min(1.0f, ((float) readBytes / fragment.getLength())); } } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java index 550de63d75..1e2380e2eb 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -27,6 +27,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.*; @@ -50,7 +51,6 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner private SeekableInputChannel channel; private boolean eos = false; - private long fileSize; private long recordCount; private long filePosition; private long endOffset; @@ -95,10 +95,8 @@ private void initChannel() throws IOException { } channel = new LocalFileInputChannel(new FileInputStream(file)); - fileSize = channel.size(); } else { channel = new FSDataInputChannel(fs.open(fragment.getPath())); - fileSize = channel.size(); } // initial set position @@ -106,10 +104,6 @@ private void initChannel() throws IOException { channel.seek(fragment.getStartKey()); } - if (tableStats != null) { - tableStats.setNumBytes(fileSize); - } - filePosition = fragment.getStartKey(); endOffset = fragment.getStartKey() + fragment.getLength(); if (LOG.isDebugEnabled()) { @@ -178,7 +172,7 @@ public void reset() throws IOException { @Override public void close() throws IOException { if (tableStats != null) { - tableStats.setReadBytes(fileSize); + tableStats.setReadBytes(filePosition - fragment.getStartKey()); tableStats.setNumRows(recordCount); } if(tupleBuffer != null) { @@ -210,31 +204,29 @@ public boolean isSplittable(){ return false; } + @Override + public TableStats getInputStats() { + if(tableStats != null){ + tableStats.setNumRows(recordCount); + tableStats.setReadBytes(filePosition - fragment.getStartKey()); // actual read bytes (scan + rescan * n) + tableStats.setNumBytes(fragment.getLength()); + } + return tableStats; + } + @Override public float getProgress() { if(!inited) return 0.0f; - try { - tableStats.setNumRows(recordCount); - long filePos = 0; - if (channel != null) { - filePos = channel.position(); - tableStats.setReadBytes(filePos); - } - - if(eos || channel == null) { - tableStats.setReadBytes(fileSize); - return 1.0f; - } + if(eos) { + return 1.0f; + } - if (filePos == 0) { - return 0.0f; - } else { - return Math.min(1.0f, ((float)filePos / (float)fileSize)); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); + long readBytes = filePosition - fragment.getStartKey(); + if (readBytes == 0) { return 0.0f; + } else { + return Math.min(1.0f, ((float) readBytes / fragment.getLength())); } } }