Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/SessionVars.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public enum SessionVars implements ConfigKey {
"shuffle output size for partition table write (mb)", DEFAULT),

// for physical Executors
EXEC_ENGINE(ConfVars.$EXECUTOR_ENGINE,
"executor engine types that queries will use. Types: volcano and block (default is volcano)", DEFAULT),
EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT),
HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT),
INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public static enum ConfVars implements ConfigKey {
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256),

// for physical Executors
$EXECUTOR_ENGINE("tajo.executor.engine", "volcano"), // volcano, and block
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,10 @@ public BooleanDatum equalsTo(Datum datum) {
public int compareTo(Datum datum) {
switch (datum.type()) {
case BOOLEAN:
if (val && !datum.asBool()) {
return -1;
} else if (val && datum.asBool()) {
return 1;
} else {
if ((val ^ datum.asBool()) == false) { // if both are the same regardless of its value.
return 0;
} else {
return val ? -1 : 1;
}
default:
throw new InvalidOperationException(datum.type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.gson.annotations.Expose;
import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.NetUtils;

import static org.apache.tajo.common.TajoDataTypes.Type;

Expand All @@ -36,11 +37,7 @@ public class Inet4Datum extends Datum {

public Inet4Datum(String addr) {
super(Type.INET4);
String [] elems = addr.split("\\.");
address = Integer.parseInt(elems[3]) & 0xFF
| ((Integer.parseInt(elems[2]) << 8) & 0xFF00)
| ((Integer.parseInt(elems[1]) << 16) & 0xFF0000)
| ((Integer.parseInt(elems[0]) << 24) & 0xFF000000);
address = NetUtils.convertIPStringToInt(addr);
}

public Inet4Datum(byte[] addr) {
Expand Down
13 changes: 0 additions & 13 deletions tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ public int compareTo(Datum datum) {
public Datum plus(Datum datum) {
switch (datum.type()) {
case INT2:
return DatumFactory.createInt2((short) (val + datum.asInt2()));
case INT4:
return DatumFactory.createInt4(val + datum.asInt4());
case INT8:
Expand All @@ -220,7 +219,6 @@ public Datum plus(Datum datum) {
public Datum minus(Datum datum) {
switch (datum.type()) {
case INT2:
return DatumFactory.createInt2((short) (val - datum.asInt2()));
case INT4:
return DatumFactory.createInt4(val - datum.asInt4());
case INT8:
Expand All @@ -245,7 +243,6 @@ public Datum minus(Datum datum) {
public Datum multiply(Datum datum) {
switch (datum.type()) {
case INT2:
return DatumFactory.createInt4(val * datum.asInt2());
case INT4:
return DatumFactory.createInt4(val * datum.asInt4());
case INT8:
Expand All @@ -268,11 +265,6 @@ public Datum multiply(Datum datum) {
public Datum divide(Datum datum) {
switch (datum.type()) {
case INT2:
short paramValueI2 = datum.asInt2();
if (!validateDivideZero(paramValueI2)) {
return NullDatum.get();
}
return DatumFactory.createInt2((short) (val / paramValueI2));
case INT4:
int paramValueI4 = datum.asInt4();
if (!validateDivideZero(paramValueI4)) {
Expand Down Expand Up @@ -308,11 +300,6 @@ public Datum divide(Datum datum) {
public Datum modular(Datum datum) {
switch (datum.type()) {
case INT2:
short paramValueI2 = datum.asInt2();
if (!validateDivideZero(paramValueI2)) {
return NullDatum.get();
}
return DatumFactory.createInt2((short) (val % paramValueI2));
case INT4:
int paramValueI4 = datum.asInt4();
if (!validateDivideZero(paramValueI4)) {
Expand Down
9 changes: 9 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,13 @@ public static String normalizeHost(String host) {
}
return host;
}

public static int convertIPStringToInt(String ipAddr) {
String [] elems = ipAddr.split("\\.");
int address = Integer.parseInt(elems[3]) & 0xFF
| ((Integer.parseInt(elems[2]) << 8) & 0xFF00)
| ((Integer.parseInt(elems[1]) << 16) & 0xFF0000)
| ((Integer.parseInt(elems[0]) << 24) & 0xFF000000);
return address;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ public void setUp() {
@Test
public void testInt2Datum() throws Exception {
//plus
runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)15));
runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)15));
runAndAssert("plus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(15));
runAndAssert("plus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(15));
runAndAssert("plus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(15.0f));
runAndAssert("plus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(15.0));

//minus
runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)5));
runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)5));
runAndAssert("minus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(5));
runAndAssert("minus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(5));
runAndAssert("minus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(5.0f));
runAndAssert("minus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(5.0));

runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int2Datum((short)-5));
runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int4Datum((short)-5));
runAndAssert("minus", new Int2Datum((short)5), new Int4Datum(10), new Int4Datum(-5));
runAndAssert("minus", new Int2Datum((short)5), new Int8Datum(10), new Int8Datum(-5));
runAndAssert("minus", new Int2Datum((short)5), new Float4Datum(10.0f), new Float4Datum(-5.0f));
Expand All @@ -89,7 +89,7 @@ public void testInt2Datum() throws Exception {
runAndAssert("multiply", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(50.0));

//divide
runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)2));
runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)2));
runAndAssert("divide", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(2));
runAndAssert("divide", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(2));
runAndAssert("divide", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(2.0f));
Expand All @@ -102,7 +102,7 @@ public void testInt2Datum() throws Exception {
runAndAssert("divide", new Int2Datum((short)10), new Float8Datum(0.0), NullDatum.get());

//modular
runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int2Datum((short)1));
runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int4Datum((short)1));
runAndAssert("modular", new Int2Datum((short)10), new Int4Datum(3), new Int4Datum(1));
runAndAssert("modular", new Int2Datum((short)10), new Int8Datum(3), new Int8Datum(1));
runAndAssert("modular", new Int2Datum((short)10), new Float4Datum(3.0f), new Float4Datum(1.0f));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.utils.TupleBuilderUtil;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.offheap.RowWriter;

/**
* An annotated expression which includes actual data domains.
Expand Down Expand Up @@ -59,6 +61,11 @@ public String toJson() {

public abstract <T extends Datum> T eval(Schema schema, Tuple tuple);

public void eval(Schema schema, Tuple tuple, RowWriter builder) {
Datum result = eval(schema, tuple);
TupleBuilderUtil.writeEvalResult(builder, result.type(), result);
}

@Deprecated
public abstract void preOrder(EvalNodeVisitor visitor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.utils.TupleBuilderUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.TupleBuilder;
import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.worker.TaskAttemptContext;

public class Projector {
Expand All @@ -33,7 +36,14 @@ public class Projector {
private final int targetNum;
private final EvalNode[] evals;

private final boolean useJITInSession;
private final boolean useJITInOperator;

public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) {
this(context, inSchema, outSchema, targets, true);
}

public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets, boolean useJIT) {
this.context = context;
this.inSchema = inSchema;
if (targets == null) {
Expand All @@ -45,7 +55,10 @@ public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema,
this.targetNum = this.targets.length;
evals = new EvalNode[targetNum];

if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
useJITInOperator = useJIT;
useJITInSession = context.getQueryContext().getBool(SessionVars.CODEGEN);

if (useJITInOperator && useJITInSession) {
EvalNode eval;
for (int i = 0; i < targetNum; i++) {
eval = this.targets[i].getEvalTree();
Expand All @@ -63,4 +76,12 @@ public void eval(Tuple in, Tuple out) {
out.put(i, evals[i].eval(inSchema, in));
}
}

public void eval(Tuple in, RowWriter builder) {
if (useJITInOperator && useJITInSession) {
TupleBuilderUtil.evaluateNative(inSchema, in, builder, evals);
} else {
TupleBuilderUtil.evaluate(inSchema, in, builder, evals);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -73,6 +75,11 @@ public Tuple next() throws IOException {
}
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void reset() throws IOException {
init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;

import java.io.IOException;
import java.util.Comparator;
Expand Down Expand Up @@ -158,6 +160,11 @@ public Tuple next() throws IOException {
return outTuple;
}

@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

@Override
public void reset() throws IOException {
if (state == State.INITED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.tajo.catalog.SchemaObject;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
Expand Down Expand Up @@ -60,6 +63,10 @@ protected void compile() throws CompilationError {

public abstract Tuple next() throws IOException;

public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}

public abstract void rescan() throws IOException;

public abstract void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.tuple.RowBlockReader;
import org.apache.tajo.tuple.TupleBuilder;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
Expand All @@ -67,6 +72,8 @@ public class SeqScanExec extends PhysicalExec {

private boolean cacheRead = false;

private OffHeapRowBlock inRowBlock;

public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan,
CatalogProtos.FragmentProto [] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
Expand Down Expand Up @@ -94,6 +101,8 @@ public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNo
&& plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
rewriteColumnPartitionedTableSchema();
}

inRowBlock = new OffHeapRowBlock(inSchema, 64 * StorageUnit.KB);
}

/**
Expand Down Expand Up @@ -289,6 +298,21 @@ public Tuple next() throws IOException {
}
}

public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
boolean noMoreTuple = scanner.nextFetch(inRowBlock);
if (!noMoreTuple) {
return false;
}

ZeroCopyTuple zcTuple = new ZeroCopyTuple();
RowBlockReader reader = inRowBlock.getReader();
while (reader.next(zcTuple)) {
projector.eval(zcTuple, rowBlock.getWriter());
}

return true;
}

@Override
public void rescan() throws IOException {
scanner.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.logical.InsertNode;
import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.RowBlockReader;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;

Expand Down Expand Up @@ -121,6 +125,30 @@ public Tuple next() throws IOException {
return null;
}

ZeroCopyTuple zcTuple = new ZeroCopyTuple();
RowBlockReader reader;

public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
if (child.nextFetch(rowBlock)) {
reader = rowBlock.getReader();
while (reader.next(zcTuple)) {
appender.addTuple(zcTuple);;

if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
appender.close();

writtenFileNum++;
StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
openNewFile(writtenFileNum);
}
}

return true;
} else {
return false;
}
}

@Override
public void rescan() throws IOException {
// nothing to do
Expand Down
Loading