Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17629: [Java] Bind DB column to Arrow Map type in JdbcToArrowUtils #14134

Merged
merged 4 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions java/adapter/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* </ul>
*/
public class JdbcFieldInfo {
private final int column;
private final int jdbcType;
private final int nullability;
private final int precision;
Expand All @@ -53,6 +54,7 @@ public JdbcFieldInfo(int jdbcType) {
(jdbcType != Types.DECIMAL && jdbcType != Types.NUMERIC),
"DECIMAL and NUMERIC types require a precision and scale; please use another constructor.");

this.column = 0;
this.jdbcType = jdbcType;
this.nullability = ResultSetMetaData.columnNullableUnknown;
this.precision = 0;
Expand All @@ -68,6 +70,7 @@ public JdbcFieldInfo(int jdbcType) {
* @param scale The field's numeric scale.
*/
public JdbcFieldInfo(int jdbcType, int precision, int scale) {
this.column = 0;
this.jdbcType = jdbcType;
this.nullability = ResultSetMetaData.columnNullableUnknown;
this.precision = precision;
Expand All @@ -84,6 +87,7 @@ public JdbcFieldInfo(int jdbcType, int precision, int scale) {
* @param scale The field's numeric scale.
*/
public JdbcFieldInfo(int jdbcType, int nullability, int precision, int scale) {
this.column = 0;
this.jdbcType = jdbcType;
this.nullability = nullability;
this.precision = precision;
Expand All @@ -106,6 +110,7 @@ public JdbcFieldInfo(ResultSetMetaData rsmd, int column) throws SQLException {
column <= rsmd.getColumnCount(),
"The index must be within the number of columns (1 to %s, inclusive)", rsmd.getColumnCount());

this.column = column;
this.jdbcType = rsmd.getColumnType(column);
this.nullability = rsmd.isNullable(column);
this.precision = rsmd.getPrecision(column);
Expand Down Expand Up @@ -139,4 +144,11 @@ public int getPrecision() {
public int getScale() {
return scale;
}

/**
 * The 1-based index of this field's column in the originating query, or 0 when this
 * instance was built from one of the constructors that take a raw JDBC type rather
 * than a {@link java.sql.ResultSetMetaData} (those constructors set the column to 0).
 */
public int getColumn() {
return column;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public final class JdbcToArrowConfig {

// set up type converter
this.jdbcToArrowTypeConverter = jdbcToArrowTypeConverter != null ? jdbcToArrowTypeConverter :
jdbcFieldInfo -> JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);
(jdbcFieldInfo) -> JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
Expand All @@ -49,6 +50,7 @@
import org.apache.arrow.adapter.jdbc.consumer.FloatConsumer;
import org.apache.arrow.adapter.jdbc.consumer.IntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.MapConsumer;
import org.apache.arrow.adapter.jdbc.consumer.NullConsumer;
import org.apache.arrow.adapter.jdbc.consumer.SmallIntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.TimeConsumer;
Expand Down Expand Up @@ -76,6 +78,7 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand Down Expand Up @@ -279,6 +282,14 @@ public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, JdbcToArrowConfig
children = new ArrayList<Field>();
final ArrowType childType = config.getJdbcToArrowTypeConverter().apply(arrayFieldInfo);
children.add(new Field("child", FieldType.nullable(childType), null));
} else if (arrowType.getTypeID() == ArrowType.ArrowTypeID.Map) {
FieldType mapType = new FieldType(false, ArrowType.Struct.INSTANCE, null, null);
FieldType keyType = new FieldType(false, new ArrowType.Utf8(), null, null);
FieldType valueType = new FieldType(false, new ArrowType.Utf8(), null, null);
children = new ArrayList<>();
children.add(new Field("child", mapType,
Arrays.asList(new Field(MapVector.KEY_NAME, keyType, null),
new Field(MapVector.VALUE_NAME, valueType, null))));
}

fields.add(new Field(columnName, fieldType, children));
Expand Down Expand Up @@ -471,6 +482,8 @@ static JdbcConsumer getConsumer(ArrowType arrowType, int columnIndex, boolean nu
JdbcConsumer delegate = getConsumer(childVector.getField().getType(), JDBC_ARRAY_VALUE_COLUMN,
childVector.getField().isNullable(), childVector, config);
return ArrayConsumer.createConsumer((ListVector) vector, delegate, columnIndex, nullable);
case Map:
return MapConsumer.createConsumer((MapVector) vector, columnIndex, nullable);
case Null:
return new NullConsumer((NullVector) vector);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.adapter.jdbc.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
import org.apache.arrow.vector.util.ObjectMapperFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Consumer which consumes map type values from a {@link ResultSet}.
 * Writes the data into a {@link org.apache.arrow.vector.complex.MapVector}.
 */
public class MapConsumer extends BaseConsumer<MapVector> {

  private final UnionMapWriter writer;
  // ObjectMapper is thread-safe and relatively expensive to construct; one per consumer suffices.
  private final ObjectMapper objectMapper = ObjectMapperFactory.newObjectMapper();
  private final TypeReference<Map<String, String>> typeReference = new TypeReference<Map<String, String>>() {};
  // Next row index to write; MapVector positions are advanced manually per consumed row.
  private int currentRow;

  /**
   * Creates a consumer for {@link MapVector}.
   *
   * @param mapVector the vector to write map values into
   * @param index the JDBC column index (1-based) this consumer reads from
   * @param nullable unused; SQL NULL is always handled by writing a null map entry
   * @return a new MapConsumer
   */
  public static MapConsumer createConsumer(MapVector mapVector, int index, boolean nullable) {
    return new MapConsumer(mapVector, index);
  }

  /**
   * Instantiate a MapConsumer.
   *
   * @param vector the vector to write map values into
   * @param index the JDBC column index (1-based) this consumer reads from
   */
  public MapConsumer(MapVector vector, int index) {
    super(vector, index);
    writer = vector.getWriter();
  }

  /**
   * Reads the map column of the current row and appends it to the vector.
   * Accepts either a JSON string (parsed into a {@code Map<String, String>}) or a
   * {@link Map} instance handed back directly by the JDBC driver; SQL NULL is
   * written as a null slot.
   *
   * @throws IllegalArgumentException if the column value is neither a String nor a Map
   * @throws IOException if the JSON string cannot be parsed
   */
  @Override
  public void consume(ResultSet resultSet) throws SQLException, IOException {
    Object map = resultSet.getObject(columnIndexInResultSet);
    writer.setPosition(currentRow++);
    if (map != null) {
      if (map instanceof String) {
        writeJavaMapIntoVector(objectMapper.readValue((String) map, typeReference));
      } else if (map instanceof Map) {
        // assumes driver-provided maps carry String keys/values — TODO confirm per driver
        @SuppressWarnings("unchecked")
        Map<String, String> javaMap = (Map<String, String>) map;
        writeJavaMapIntoVector(javaMap);
      } else {
        throw new IllegalArgumentException("Unknown type of map type column from JDBC " + map.getClass().getName());
      }
    } else {
      writer.writeNull();
    }
  }

  /** Writes one Java map as a sequence of key/value entries; null values become null value slots. */
  private void writeJavaMapIntoVector(Map<String, String> map) {
    BufferAllocator allocator = vector.getAllocator();
    writer.startMap();
    map.forEach((key, value) -> {
      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
      byte[] valueBytes = value != null ? value.getBytes(StandardCharsets.UTF_8) : null;
      // try-with-resources frees the scratch buffers once the writer has copied the bytes
      try (
          ArrowBuf keyBuf = allocator.buffer(keyBytes.length);
          ArrowBuf valueBuf = valueBytes != null ? allocator.buffer(valueBytes.length) : null;
      ) {
        writer.startEntry();
        keyBuf.writeBytes(keyBytes);
        writer.key().varChar().writeVarChar(0, keyBytes.length, keyBuf);
        if (valueBytes != null) {
          valueBuf.writeBytes(valueBytes);
          writer.value().varChar().writeVarChar(0, valueBytes.length, valueBuf);
        } else {
          writer.value().varChar().writeNull();
        }
        writer.endEntry();
      }
    });
    writer.endMap();
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Function;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.util.ValueVectorUtility;
import org.junit.After;
import org.junit.Before;
Expand All @@ -58,6 +61,7 @@ public abstract class AbstractJdbcToArrowTest {
protected static final String DOUBLE = "DOUBLE_FIELD7";
protected static final String INT = "INT_FIELD1";
protected static final String LIST = "LIST_FIELD19";
protected static final String MAP = "MAP_FIELD20";
protected static final String REAL = "REAL_FIELD8";
protected static final String SMALLINT = "SMALLINT_FIELD4";
protected static final String TIME = "TIME_FIELD9";
Expand Down Expand Up @@ -155,8 +159,10 @@ public static Object[][] prepareTestData(String[] testFiles, @SuppressWarnings("
* Abstract method to implement logic to assert test various datatype values.
*
* @param root VectorSchemaRoot for test
* @param isIncludeMapVector whether the dataset under test includes a map column.
*                           The JDBC-type-to-Map mapping must be declared manually in the configuration.
*/
public abstract void testDataSets(VectorSchemaRoot root);
public abstract void testDataSets(VectorSchemaRoot root, boolean isIncludeMapVector);

/**
* For the given SQL query, execute and fetch the data from Relational DB and convert it to Arrow objects.
Expand Down Expand Up @@ -342,4 +348,34 @@ public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, JdbcToArrowConfig
return root;
}

/**
 * Builds a type converter that registers MAP_FIELD20 as {@link ArrowType.Map} and
 * delegates every other column to the default JDBC-to-Arrow mapping.
 *
 * @param calendar Calendar instance to use for Date, Time and Timestamp datasets, or <code>null</code> if none.
 * @param rsmd ResultSetMetaData used to resolve a column label from the field's column index
 * @return typeConverter instance mapping the MAP column to the Arrow Map type
 */
protected Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter(
    Calendar calendar, ResultSetMetaData rsmd) {
  return fieldInfo -> {
    int columnIndex = fieldInfo.getColumn();
    String label = null;
    if (columnIndex != 0) {
      try {
        label = rsmd.getColumnLabel(columnIndex);
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }
    return MAP.equals(label)
        ? new ArrowType.Map(false)
        : JdbcToArrowUtils.getArrowTypeFromJdbcType(fieldInfo, calendar);
  };
}

/**
 * Executes the given query and returns its {@link ResultSetMetaData}.
 *
 * NOTE(review): the Statement and ResultSet created here are never closed — a
 * resource leak. Tolerable in test code, but consider try-with-resources if the
 * driver in use keeps the metadata valid after close — verify before changing.
 *
 * @param query SQL query to execute
 * @return metadata describing the query's result set columns
 * @throws SQLException if executing the query fails
 */
protected ResultSetMetaData getQueryMetaData(String query) throws SQLException {
return conn.createStatement().executeQuery(query).getMetaData();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.nio.charset.Charset;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -47,8 +49,17 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.JsonStringHashMap;
import org.apache.arrow.vector.util.ObjectMapperFactory;
import org.apache.arrow.vector.util.Text;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This is a Helper class which has functionalities to read and assert the values from the given FieldVector object.
Expand Down Expand Up @@ -240,6 +251,45 @@ public static void assertListVectorValues(ListVector listVector, int rowCount, I
}
}

/**
 * Asserts that every row of the given MapVector matches the expected key/value maps.
 *
 * @param mapVector the vector under test
 * @param rowCount expected number of rows
 * @param values expected map per row; a null element means the row itself must be null
 */
public static void assertMapVectorValues(MapVector mapVector, int rowCount, Map<String, String>[] values) {
  assertEquals(rowCount, mapVector.getValueCount());

  for (int row = 0; row < mapVector.getValueCount(); row++) {
    if (values[row] == null) {
      assertTrue(mapVector.isNull(row));
    } else {
      JsonStringArrayList<JsonStringHashMap<String, Text>> entries =
          (JsonStringArrayList<JsonStringHashMap<String, Text>>) mapVector.getObject(row);
      Map<String, String> actual = null;
      if (entries != null && !entries.isEmpty()) {
        actual = new HashMap<>();
        for (JsonStringHashMap<String, Text> entry : entries) {
          Text value = entry.get("value");
          actual.put(entry.get("key").toString(), value != null ? value.toString() : null);
        }
      }
      assertEquals(values[row], actual);
    }
  }
}

/**
 * Parses the pipe-delimited JSON strings of the given data type into maps.
 *
 * @param values raw test data rows
 * @param dataType key selecting which rows to extract
 * @return one map per row; empty source strings leave the corresponding entry null
 */
public static Map<String, String>[] getMapValues(String[] values, String dataType) {
  String[] rows = getValues(values, dataType);
  @SuppressWarnings("unchecked")
  Map<String, String>[] maps = new Map[rows.length];
  ObjectMapper mapper = ObjectMapperFactory.newObjectMapper();
  TypeReference<Map<String, String>> mapType = new TypeReference<Map<String, String>>() {};
  for (int i = 0; i < rows.length; i++) {
    // test fixtures use '|' in place of ',' so values survive CSV-style splitting
    String json = rows[i].replace("|", ",");
    if (json.isEmpty()) {
      continue;
    }
    try {
      maps[i] = mapper.readValue(json, mapType);
    } catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }
  return maps;
}

public static void assertNullValues(BaseValueVector vector, int rowCount) {
assertEquals(rowCount, vector.getValueCount());

Expand Down
Loading