Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,10 @@ public BooleanDatum equalsTo(Datum datum) {
public int compareTo(Datum datum) {
switch (datum.type()) {
case BOOLEAN:
if (val && !datum.asBool()) {
return -1;
} else if (val && datum.asBool()) {
return 1;
} else {
if ((val ^ datum.asBool()) == false) { // if both are the same regardless of its value.
return 0;
} else {
return val ? -1 : 1;
}
default:
throw new InvalidOperationException(datum.type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.gson.annotations.Expose;
import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.NetUtils;

import static org.apache.tajo.common.TajoDataTypes.Type;

Expand All @@ -36,11 +37,7 @@ public class Inet4Datum extends Datum {

public Inet4Datum(String addr) {
super(Type.INET4);
String [] elems = addr.split("\\.");
address = Integer.parseInt(elems[3]) & 0xFF
| ((Integer.parseInt(elems[2]) << 8) & 0xFF00)
| ((Integer.parseInt(elems[1]) << 16) & 0xFF0000)
| ((Integer.parseInt(elems[0]) << 24) & 0xFF000000);
address = NetUtils.convertIPStringToInt(addr);
}

public Inet4Datum(byte[] addr) {
Expand Down
13 changes: 0 additions & 13 deletions tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ public int compareTo(Datum datum) {
public Datum plus(Datum datum) {
switch (datum.type()) {
case INT2:
return DatumFactory.createInt2((short) (val + datum.asInt2()));
case INT4:
return DatumFactory.createInt4(val + datum.asInt4());
case INT8:
Expand All @@ -220,7 +219,6 @@ public Datum plus(Datum datum) {
public Datum minus(Datum datum) {
switch (datum.type()) {
case INT2:
return DatumFactory.createInt2((short) (val - datum.asInt2()));
case INT4:
return DatumFactory.createInt4(val - datum.asInt4());
case INT8:
Expand All @@ -245,7 +243,6 @@ public Datum minus(Datum datum) {
public Datum multiply(Datum datum) {
switch (datum.type()) {
case INT2:
return DatumFactory.createInt4(val * datum.asInt2());
case INT4:
return DatumFactory.createInt4(val * datum.asInt4());
case INT8:
Expand All @@ -268,11 +265,6 @@ public Datum multiply(Datum datum) {
public Datum divide(Datum datum) {
switch (datum.type()) {
case INT2:
short paramValueI2 = datum.asInt2();
if (!validateDivideZero(paramValueI2)) {
return NullDatum.get();
}
return DatumFactory.createInt2((short) (val / paramValueI2));
case INT4:
int paramValueI4 = datum.asInt4();
if (!validateDivideZero(paramValueI4)) {
Expand Down Expand Up @@ -308,11 +300,6 @@ public Datum divide(Datum datum) {
public Datum modular(Datum datum) {
switch (datum.type()) {
case INT2:
short paramValueI2 = datum.asInt2();
if (!validateDivideZero(paramValueI2)) {
return NullDatum.get();
}
return DatumFactory.createInt2((short) (val % paramValueI2));
case INT4:
int paramValueI4 = datum.asInt4();
if (!validateDivideZero(paramValueI4)) {
Expand Down
9 changes: 9 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,13 @@ public static String normalizeHost(String host) {
}
return host;
}

public static int convertIPStringToInt(String ipAddr) {
String [] elems = ipAddr.split("\\.");
int address = Integer.parseInt(elems[3]) & 0xFF
| ((Integer.parseInt(elems[2]) << 8) & 0xFF00)
| ((Integer.parseInt(elems[1]) << 16) & 0xFF0000)
| ((Integer.parseInt(elems[0]) << 24) & 0xFF000000);
return address;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ public void setUp() {
@Test
public void testInt2Datum() throws Exception {
//plus
runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)15));
runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)15));
runAndAssert("plus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(15));
runAndAssert("plus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(15));
runAndAssert("plus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(15.0f));
runAndAssert("plus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(15.0));

//minus
runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)5));
runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)5));
runAndAssert("minus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(5));
runAndAssert("minus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(5));
runAndAssert("minus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(5.0f));
runAndAssert("minus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(5.0));

runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int2Datum((short)-5));
runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int4Datum((short)-5));
runAndAssert("minus", new Int2Datum((short)5), new Int4Datum(10), new Int4Datum(-5));
runAndAssert("minus", new Int2Datum((short)5), new Int8Datum(10), new Int8Datum(-5));
runAndAssert("minus", new Int2Datum((short)5), new Float4Datum(10.0f), new Float4Datum(-5.0f));
Expand All @@ -89,7 +89,7 @@ public void testInt2Datum() throws Exception {
runAndAssert("multiply", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(50.0));

//divide
runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)2));
runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)2));
runAndAssert("divide", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(2));
runAndAssert("divide", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(2));
runAndAssert("divide", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(2.0f));
Expand All @@ -102,7 +102,7 @@ public void testInt2Datum() throws Exception {
runAndAssert("divide", new Int2Datum((short)10), new Float8Datum(0.0), NullDatum.get());

//modular
runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int2Datum((short)1));
runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int4Datum((short)1));
runAndAssert("modular", new Int2Datum((short)10), new Int4Datum(3), new Int4Datum(1));
runAndAssert("modular", new Int2Datum((short)10), new Int8Datum(3), new Int8Datum(1));
runAndAssert("modular", new Int2Datum((short)10), new Float4Datum(3.0f), new Float4Datum(1.0f));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -73,6 +75,11 @@ public Tuple next() throws IOException {
}
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void reset() throws IOException {
init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;
import java.util.Comparator;
Expand Down Expand Up @@ -158,6 +160,11 @@ public Tuple next() throws IOException {
return outTuple;
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void reset() throws IOException {
if (state == State.INITED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.tajo.catalog.SchemaObject;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
Expand Down Expand Up @@ -60,6 +63,10 @@ protected void compile() throws CompilationError {

public abstract Tuple next() throws IOException;

public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

public abstract void rescan() throws IOException;

public abstract void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;
import java.util.Iterator;
Expand Down Expand Up @@ -62,6 +64,11 @@ public Tuple next() throws IOException {
}
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void reset() throws IOException {
init();
Expand Down
51 changes: 51 additions & 0 deletions tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.OffHeapRowBlockWriter;
import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.util.BytesUtils;

import java.io.*;
Expand Down Expand Up @@ -480,6 +484,53 @@ public Tuple next() throws IOException {
}
}

TextSerializerDeserializer deserializer = new TextSerializerDeserializer();

boolean hasNext() throws IOException {
if (currentIdx == validIdx) {
if (eof) {
return false;
} else {
page();

if(currentIdx == validIdx){
return false;
}
}
}

return true;
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
rowBlock.clear();
OffHeapRowBlockWriter writer = (OffHeapRowBlockWriter) rowBlock.getWriter();

while(hasNext() && rowBlock.rows() < rowBlock.maxRowNum()) {
byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
currentIdx++;

int fieldIdx = 0;
writer.startRow();
for (; fieldIdx < cells.length && fieldIdx < schema.size(); fieldIdx++) {
if (cells[fieldIdx] == null) {
writer.skipField();
} else {
deserializer.write(writer, schema.getColumn(fieldIdx), cells[fieldIdx], 0, cells[fieldIdx].length, nullChars);

}
}
for (; fieldIdx < schema.size(); fieldIdx++) {
writer.skipField();
}
writer.endRow();
}

return rowBlock.rows() > 0;
}

private boolean isCompress() {
return codec != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;

Expand Down Expand Up @@ -80,6 +82,11 @@ public Schema getSchema() {
return schema;
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void setTarget(Column[] targets) {
if (inited) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;
import java.util.Iterator;
Expand Down Expand Up @@ -113,6 +115,11 @@ public Tuple next() throws IOException {
return tuple;
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void reset() throws IOException {
this.iterator = fragments.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SchemaObject;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -41,6 +43,13 @@ public interface Scanner extends SchemaObject, Closeable {
* @throws IOException if internal I/O error occurs during next method
*/
Tuple next() throws IOException;

/**
*
* @param rowBlock
* @return
*/
boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException;

/**
* Reset the cursor. After executed, the scanner
Expand Down
Loading