Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,22 @@ public void invoke(String[] command) throws Exception {

CountDownLatch latch = new CountDownLatch(2);
Process process = Runtime.getRuntime().exec(execCommand);
InputStreamConsoleWriter inWriter = new InputStreamConsoleWriter(process.getInputStream(), sout, "", latch);
InputStreamConsoleWriter errWriter = new InputStreamConsoleWriter(process.getErrorStream(), sout, "ERROR: ", latch);
try {
InputStreamConsoleWriter inWriter = new InputStreamConsoleWriter(process.getInputStream(), sout, "", latch);
InputStreamConsoleWriter errWriter = new InputStreamConsoleWriter(process.getErrorStream(), sout, "ERROR: ", latch);

inWriter.start();
errWriter.start();
inWriter.start();
errWriter.start();

int processResult = process.waitFor();
latch.await();
if (processResult != 0) {
throw new IOException("ERROR: Failed with exit code = " + processResult);
int processResult = process.waitFor();
latch.await();
if (processResult != 0) {
throw new IOException("ERROR: Failed with exit code = " + processResult);
}
} finally {
org.apache.commons.io.IOUtils.closeQuietly(process.getInputStream());
org.apache.commons.io.IOUtils.closeQuietly(process.getOutputStream());
org.apache.commons.io.IOUtils.closeQuietly(process.getErrorStream());
}
}

Expand Down
5 changes: 4 additions & 1 deletion tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,10 @@ public static enum ConfVars {
OPTIMIZER_JOIN_ENABLE("tajo.optimizer.join.enable", true),

// DEBUG OPTION
TAJO_DEBUG("tajo.debug", false)
TAJO_DEBUG("tajo.debug", false),

// ONLY FOR TESTCASE
TESTCASE_MIN_TASK_NUM("tajo.testcase.min.task.num", -1)
;

public final String varname;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,17 @@ public JoinTupleComparator(Schema leftschema, Schema rightschema, SortSpec[][] s
@Override
public int compare(Tuple outerTuple, Tuple innerTuple) {
for (int i = 0; i < numSortKey; i++) {
outer = outerTuple.get(outerSortKeyIds[i]);
inner = innerTuple.get(innerSortKeyIds[i]);
if (outerTuple == null) {
outer = NullDatum.get();
} else {
outer = outerTuple.get(outerSortKeyIds[i]);
}

if (innerTuple == null) {
inner = NullDatum.get();
} else {
inner = innerTuple.get(innerSortKeyIds[i]);
}

if (outer instanceof NullDatum || inner instanceof NullDatum) {
if (!outer.equals(inner)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,9 @@ public void dumpThread(Writer writer) {

public static List<File> getMountPath() throws Exception {
BufferedReader mountOutput = null;
Process mountProcess = null;
try {
Process mountProcess = Runtime.getRuntime ().exec("mount");
mountProcess = Runtime.getRuntime ().exec("mount");
mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
List<File> mountPaths = new ArrayList<File>();
while (true) {
Expand All @@ -560,6 +561,11 @@ public static List<File> getMountPath() throws Exception {
if(mountOutput != null) {
mountOutput.close();
}
if (mountProcess != null) {
org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getInputStream());
org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getOutputStream());
org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getErrorStream());
}
}
}
public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,10 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC
}
}

// If one of inner join tables has no input data,
// it should return zero rows.
// If one of inner join tables has no input data, it should return zero rows.
JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(), NodeType.JOIN);
if (joinNode != null) {
if ( (joinNode.getJoinType().equals(JoinType.INNER))) {
if ( (joinNode.getJoinType() == JoinType.INNER)) {
for (int i = 0; i < stats.length; i++) {
if (stats[i] == 0) {
return;
Expand All @@ -124,6 +123,36 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC
}
}

// If node is outer join and a preserved relation is empty, it should return zero rows.
joinNode = PlannerUtil.findTopNode(execBlock.getPlan(), NodeType.JOIN);
if (joinNode != null) {
// find left top scan node
ScanNode leftScanNode = PlannerUtil.findTopNode(joinNode.getLeftChild(), NodeType.SCAN);
ScanNode rightScanNode = PlannerUtil.findTopNode(joinNode.getRightChild(), NodeType.SCAN);

long leftStats = -1;
long rightStats = -1;
if (stats.length == 2) {
for (int i = 0; i < stats.length; i++) {
if (scans[i].equals(leftScanNode)) {
leftStats = stats[i];
} else if (scans[i].equals(rightScanNode)) {
rightStats = stats[i];
}
}
if (joinNode.getJoinType() == JoinType.LEFT_OUTER) {
if (leftStats == 0) {
return;
}
}
if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
if (rightStats == 0) {
return;
}
}
}
}

// Assigning either fragments or fetch urls to query units
boolean isAllBroadcastTable = true;
int baseScanIdx = -1;
Expand Down Expand Up @@ -360,7 +389,7 @@ private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
String mergedKey = partition.getEbId().toString() + "," + partition.getPullHost();

if (mergedPartitions.containsKey(mergedKey)) {
FetchImpl fetch = mergedPartitions.get(partition.getPullHost());
FetchImpl fetch = mergedPartitions.get(mergedKey);
fetch.addPart(partition.getTaskId(), partition.getAttemptId());
} else {
// In some cases like union each IntermediateEntry has different EBID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann

// determine the number of task
taskNum = Math.min(taskNum, slots);
if (conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM) > 0) {
taskNum = conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM);
LOG.warn("!!!!! TESTCASE MODE !!!!!");
}
LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);

// The shuffle output numbers of join may be inconsistent by execution block order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,9 @@ public void dumpThread(Writer writer) {

public static List<File> getMountPath() throws IOException {
BufferedReader mountOutput = null;
Process mountProcess = null;
try {
Process mountProcess = Runtime.getRuntime ().exec("mount");
mountProcess = Runtime.getRuntime ().exec("mount");
mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
List<File> mountPaths = new ArrayList<File>();
while (true) {
Expand All @@ -563,6 +564,11 @@ public static List<File> getMountPath() throws IOException {
if(mountOutput != null) {
mountOutput.close();
}
if (mountProcess != null) {
org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getInputStream());
org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getOutputStream());
org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getErrorStream());
}
}
}

Expand Down
77 changes: 50 additions & 27 deletions tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,21 +573,29 @@ public static ResultSet run(String[] names,
}
TajoConf conf = util.getConfiguration();
TajoClient client = new TajoClient(conf);

FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().
getStorageManager().getWarehouseDir();
fs.mkdirs(rootDir);
for (int i = 0; i < names.length; i++) {
createTable(names[i], schemas[i], tableOption, tables[i]);
try {
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().
getStorageManager().getWarehouseDir();
fs.mkdirs(rootDir);
for (int i = 0; i < names.length; i++) {
createTable(names[i], schemas[i], tableOption, tables[i]);
}
Thread.sleep(1000);
ResultSet res = client.executeQueryAndGetResult(query);
return res;
} finally {
client.close();
}
Thread.sleep(1000);
ResultSet res = client.executeQueryAndGetResult(query);
return res;
}

public static void createTable(String tableName, Schema schema,
KeyValueSet tableOption, String[] tableDatas) throws Exception {
createTable(tableName, schema, tableOption, tableDatas, 1);
}

public static void createTable(String tableName, Schema schema,
KeyValueSet tableOption, String[] tableDatas, int numDataFiles) throws Exception {
TpchTestBase instance = TpchTestBase.getInstance();
TajoTestingCluster util = instance.getTestingCluster();
while(true) {
Expand All @@ -598,25 +606,40 @@ public static void createTable(String tableName, Schema schema,
}
TajoConf conf = util.getConfiguration();
TajoClient client = new TajoClient(conf);

FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().
getStorageManager().getWarehouseDir();
if (!fs.exists(rootDir)) {
fs.mkdirs(rootDir);
}
Path tablePath = new Path(rootDir, tableName);
fs.mkdirs(tablePath);
if (tableDatas.length > 0) {
Path dfsPath = new Path(tablePath, tableName + ".tbl");
FSDataOutputStream out = fs.create(dfsPath);
for (int j = 0; j < tableDatas.length; j++) {
out.write((tableDatas[j] + "\n").getBytes());
try {
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().
getStorageManager().getWarehouseDir();
if (!fs.exists(rootDir)) {
fs.mkdirs(rootDir);
}
out.close();
Path tablePath = new Path(rootDir, tableName);
fs.mkdirs(tablePath);
if (tableDatas.length > 0) {
int recordPerFile = tableDatas.length / numDataFiles;
if (recordPerFile == 0) {
recordPerFile = 1;
}
FSDataOutputStream out = null;
for (int j = 0; j < tableDatas.length; j++) {
if (out == null || j % recordPerFile == 0) {
if (out != null) {
out.close();
}
Path dfsPath = new Path(tablePath, tableName + j + ".tbl");
out = fs.create(dfsPath);
}
out.write((tableDatas[j] + "\n").getBytes());
}
if (out != null) {
out.close();
}
}
TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption);
client.createExternalTable(tableName, schema, tablePath, meta);
} finally {
client.close();
}
TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption);
client.createExternalTable(tableName, schema, tablePath, meta);
}

/**
Expand Down
Loading