Skip to content

Commit

Permalink
add arrow service test support (#2329)
Browse files Browse the repository at this point in the history
* Update service test executor for arrow executions
  • Loading branch information
AFine-gs committed Oct 9, 2023
1 parent 7a77360 commit 59a7c9b
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package org.finos.legend.engine.external.shared.runtime.write;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.eclipse.collections.impl.factory.Lists;
import org.finos.legend.engine.plan.execution.result.Result;
import org.finos.legend.engine.plan.execution.result.ResultVisitor;
Expand Down Expand Up @@ -52,6 +55,21 @@ public Serializer getSerializer(SerializationFormat format)
return new ExternalFormatDefaultSerializer(externalFormatWriter, this);
}

@Override
public String flush(Serializer serializer)
{
try
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
externalFormatWriter.writeDataAsString(bos);
return bos.toString(StandardCharsets.UTF_8.name());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@
public abstract class ExternalFormatWriter
{
public abstract void writeData(OutputStream stream) throws IOException;

public void writeDataAsString(OutputStream outputStream) throws IOException
{
writeData(outputStream);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@ public AssertionStatus visit(TestAssertion testAssertion)
{
if (testAssertion instanceof EqualTo)
{
if (!(result instanceof ConstantResult))

Object actual;
Object expected = ((EqualTo) testAssertion).expected.accept(new PrimitiveValueSpecificationToObjectVisitor());;
if (result instanceof ConstantResult)
{
actual = ((ConstantResult) result).getValue();
}
else if (result instanceof StreamingResult)
{
actual = ((StreamingResult) result).flush(((StreamingResult) result).getSerializer(this.serializationFormat));
}

else
{
throw new UnsupportedOperationException("Result type - " + result.getClass().getSimpleName() + " not supported with EqualTo Assert !!");
}

Object expected = ((EqualTo) testAssertion).expected.accept(new PrimitiveValueSpecificationToObjectVisitor());
Object actual = ((ConstantResult) result).getValue();

AssertionStatus assertionStatus;
if (expected.equals(actual))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

package org.apache.arrow.adapter.jdbc;

import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.getJdbcFieldInfoForColumn;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ValueVectorUtility;

/**
* temporary over ride of ArrowVectorIterator until https://github.com/apache/arrow/pull/37085 is released. This works around an long standing bug in H2 where ResultSetMetadata may incorrectly set a result column as not nullable
*/
public class LegendArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoCloseable
{

private final ResultSet resultSet;
private final JdbcToArrowConfig config;

private final Schema schema;
private final ResultSetMetaData rsmd;

private final JdbcConsumer[] consumers;
final CompositeJdbcConsumer compositeConsumer;

// this is used only if resuing vector schema root is enabled.
private VectorSchemaRoot nextBatch;

private final int targetBatchSize;

// This is used to track whether the ResultSet has been fully read, and is needed spcifically for cases where there
// is a ResultSet having zero rows (empty):
private boolean readComplete = false;

/**
* Construct an instance.
*/
private LegendArrowVectorIterator(ResultSet resultSet, JdbcToArrowConfig config) throws SQLException
{
this.resultSet = resultSet;
this.config = config;
this.schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config);
this.targetBatchSize = config.getTargetBatchSize();

rsmd = resultSet.getMetaData();
consumers = new JdbcConsumer[rsmd.getColumnCount()];
this.compositeConsumer = new CompositeJdbcConsumer(consumers);
this.nextBatch = config.isReuseVectorSchemaRoot() ? createVectorSchemaRoot() : null;
}

/**
* Create a ArrowVectorIterator to partially convert data.
*/
public static LegendArrowVectorIterator create(
ResultSet resultSet,
JdbcToArrowConfig config)
throws SQLException
{
LegendArrowVectorIterator iterator = null;
try
{
iterator = new LegendArrowVectorIterator(resultSet, config);
}
catch (Throwable e)
{
AutoCloseables.close(e, iterator);
throw new RuntimeException("Error occurred while creating iterator.", e);
}
return iterator;
}

private void consumeData(VectorSchemaRoot root)
{
// consume data
try
{
int readRowCount = 0;
if (targetBatchSize == JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE)
{
while (resultSet.next())
{
ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
compositeConsumer.consume(resultSet);
readRowCount++;
}
readComplete = true;
}
else
{
while ((readRowCount < targetBatchSize) && !readComplete)
{
if (resultSet.next())
{
compositeConsumer.consume(resultSet);
readRowCount++;
}
else
{
readComplete = true;
}
}
}

root.setRowCount(readRowCount);
}
catch (Throwable e)
{
compositeConsumer.close();
throw new RuntimeException("Error occurred while consuming data.", e);
}
}

private VectorSchemaRoot createVectorSchemaRoot() throws SQLException
{
VectorSchemaRoot root = null;
try
{
root = VectorSchemaRoot.create(schema, config.getAllocator());
if (config.getTargetBatchSize() != JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE)
{
ValueVectorUtility.preAllocate(root, config.getTargetBatchSize());
}
}
catch (Throwable e)
{
if (root != null)
{
root.close();
}
throw new RuntimeException("Error occurred while creating schema root.", e);
}
initialize(root);
return root;
}

private void initialize(VectorSchemaRoot root) throws SQLException
{
for (int i = 1; i <= consumers.length; i++)
{
final JdbcFieldInfo columnFieldInfo = getJdbcFieldInfoForColumn(rsmd, i, config);
ArrowType arrowType = config.getJdbcToArrowTypeConverter().apply(columnFieldInfo);
consumers[i - 1] = JdbcToArrowUtils.getConsumer(
arrowType, i, true, root.getVector(i - 1), config);
}
}

// Loads the next schema root or null if no more rows are available.
private void load(VectorSchemaRoot root)
{
for (int i = 0; i < consumers.length; i++)
{
FieldVector vec = root.getVector(i);
if (config.isReuseVectorSchemaRoot())
{
// if we are reusing the vector schema root,
// we must reset the vector before populating it with data.
vec.reset();
}
consumers[i].resetValueVector(vec);
}

consumeData(root);
}

@Override
public boolean hasNext()
{
return !readComplete;
}

/**
* Gets the next vector.
* If {@link JdbcToArrowConfig#isReuseVectorSchemaRoot()} is false,
* the client is responsible for freeing its resources.
*/
@Override
public VectorSchemaRoot next()
{
Preconditions.checkArgument(hasNext());
try
{
VectorSchemaRoot ret = config.isReuseVectorSchemaRoot() ? nextBatch : createVectorSchemaRoot();
load(ret);
return ret;
}
catch (Exception e)
{
close();
throw new RuntimeException("Error occurred while getting next schema root.", e);
}
}

/**
* Clean up resources ONLY WHEN THE {@link VectorSchemaRoot} HOLDING EACH BATCH IS REUSED. If a new VectorSchemaRoot
* is created for each batch, each root must be closed manually by the client code.
*/
@Override
public void close()
{
if (config.isReuseVectorSchemaRoot())
{
nextBatch.close();
compositeConsumer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

package org.finos.legend.engine.external.format.arrow;

import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Locale;
import java.util.TimeZone;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;

import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.LegendArrowVectorIterator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -34,22 +37,23 @@

public class ArrowDataWriter extends ExternalFormatWriter implements AutoCloseable
{
private final ArrowVectorIterator iterator;
private final LegendArrowVectorIterator iterator;
private final BufferAllocator allocator;

public ArrowDataWriter(ResultSet resultSet) throws SQLException, IOException
public ArrowDataWriter(ResultSet resultSet) throws SQLException
{

HashMap<Integer, JdbcFieldInfo> map = new HashMap<Integer, JdbcFieldInfo>();

this.allocator = new RootAllocator();
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator, Calendar.getInstance(TimeZone.getDefault(), Locale.ROOT)).build();
this.iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
this.iterator = LegendArrowVectorIterator.create(resultSet, config);

}


@Override
public void writeData(OutputStream outputStream) throws IOException
{

try
{
while (this.iterator.hasNext())
Expand All @@ -71,6 +75,28 @@ public void writeData(OutputStream outputStream) throws IOException

}

@Override
public void writeDataAsString(OutputStream outputStream) throws IOException
{
try
{
while (this.iterator.hasNext())
{
try (VectorSchemaRoot vector = iterator.next())
{
outputStream.write(vector.contentToTSVString().getBytes(Charset.forName("UTF-8")));
}

}
this.close();
}
catch (Exception e)
{
this.close();
throw e;
}
}

@Override
public void close()
{
Expand Down
Loading

0 comments on commit 59a7c9b

Please sign in to comment.