Skip to content

Commit

Permalink
Merge 30a720c into ee78597
Browse files Browse the repository at this point in the history
  • Loading branch information
xubo245 committed Jul 22, 2019
2 parents ee78597 + 30a720c commit 901a3f8
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 5 deletions.
Expand Up @@ -70,13 +70,11 @@
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.ExpressionResult;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression;
import org.apache.carbondata.core.scan.expression.conditional.InExpression;
import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
import org.apache.carbondata.core.scan.expression.conditional.*;
import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
import org.apache.carbondata.core.scan.filter.executer.AndFilterExecuterImpl;
import org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo;
Expand Down Expand Up @@ -2374,4 +2372,105 @@ public static void setMinMaxFlagForLegacyStore(boolean[] minMaxFlag,
Arrays.fill(minMaxFlag, index, minMaxFlag.length, false);
}

public static Expression prepareEqualToExpression(String columnName, String dataType,
Object value) {
if (DataTypes.STRING.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.STRING),
new LiteralExpression(value, DataTypes.STRING));
} else if (DataTypes.INT.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.INT),
new LiteralExpression(value, DataTypes.INT));
} else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.DOUBLE),
new LiteralExpression(value, DataTypes.DOUBLE));
} else if (DataTypes.FLOAT.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.FLOAT),
new LiteralExpression(value, DataTypes.FLOAT));
} else if (DataTypes.SHORT.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.SHORT),
new LiteralExpression(value, DataTypes.SHORT));
} else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.BINARY),
new LiteralExpression(value, DataTypes.BINARY));
} else if (DataTypes.DATE.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.DATE),
new LiteralExpression(value, DataTypes.DATE));
} else if (DataTypes.LONG.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.LONG),
new LiteralExpression(value, DataTypes.LONG));
} else if (DataTypes.TIMESTAMP.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.TIMESTAMP),
new LiteralExpression(value, DataTypes.TIMESTAMP));
} else if (DataTypes.BYTE.getName().equalsIgnoreCase(dataType)) {
return new EqualToExpression(
new ColumnExpression(columnName, DataTypes.BYTE),
new LiteralExpression(value, DataTypes.BYTE));
} else {
throw new IllegalArgumentException("Unsupported data type: " + dataType);
}
}

public static Expression prepareOrExpression(List<Expression> expressions) {
if (expressions.size() < 2) {
throw new RuntimeException("Please input at least two expressions");
}
Expression expression = expressions.get(0);
for (int i = 1; i < expressions.size(); i++) {
expression = new OrExpression(expression, expressions.get(i));
}
return expression;
}

private static Expression prepareEqualToExpressionSet(String columnName, DataType dataType,
List<Object> values) {
Expression expression = null;
if (0 == values.size()) {
expression = prepareEqualToExpression(columnName, dataType.getName(), null);
} else {
expression = prepareEqualToExpression(columnName, dataType.getName(), values.get(0));
}
for (int i = 1; i < values.size(); i++) {
Expression expression2 = prepareEqualToExpression(columnName,
dataType.getName(), values.get(i));
expression = new OrExpression(expression, expression2);
}
return expression;
}

public static Expression prepareEqualToExpressionSet(String columnName, String dataType,
List<Object> values) {
if (DataTypes.STRING.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.STRING, values);
} else if (DataTypes.INT.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.INT, values);
} else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.DOUBLE, values);
} else if (DataTypes.FLOAT.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.FLOAT, values);
} else if (DataTypes.SHORT.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.SHORT, values);
} else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.BINARY, values);
} else if (DataTypes.DATE.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.DATE, values);
} else if (DataTypes.LONG.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.LONG, values);
} else if (DataTypes.TIMESTAMP.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.TIMESTAMP, values);
} else if (DataTypes.BYTE.getName().equalsIgnoreCase(dataType)) {
return prepareEqualToExpressionSet(columnName, DataTypes.BYTE, values);
} else {
throw new IllegalArgumentException("Unsupported data type: " + dataType);
}
}

}
Expand Up @@ -23,6 +23,7 @@
import java.util.*;

import org.apache.avro.generic.GenericData;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.log4j.Logger;

Expand All @@ -44,6 +45,10 @@
import org.apache.commons.io.FileUtils;
import org.junit.*;

import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpression;
import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpressionSet;
import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareOrExpression;

public class CarbonReaderTest extends TestCase {

@Before
Expand Down Expand Up @@ -323,6 +328,157 @@ public void testReadWithFilterOfNonTransactionalOr() throws IOException, Interru
FileUtils.deleteDirectory(new File(path));
}

@Test
public void testReadWithFilterEqualSet() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(path), false);
Field[] fields = new Field[3];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);
fields[2] = new Field("doubleField", DataTypes.DOUBLE);

TestUtil.writeFilesAndVerify(200, new Schema(fields), path);

List<Object> values = new ArrayList<>();
values.add("robot7");
values.add("robot1");

CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpressionSet("name", "String", values))
.build();

int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 40);

reader.close();

List<Object> values2 = new ArrayList<>();
values2.add(1);
values2.add(7);

CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpressionSet("age", "int", values2))
.build();

i = 0;
while (reader2.hasNext()) {
Object[] row = (Object[]) reader2.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 2);
reader2.close();


List<Object> values3 = new ArrayList<>();
values3.add(0.5);
values3.add(3.5);
CarbonReader reader3 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpressionSet("doubleField", "double", values3))
.build();

i = 0;
while (reader3.hasNext()) {
Object[] row = (Object[]) reader3.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 2);
reader3.close();

CarbonReader reader4 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareEqualToExpression("name", "string", "robot7"))
.build();

i = 0;
while (reader4.hasNext()) {
Object[] row = (Object[]) reader4.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 20);
reader4.close();

List<Expression> expressions = new ArrayList<>();
expressions.add(prepareEqualToExpression("name", "String", "robot1"));
expressions.add(prepareEqualToExpression("name", "String", "robot7"));
expressions.add(prepareEqualToExpression("age", "int", "2"));

CarbonReader reader5 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age", "doubleField"})
.filter(prepareOrExpression(expressions))
.build();

i = 0;
while (reader5.hasNext()) {
Object[] row = (Object[]) reader5.readNextRow();
if (((String) row[0]).contains("robot7")) {
assert (7 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot1")) {
assert (1 == ((int) (row[1]) % 10));
assert (0.5 == ((double) (row[2]) % 1));
} else if (((String) row[0]).contains("robot2")) {
assert (2 == ((int) (row[1]) % 10));
assert (0 == ((double) (row[2]) % 1));
} else {
Assert.assertTrue(false);
}
i++;
}
Assert.assertEquals(i, 41);

reader5.close();


FileUtils.deleteDirectory(new File(path));
}

@Test
public void testReadWithFilterOfNonTransactionalGreaterThan() throws IOException, InterruptedException {
String path = "./testWriteFiles";
Expand Down
Expand Up @@ -49,6 +49,7 @@
import java.util.Iterator;
import java.util.List;

import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpression;
import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;

public class ImageTest extends TestCase {
Expand Down Expand Up @@ -317,6 +318,7 @@ public void testBinaryWithOrWithoutFilter() throws IOException, InvalidLoadOptio
fields[2] = new Field("image", DataTypes.BINARY);

byte[] originBinary = null;
String binaryValue=null;

// read and write image data
for (int j = 0; j < num; j++) {
Expand All @@ -337,7 +339,8 @@ public void testBinaryWithOrWithoutFilter() throws IOException, InvalidLoadOptio
hexValue = Hex.encodeHex(originBinary);
}
// write data
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf(hexValue)});
binaryValue = String.valueOf(hexValue);
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), binaryValue});
bis.close();
}
writer.close();
Expand Down Expand Up @@ -404,6 +407,55 @@ public void testBinaryWithOrWithoutFilter() throws IOException, InvalidLoadOptio
i++;
}
reader2.close();


// Read data with filter for binary

CarbonReader reader3 = CarbonReader
.builder(path, "_temp")
.filter(prepareEqualToExpression("image", "binary", binaryValue))
.build();

System.out.println("\nData:");
i = 0;
while (i < 20 && reader3.hasNext()) {
Object[] row = (Object[]) reader3.readNextRow();

byte[] outputBinary = Hex.decodeHex(new String((byte[]) row[1]).toCharArray());
System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);

// validate output binary data and origin binary data
assert (originBinary.length == outputBinary.length);
for (int j = 0; j < originBinary.length; j++) {
assert (originBinary[j] == outputBinary[j]);
}
String value = new String(outputBinary);
Assert.assertTrue(value.startsWith("�PNG"));
// save image, user can compare the save image and original image
String destString = "./target/binary/image" + i + ".jpg";
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
bos.write(outputBinary);
bos.close();
i++;
}
assert (1 == i);
System.out.println("\nFinished");
reader3.close();


CarbonReader reader4 = CarbonReader
.builder(path, "_temp")
.filter(prepareEqualToExpression("image", "binary", "hello"))
.build();

System.out.println("\nData:");
i = 0;
while (i < 20 && reader4.hasNext()) {
Object[] row = (Object[]) reader4.readNextRow();
assert (null == row[1]);
}
System.out.println("\nFinished");
reader4.close();
try {
FileUtils.deleteDirectory(new File(path));
} catch (IOException e) {
Expand Down

0 comments on commit 901a3f8

Please sign in to comment.