Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Tajo Change Log


Release 0.12.0 - unreleased

NEW FEATURES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,15 @@ public void setFilter(EvalNode filter) {
public boolean isSplittable() {
return false;
}

@Override
public float getProgress() {
if (!inited) return super.getProgress();

if (!dataFileReader.hasNext()) {
return 1.0f;
} else {
return 0.0f;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ public void init() throws IOException {
targets = schema.toArray();
}

super.init();

outTuple = new VTuple(targets.length);

Path path = fragment.getPath();
Expand Down Expand Up @@ -163,6 +161,7 @@ public void init() throws IOException {
recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE,
fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone));

super.init();
LOG.debug("file fragment { path: " + fragment.getPath() +
", start offset: " + fragment.getStartKey() +
", length: " + fragment.getLength() + "}");
Expand Down Expand Up @@ -307,6 +306,8 @@ private void getNextBatch() throws IOException {

@Override
public float getProgress() {
if(!inited) return super.getProgress();

return recordReader.getProgress();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,14 @@ public void setFilter(EvalNode filter) {
public boolean isSplittable() {
return false;
}

@Override
public float getProgress() {

if (!inited) {
return super.getProgress();
} else {
return reader.getProgress();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,29 @@ public void setFilter(EvalNode filter) {
public boolean isSplittable(){
return true;
}

@Override
public float getProgress() {
if (!inited) return super.getProgress();

if (!more) {
return 1.0f;
} else {
long filePos;
float progress;
try {
filePos = reader.getPosition();
if (start == filePos) {
progress = 0.0f;
} else {
long readBytes = filePos - start;
long remainingBytes = Math.max(end - filePos, 0);
progress = Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
}
} catch (IOException e) {
progress = 0.0f;
}
return progress;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,50 +300,19 @@ public DelimitedTextFileScanner(Configuration conf, final Schema schema, final T

@Override
public void init() throws IOException {
if (reader != null) {
reader.close();
}

if(deserializer != null) {
deserializer.release();
}

reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
reader.init();
recordCount = 0;

if (targets == null) {
targets = schema.toArray();
}

outTuple = new VTuple(targets.length);
reset();

super.init();
if (LOG.isDebugEnabled()) {
LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
}

// skip first line if it reads from middle of file
if (startOffset > 0) {
reader.readLine();
} else { // skip header lines if it is defined

// initialization for skipping header(max 20)
int headerLineNum = Math.min(Integer.parseInt(meta.getProperty(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20);
if (headerLineNum > 0) {
LOG.info(String.format("Skip %d header lines", headerLineNum));
for (int i = 0; i < headerLineNum; i++) {
if (!reader.isReadable()) {
return;
}

reader.readLine();
}
}
}

deserializer = getLineSerde().createDeserializer(schema, meta, targets);
deserializer.init();
}

public TextLineSerDe getLineSerde() {
Expand Down Expand Up @@ -436,7 +405,44 @@ public Tuple next() throws IOException {

@Override
public void reset() throws IOException {
init();
recordCount = 0;

if (reader.getReadBytes() > 0) {
reader.close();

reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
reader.init();
}

if(deserializer != null) {
deserializer.release();
}

deserializer = getLineSerde().createDeserializer(schema, meta, targets);
deserializer.init();

outTuple = new VTuple(targets.length);

// skip first line if it reads from middle of file
if (startOffset > 0) {
reader.readLine();
} else { // skip header lines if it is defined

// initialization for skipping header(max 20)
int headerLineNum = Math.min(Integer.parseInt(
meta.getProperty(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20);

if (headerLineNum > 0) {
LOG.info(String.format("Skip %d header lines", headerLineNum));
for (int i = 0; i < headerLineNum; i++) {
if (!reader.isReadable()) {
return;
}

reader.readLine();
}
}
}
}

@Override
Expand All @@ -446,16 +452,16 @@ public void close() throws IOException {
deserializer.release();
}

if (tableStats != null && reader != null) {
if (reader != null) {
tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
tableStats.setNumRows(recordCount);
}

if (LOG.isDebugEnabled()) {
LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
}
} finally {
IOUtils.cleanup(LOG, reader);
reader = null;
outTuple = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public T getCurrentValue() throws IOException,
return currentValue;
}

public float getProgress() throws IOException, InterruptedException {
public float getProgress() {
return (float) current / total;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,12 @@ public void close() throws IOException {
reader.close();
}
}

public float getProgress() {
if (!footersIterator.hasNext()) {
return 1.0f;
} else {
return reader != null ? reader.getProgress() : 0.0f;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.JavaResourceUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -154,6 +155,11 @@ public static Collection<Object[]> generateParameters() {
});
}

@After
public void tearDown() throws IOException {
fs.delete(testDir, true);
}

@Test
public void testSplitable() throws IOException {
if (splitable) {
Expand Down Expand Up @@ -1303,4 +1309,51 @@ public void testFileAlreadyExists() throws IOException {
IOUtils.cleanup(null, appender);
}
}

@Test
public void testProgress() throws IOException {

Schema schema = new Schema();
schema.addColumn("col1", Type.FLOAT4);
schema.addColumn("col2", Type.FLOAT8);
schema.addColumn("col3", Type.INT2);
schema.addColumn("col4", Type.INT4);
schema.addColumn("col5", Type.INT8);

KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MAX_VALUE_AVRO_SCHEMA);
}

FileTablespace sm = TablespaceManager.getLocalFs();
Path tablePath = new Path(testDir, "testProgress.data");
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.init();

VTuple tuple = new VTuple(new Datum[]{
DatumFactory.createFloat4(Float.MAX_VALUE),
DatumFactory.createFloat8(Double.MAX_VALUE),
DatumFactory.createInt2(Short.MAX_VALUE),
DatumFactory.createInt4(Integer.MAX_VALUE),
DatumFactory.createInt8(Long.MAX_VALUE)
});

appender.addTuple(tuple);
appender.flush();
appender.close();

FileStatus status = fs.getFileStatus(tablePath);
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
Scanner scanner = sm.getScanner(meta, schema, fragment, null);

assertEquals(0.0f, scanner.getProgress(), 0.0f);

scanner.init();
assertNotNull(scanner.next());
assertNull(null, scanner.next());

scanner.close();
assertEquals(1.0f, scanner.getProgress(), 0.0f);
}
}