Skip to content

Commit

Permalink
[FLINK-32300][jdbc-driver] Support getObject for FlinkResultSet
Browse files Browse the repository at this point in the history
Close #22757
  • Loading branch information
FangYongs authored and libenchao committed Jun 12, 2023
1 parent a6adbdd commit c7afa32
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 63 deletions.
18 changes: 16 additions & 2 deletions flink-table/flink-sql-jdbc-driver-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@

<build>
<plugins>
<!-- Build flink-sql-gateway jar -->
<plugin>
<groupId>io.github.zentol.japicmp</groupId>
<artifactId>japicmp-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Build flink sql jdbc driver bundle jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<!-- Exclude all flink-dist files and only contain sql-gateway files -->
<execution>
<id>shade-flink</id>
<phase>package</phase>
Expand All @@ -104,6 +110,14 @@
<include>org.apache.flink:flink-sql-gateway</include>
<include>org.apache.flink:flink-table-common</include>
<include>org.apache.flink:flink-annotations</include>
<include>org.apache.flink:flink-core</include>
<incldue>org.apache.flink:flink-runtime</incldue>
<include>org.apache.flink:flink-clients</include>
<include>org.apache.flink:flink-table-api-java</include>
<include>org.apache.flink:flink-json</include>
<include>org.apache.flink:flink-shaded-netty</include>
<include>org.apache.flink:flink-shaded-jackson</include>
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,6 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
"FlinkStatement#isWrapperFor is not supported yet.");
}

@Override
public int getUpdateCount() throws SQLException {
throw new SQLFeatureNotSupportedException("FlinkStatement#getUpdateCount is not supported");
}

@Override
public void closeOnCompletion() throws SQLException {
throw new SQLFeatureNotSupportedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.client.gateway.StatementResult;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.jdbc.utils.ArrayFieldGetter;
import org.apache.flink.table.jdbc.utils.CloseableResultIterator;
import org.apache.flink.table.jdbc.utils.StatementResultIterator;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;

import java.math.BigDecimal;
import java.math.RoundingMode;
Expand All @@ -38,8 +44,11 @@
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.jdbc.utils.DriverUtils.checkNotNull;

Expand All @@ -50,6 +59,7 @@
public class FlinkResultSet extends BaseResultSet {
private final List<DataType> dataTypeList;
private final List<String> columnNameList;
private final List<RowData.FieldGetter> fieldGetterList;
private final Statement statement;
private final CloseableResultIterator<RowData> iterator;
private final FlinkResultSetMetaData resultSetMetaData;
Expand All @@ -71,9 +81,19 @@ public FlinkResultSet(

this.dataTypeList = schema.getColumnDataTypes();
this.columnNameList = schema.getColumnNames();
this.fieldGetterList = createFieldGetterList(dataTypeList);
this.resultSetMetaData = new FlinkResultSetMetaData(columnNameList, dataTypeList);
}

private List<RowData.FieldGetter> createFieldGetterList(List<DataType> dataTypeList) {
List<RowData.FieldGetter> fieldGetterList = new ArrayList<>(dataTypeList.size());
for (int i = 0; i < dataTypeList.size(); i++) {
fieldGetterList.add(RowData.createFieldGetter(dataTypeList.get(i).getLogicalType(), i));
}

return fieldGetterList;
}

@Override
public boolean next() throws SQLException {
checkClosed();
Expand Down Expand Up @@ -353,8 +373,71 @@ public ResultSetMetaData getMetaData() throws SQLException {

@Override
public Object getObject(int columnIndex) throws SQLException {
// TODO support get object
throw new SQLFeatureNotSupportedException("FlinkResultSet#getObject is not supported");
checkClosed();
checkValidRow();
checkValidColumn(columnIndex);
try {
Object object = fieldGetterList.get(columnIndex - 1).getFieldOrNull(currentRow);
DataType dataType = dataTypeList.get(columnIndex - 1);
return convertToJavaObject(object, dataType.getLogicalType());
} catch (Exception e) {
throw new SQLDataException(e);
}
}

private Object convertToJavaObject(Object object, LogicalType dataType) throws SQLException {
if (object == null) {
return null;
}

switch (dataType.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
case FLOAT:
case DOUBLE:
case BINARY:
case VARBINARY:
{
return object;
}
case VARCHAR:
case CHAR:
{
return object.toString();
}
case DECIMAL:
{
return ((DecimalData) object).toBigDecimal();
}
case MAP:
{
LogicalType keyType = ((MapType) dataType).getKeyType();
LogicalType valueType = ((MapType) dataType).getValueType();
ArrayFieldGetter keyGetter = ArrayFieldGetter.createFieldGetter(keyType);
ArrayFieldGetter valueGetter = ArrayFieldGetter.createFieldGetter(valueType);
MapData mapData = (MapData) object;
int size = mapData.size();
ArrayData keyArrayData = mapData.keyArray();
ArrayData valueArrayData = mapData.valueArray();
Map<Object, Object> mapResult = new HashMap<>();
for (int i = 0; i < size; i++) {
mapResult.put(
convertToJavaObject(
keyGetter.getObjectOrNull(keyArrayData, i), keyType),
convertToJavaObject(
valueGetter.getObjectOrNull(valueArrayData, i), valueType));
}
return mapResult;
}
default:
{
throw new SQLDataException(
String.format("Not supported value type %s", dataType));
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class FlinkStatement extends BaseStatement {
private final FlinkConnection connection;
private final Executor executor;
private FlinkResultSet currentResults;
private boolean hasResults;
private boolean closed;

public FlinkStatement(FlinkConnection connection) {
Expand All @@ -58,6 +59,7 @@ public ResultSet executeQuery(String sql) throws SQLException {
throw new SQLException(String.format("Statement[%s] is not a query.", sql));
}
currentResults = new FlinkResultSet(this, result);
hasResults = true;

return currentResults;
}
Expand Down Expand Up @@ -106,9 +108,11 @@ public boolean execute(String sql) throws SQLException {
StatementResult result = executeInternal(sql);
if (result.isQueryResult() || result.getResultKind() == ResultKind.SUCCESS_WITH_CONTENT) {
currentResults = new FlinkResultSet(this, result);
hasResults = true;
return true;
}

hasResults = false;
return false;
}

Expand Down Expand Up @@ -145,6 +149,16 @@ public boolean getMoreResults() throws SQLException {
throw new SQLFeatureNotSupportedException("Multiple open results not supported");
}

@Override
public int getUpdateCount() throws SQLException {
if (hasResults) {
throw new SQLFeatureNotSupportedException(
"FlinkStatement#getUpdateCount is not supported for query");
} else {
return 0;
}
}

@Override
public Connection getConnection() throws SQLException {
return connection;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.jdbc.utils;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;

import javax.annotation.Nullable;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;

/** Creates an accessor for getting elements in an array data structure at the given position. */
public interface ArrayFieldGetter {
@Nullable
Object getObjectOrNull(ArrayData array, int index);

static ArrayFieldGetter createFieldGetter(LogicalType type) {

final ArrayFieldGetter fieldGetter;
// ordered by type root definition
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
fieldGetter = ArrayData::getString;
break;
case BOOLEAN:
fieldGetter = ArrayData::getBoolean;
break;
case BINARY:
case VARBINARY:
fieldGetter = ArrayData::getBinary;
break;
case DECIMAL:
final int decimalPrecision = getPrecision(type);
final int decimalScale = getScale(type);
fieldGetter =
(array, index) -> array.getDecimal(index, decimalPrecision, decimalScale);
break;
case TINYINT:
fieldGetter = ArrayData::getByte;
break;
case SMALLINT:
fieldGetter = ArrayData::getShort;
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case INTERVAL_YEAR_MONTH:
fieldGetter = ArrayData::getInt;
break;
case BIGINT:
case INTERVAL_DAY_TIME:
fieldGetter = ArrayData::getLong;
break;
case FLOAT:
fieldGetter = ArrayData::getFloat;
break;
case DOUBLE:
fieldGetter = ArrayData::getDouble;
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampPrecision = getPrecision(type);
fieldGetter = (array, index) -> array.getTimestamp(index, timestampPrecision);
break;
case TIMESTAMP_WITH_TIME_ZONE:
throw new UnsupportedOperationException();
case ARRAY:
fieldGetter = ArrayData::getArray;
break;
case MULTISET:
case MAP:
fieldGetter = ArrayData::getMap;
break;
case ROW:
case STRUCTURED_TYPE:
final int arrayFieldCount = getFieldCount(type);
fieldGetter = (array, index) -> array.getRow(index, arrayFieldCount);
break;
case DISTINCT_TYPE:
fieldGetter = createFieldGetter(((DistinctType) type).getSourceType());
break;
case RAW:
fieldGetter = ArrayData::getRawValue;
break;
case NULL:
case SYMBOL:
case UNRESOLVED:
default:
throw new IllegalArgumentException();
}
if (!type.isNullable()) {
return fieldGetter;
}
return (array, index) -> {
if (array.isNullAt(index)) {
return null;
}
return fieldGetter.getObjectOrNull(array, index);
};
}
}

0 comments on commit c7afa32

Please sign in to comment.