Commit

[CARBONDATA-3413] Fix io.netty out of direct memory exception in arrow integration

Problem: io.netty out of direct memory exception in arrow integration.

Cause: In ArrowConverter, the allocator is not closed.

Solution: Close the allocator in ArrowConverter.
Also handle the problems in the test utility API.

This closes #3256
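
For illustration, a minimal standalone sketch (not CarbonData code, and assuming a recent Arrow Java API) of the lifecycle this fix enforces: every writer, VectorSchemaRoot and BufferAllocator is closed once the batch has been serialized, so the netty direct buffers behind the vectors are released. Arrow allocators throw IllegalStateException when closed with outstanding buffers, which is how a leak like this one surfaces.

import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;

public class ArrowLifecycleSketch {
  // Serialize one small batch and release all direct memory before returning.
  public static byte[] serialize() throws Exception {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      IntVector vector = new IntVector("ids", allocator);
      vector.allocateNew(3);
      for (int i = 0; i < 3; i++) {
        vector.setSafe(i, i);
      }
      vector.setValueCount(3);
      try (VectorSchemaRoot root = VectorSchemaRoot.of(vector);
          ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(out))) {
        writer.start();
        writer.writeBatch();
        writer.end();
      } // root.close() releases the vector's direct buffers here
      // allocator.close() runs on exit and would throw if any buffer were still allocated
      return out.toByteArray();
    }
  }
}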
ajantha-bhat authored and kumarvishal09 committed Jun 12, 2019
1 parent c497142 commit c1c50cf
Showing 3 changed files with 67 additions and 31 deletions.
@@ -62,6 +62,7 @@ public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
* Carbon reader will fill the arrow vector after reading the carbondata files.
* This arrow byte[] can be used to create arrow table and used for in memory analytics
* Note: create a reader at blocklet level, so that arrow byte[] will not exceed INT_MAX
+ * User need to close the VectorSchemaRoot after usage by calling VectorSchemaRoot.close()
*
* @param carbonSchema
* @return
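For reference, a condensed usage sketch of the contract documented above, adapted from the updated test further down (the ArrowCarbonReader type name is assumed for the reader returned by buildArrowReader(), and path stands for a CarbonData store path):

ArrowCarbonReader reader =
    CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
Schema carbonSchema = CarbonSchemaReader.readSchema(path);
byte[] data = reader.readArrowBatch(carbonSchema);
BufferAllocator allocator =
    ArrowUtils.rootAllocator.newChildAllocator("readArrowBatchUsage", 0, Long.MAX_VALUE);
ArrowRecordBatch batch = ArrowConverter.byteArrayToArrowBatch(data, allocator);
VectorSchemaRoot root = VectorSchemaRoot.create(
    ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()), allocator);
new VectorLoader(root).load(batch);
// ... read the field vectors ...
batch.close();      // release the deserialized record batch
root.close();       // the close() call the javadoc above asks the caller to make
allocator.close();  // the child allocator must have no outstanding buffers by now
reader.close();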
@@ -24,11 +24,11 @@
import org.apache.carbondata.sdk.file.Schema;

import org.apache.arrow.memory.BufferAllocator;
- import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
+ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class ArrowConverter {
@@ -73,11 +73,13 @@ public void addToArrowBuffer(Object[] data) {
public byte[] toSerializeArray() throws IOException {
arrowWriter.finish();
writer.writeBatch();
- this.writer.close();
- arrowWriter.reset();
writer.close();
- this.root.close();
- return out.toByteArray();
+ arrowWriter.reset();
+ root.close();
+ byte[] bytes = out.toByteArray();
+ allocator.close();
+ out.close();
+ return bytes;
}

/**
@@ -89,34 +91,36 @@ public byte[] toSerializeArray() throws IOException {
public long copySerializeArrayToOffHeap() throws IOException {
arrowWriter.finish();
writer.writeBatch();
- this.writer.close();
- arrowWriter.reset();
writer.close();
- this.root.close();
- return out.copyToAddress();
+ arrowWriter.reset();
+ root.close();
+ long address = out.copyToAddress();
+ allocator.close();
+ out.close();
+ return address;
}

/**
- * Utility API to convert back the arrow byte[] to arrow VectorSchemaRoot.
+ * Utility API to convert back the arrow byte[] to arrow ArrowRecordBatch.
+ * User need to close the ArrowRecordBatch after usage by calling ArrowRecordBatch.close()
*
* @param batchBytes
- * @return
+ * @param bufferAllocator
+ * @return ArrowRecordBatch
* @throws IOException
*/
- public VectorSchemaRoot byteArrayToVector(byte[] batchBytes) throws IOException {
+ public static ArrowRecordBatch byteArrayToArrowBatch(byte[] batchBytes,
+     BufferAllocator bufferAllocator)
+     throws IOException {
ByteArrayReadableSeekableByteChannel in = new ByteArrayReadableSeekableByteChannel(batchBytes);
- ArrowFileReader reader = new ArrowFileReader(in, allocator);
+ ArrowFileReader reader = new ArrowFileReader(in, bufferAllocator);
try {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
VectorUnloader unloader = new VectorUnloader(root);
reader.loadNextBatch();
- VectorSchemaRoot arrowRoot = VectorSchemaRoot.create(arrowSchema, allocator);
- VectorLoader vectorLoader = new VectorLoader(arrowRoot);
- vectorLoader.load(unloader.getRecordBatch());
- return arrowRoot;
- } catch (IOException e) {
+ return unloader.getRecordBatch();
+ } finally {
reader.close();
- throw e;
}
}
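
A short hedged sketch of the caller-side contract for the new static API (it assumes bytes, a child BufferAllocator named allocator, and a VectorSchemaRoot named root with a matching schema are already in scope, as in the test changes below):

ArrowRecordBatch batch = ArrowConverter.byteArrayToArrowBatch(bytes, allocator);
try {
  new VectorLoader(root).load(batch);  // copies the batch buffers into root's vectors
} finally {
  batch.close();  // per the javadoc: the caller owns and must close the ArrowRecordBatch
}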

@@ -128,7 +132,6 @@ public VectorSchemaRoot byteArrayToVector(byte[] batchBytes) throws IOException
public VectorSchemaRoot getArrowVectors() throws IOException {
arrowWriter.finish();
writer.writeBatch();
- this.writer.close();
writer.close();
return root;
}
@@ -22,10 +22,13 @@
import java.sql.Timestamp;
import java.util.*;

+ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.SmallIntVector;
+ import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.log4j.Logger;
@@ -45,6 +48,7 @@
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
+ import org.apache.carbondata.sdk.file.arrow.ArrowUtils;

import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
@@ -2479,8 +2483,14 @@ public void testArrowReader() {
CarbonReader.builder(path, "_temp").withRowRecordReader().buildArrowReader();
Schema carbonSchema = CarbonSchemaReader.readSchema(path);
byte[] data = reader.readArrowBatch(carbonSchema);
- ArrowConverter arrowConverter = new ArrowConverter(carbonSchema,0);
- VectorSchemaRoot vectorSchemaRoot = arrowConverter.byteArrayToVector(data);
+ BufferAllocator bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ ArrowRecordBatch arrowRecordBatch =
+     ArrowConverter.byteArrayToArrowBatch(data, bufferAllocator);
+ VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot
+     .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+         bufferAllocator);
+ VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
// check for 10 rows
assertEquals(vectorSchemaRoot.getRowCount(), 10);
List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
@@ -2492,6 +2502,9 @@
for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
assertEquals(((Float4Vector)fieldVectors.get(12)).get(i), (float) 1.23);
}
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
reader.close();

// Read data with address (unsafe memory)
@@ -2501,19 +2514,28 @@
int length = CarbonUnsafe.getUnsafe().getInt(address);
byte[] data1 = new byte[length];
CarbonUnsafe.getUnsafe().copyMemory(null, address + 4 , data1, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- ArrowConverter arrowConverter1 = new ArrowConverter(carbonSchema,0);
- VectorSchemaRoot vectorSchemaRoot1 = arrowConverter1.byteArrayToVector(data1);
+ bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ arrowRecordBatch =
+     ArrowConverter.byteArrayToArrowBatch(data1, bufferAllocator);
+ vectorSchemaRoot = VectorSchemaRoot
+     .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+         bufferAllocator);
+ vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
// check for 10 rows
- assertEquals(vectorSchemaRoot1.getRowCount(), 10);
- List<FieldVector> fieldVectors1 = vectorSchemaRoot1.getFieldVectors();
+ assertEquals(vectorSchemaRoot.getRowCount(), 10);
+ List<FieldVector> fieldVectors1 = vectorSchemaRoot.getFieldVectors();
// validate short column
- for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
assertEquals(((SmallIntVector)fieldVectors1.get(6)).get(i), i);
}
// validate float column
- for (int i = 0; i < vectorSchemaRoot1.getRowCount(); i++) {
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
assertEquals(((Float4Vector)fieldVectors1.get(12)).get(i), (float) 1.23);
}
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
// free the unsafe memory
reader1.freeArrowBatchMemory(address);
reader1.close();
@@ -2534,13 +2556,23 @@ public void testArrowReader() {
for (int i = 0; i < vectorSchemaRoot2.getRowCount(); i++) {
assertEquals(((Float4Vector)fieldVectors2.get(12)).get(i), (float) 1.23);
}
+ vectorSchemaRoot.close();
reader2.close();

// Read arrowSchema
byte[] schema = CarbonSchemaReader.getArrowSchemaAsBytes(path);
- ArrowConverter arrowConverter3 = new ArrowConverter(carbonSchema, 0);
- VectorSchemaRoot vectorSchemaRoot3 = arrowConverter3.byteArrayToVector(schema);
- assertEquals(vectorSchemaRoot3.getSchema().getFields().size(), 13);
+ bufferAllocator = ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", 0, Long.MAX_VALUE);
+ arrowRecordBatch =
+     ArrowConverter.byteArrayToArrowBatch(schema, bufferAllocator);
+ vectorSchemaRoot = VectorSchemaRoot
+     .create(ArrowUtils.toArrowSchema(carbonSchema, TimeZone.getDefault().getID()),
+         bufferAllocator);
+ vectorLoader = new VectorLoader(vectorSchemaRoot);
+ vectorLoader.load(arrowRecordBatch);
+ assertEquals(vectorSchemaRoot.getSchema().getFields().size(), 13);
+ arrowRecordBatch.close();
+ vectorSchemaRoot.close();
+ bufferAllocator.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
