Skip to content

Commit

Permalink
[CARBONDATA-2546] Fixed the ArrayIndexOutOfBoundsException when give …
Browse files Browse the repository at this point in the history
…same column twice in projection of CarbonReader

This closes #2348
  • Loading branch information
xubo245 authored and jackylk committed May 29, 2018
1 parent e740182 commit a7faef8
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)

@Override public boolean nextKeyValue() {
return carbonIterator.hasNext();

}

@Override public Void getCurrentKey() throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Objects;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
Expand Down Expand Up @@ -190,6 +191,32 @@ public static void setFilterPredicates(Configuration configuration, Expression f
}
}

/**
* Set the column projection column names
*
* @param configuration Configuration info
* @param projectionColumns projection columns name
*/
public static void setColumnProjection(Configuration configuration, String[] projectionColumns) {
Objects.requireNonNull(projectionColumns);
if (projectionColumns.length < 1) {
throw new RuntimeException("Projection can't be empty");
}
StringBuilder builder = new StringBuilder();
for (String column : projectionColumns) {
builder.append(column).append(",");
}
String columnString = builder.toString();
columnString = columnString.substring(0, columnString.length() - 1);
configuration.set(COLUMN_PROJECTION, columnString);
}

/**
* Set the column projection column names from CarbonProjection
*
* @param configuration Configuration info
* @param projection CarbonProjection object that includes unique projection column name
*/
public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
if (projection == null || projection.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)


checkAnswer(sql("SELECT name,name FROM sdkOutputTable"), Seq(
Row("robot0", "robot0"),
Row("robot1", "robot1"),
Row("robot2", "robot2")))
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("robot0", 0, 0.0),
Row("robot1", 1, 0.5),
Expand Down Expand Up @@ -1529,6 +1532,12 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)

sql("SELECT name,name FROM sdkOutputTable").show()
checkAnswer(sql("SELECT name,name FROM sdkOutputTable"), Seq(
Row("bob", "bob"),
Row("bob", "bob"),
Row("bob", "bob")))

sql("select * from sdkOutputTable").show(false)

// TODO: Add a validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -224,7 +223,7 @@ public <T> CarbonReader<T> build() throws IOException, InterruptedException {
if (isProjectAllColumns) {
projectAllColumns();
}
format.setColumnProjection(job.getConfiguration(), new CarbonProjection(projectionColumns));
format.setColumnProjection(job.getConfiguration(), projectionColumns);

final List<InputSplit> splits =
format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,47 @@ public void testWriteAndReadFiles() throws IOException, InterruptedException {
FileUtils.deleteDirectory(new File(path));
}

@Test
public void testReadColumnTwice() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

TestUtil.writeFilesAndVerify(new Schema(fields), path, true);

CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "name", "age", "name"})
.build();

// expected output after sorting
String[] name = new String[100];
int[] age = new int[100];
for (int i = 0; i < 100; i++) {
name[i] = "robot" + (i / 10);
age[i] = (i % 10) * 10 + i / 10;
}

int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
// Default sort column is applied for dimensions. So, need to validate accordingly
Assert.assertEquals(name[i], row[0]);
Assert.assertEquals(name[i], row[1]);
Assert.assertEquals(age[i], row[2]);
Assert.assertEquals(name[i], row[3]);
i++;
}
Assert.assertEquals(i, 100);

reader.close();

FileUtils.deleteDirectory(new File(path));
}

@Test
public void testReadFilesParallel() throws IOException, InterruptedException {
String path = "./testWriteFiles";
Expand Down Expand Up @@ -836,23 +877,14 @@ public void testReadFilesWithNullProjection() throws IOException, InterruptedExc

TestUtil.writeFilesAndVerify(new Schema(fields), path, true);

CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{})
.build();

// expected output after sorting
String[] name = new String[100];
int[] age = new int[100];
for (int i = 0; i < 100; i++) {
name[i] = "robot" + (i / 10);
age[i] = (i % 10) * 10 + i / 10;
}
// Default sort column is applied for dimensions. So, need to validate accordingly

while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
assert(row.length==0);
try {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{})
.build();
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equalsIgnoreCase("Projection can't be empty"));
}
}

Expand Down

0 comments on commit a7faef8

Please sign in to comment.