Skip to content
Permalink
Browse files
[Spark on Doris] fix the encode of varchar when convertArrowToRowBatch (#5202)

`convertArrowToRowBatch` used the platform default charset when decoding varchar bytes into a String.
Set the charset explicitly to UTF_8, because we use `arrow::utf8` on the Backends.
  • Loading branch information
vagetablechicken committed Jan 10, 2021
1 parent d129998 commit ad9eff5f7ce932c71c6de78ee764ab10047b6f0e
Showing 1 changed file with 9 additions and 13 deletions.
@@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
@@ -52,10 +53,10 @@
* row batch data container.
*/
public class RowBatch {
private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
private static final Logger logger = LoggerFactory.getLogger(RowBatch.class);

public static class Row {
private List<Object> cols;
private final List<Object> cols;

Row(int colCount) {
this.cols = new ArrayList<>(colCount);
@@ -74,11 +75,10 @@ public void put(Object o) {
private int offsetInRowBatch = 0;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
private List<Row> rowBatch = new ArrayList<>();
private final List<Row> rowBatch = new ArrayList<>();
private final ArrowStreamReader arrowStreamReader;
private final VectorSchemaRoot root;
private List<FieldVector> fieldVectors;
private RootAllocator rootAllocator;
private final RootAllocator rootAllocator;
private final Schema schema;

public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
@@ -88,9 +88,8 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio
new ByteArrayInputStream(nextResult.getRows()),
rootAllocator
);
this.offsetInRowBatch = 0;
try {
this.root = arrowStreamReader.getVectorSchemaRoot();
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != schema.size()) {
@@ -119,10 +118,7 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio
}

public boolean hasNext() {
if (offsetInRowBatch < readRowCount) {
return true;
}
return false;
return offsetInRowBatch < readRowCount;
}

private void addValueToRow(int rowIndex, Object obj) {
@@ -268,7 +264,7 @@ public void convertArrowToRowBatch() throws DorisException {
addValueToRow(rowIndex, null);
continue;
}
String value = new String(varCharVector.get(rowIndex));
String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
addValueToRow(rowIndex, value);
}
break;
@@ -284,7 +280,7 @@ public void convertArrowToRowBatch() throws DorisException {
}
}

public List<Object> next() throws DorisException {
public List<Object> next() {
if (!hasNext()) {
String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount;
logger.error(errMsg);

0 comments on commit ad9eff5

Please sign in to comment.