Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tajo.engine.query;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
Expand All @@ -39,6 +40,7 @@
import org.junit.experimental.categories.Category;

import java.io.File;
import java.io.OutputStream;
import java.sql.ResultSet;

import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
Expand Down Expand Up @@ -646,5 +648,35 @@ public final void testSelfJoin() throws Exception {

}

@Test
public void testMultipleBroadcastDataFileWithZeroLength() throws Exception {
  // Materialize the "nation" table split across two physical data files.
  TupleCreator nationTupleCreator = new TupleCreator() {
    public Tuple createTuple(String[] columns) {
      Datum[] datums = new Datum[] {
          new Int4Datum(Integer.parseInt(columns[0])),   // n_nationkey
          new TextDatum(columns[1]),                     // n_name
          new Int4Datum(Integer.parseInt(columns[2])),   // n_regionkey
          new TextDatum(columns[3])                      // n_comment
      };
      return new VTuple(datums);
    }
  };
  createMultiFile("nation", 2, nationTupleCreator);

  // Add a zero-length data file so the broadcast-join scan must cope
  // with an empty split alongside the populated ones.
  addEmptyDataFile("nation");

  ResultSet res = executeQuery();

  assertResultSet(res);
  cleanupQuery(res);

  executeString("DROP TABLE nation_multifile PURGE");
}

/**
 * Drops a zero-length CSV data file into the directory of the
 * "&lt;tableName&gt;_multifile" table, so subsequent scans see an empty split.
 *
 * @param tableName base table name; "_multifile" is appended to locate the table
 * @throws Exception if the table lookup or file creation fails
 */
private void addEmptyDataFile(String tableName) throws Exception {
  // Resolve the physical location of the multi-file variant of the table.
  TableDesc desc = client.getTableDesc(tableName + "_multifile");

  // Create — and immediately close — an empty file in the table directory.
  // The large numeric prefix keeps it distinct from the generated data files.
  Path emptyFilePath = new Path(desc.getPath(), 999999 + "_empty.csv");
  FileSystem fs = emptyFilePath.getFileSystem(conf);
  fs.create(emptyFilePath).close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
select * from customer_large a
left outer join nation_multifile b on a.c_nationkey = b.n_nationkey
where b.n_nationkey is null
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment,n_nationkey,n_name,n_regionkey,n_comment
-------------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.tajo.storage.fragment.FileFragment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

Expand Down Expand Up @@ -60,8 +61,17 @@ public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<File
this.meta = meta;
this.target = target;

this.fragments = new ArrayList<FileFragment>();

long numBytes = 0;
for (FileFragment eachFileFragment: rawFragmentList) {
numBytes += eachFileFragment.getEndKey();
if (eachFileFragment.getEndKey() > 0) {
fragments.add(eachFileFragment);
}
}

// it should keep the input order. Otherwise, it causes wrong result of sort queries.
this.fragments = ImmutableList.copyOf(rawFragmentList);
this.reset();

if (currentScanner != null) {
Expand All @@ -70,13 +80,9 @@ public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<File
}

tableStats = new TableStats();
long numBytes = 0;

for (FileFragment eachFileFragment: rawFragmentList) {
numBytes += (eachFileFragment.getEndKey() - eachFileFragment.getStartKey());
}
tableStats.setNumBytes(numBytes);
tableStats.setNumBlocks(rawFragmentList.size());
tableStats.setNumBlocks(fragments.size());

for(Column eachColumn: schema.getColumns()) {
ColumnStats columnStats = new ColumnStats(eachColumn);
Expand Down