diff --git a/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java b/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java index eaf50c3..ed2a730 100644 --- a/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java +++ b/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java @@ -232,7 +232,7 @@ public CompletionStage getSample() { Connection connection = getConnection(); var result = queryData(sampleMetadata, 100, connection); var rowReader = - new RowReader(tableInfo, new ResultSetValuesGetter(tableInfo, result), sampleMetadata, connection, true); + new RowReader(tableInfo, new ResultSetValuesGetter(tableInfo, result, queryDialect), sampleMetadata, connection, true); var inputStream = new ResultSetInputStream(new CsvRowConverter(tableInfo), rowReader, true); var loadedData = new LoadedData(inputStream, Instant.now()); return CompletableFuture.completedFuture(loadedData); @@ -462,7 +462,7 @@ private CompletionStage>> splitData(Result // Value getter + Some of the code in RowReader are needed only because we insist on running a single query // and using a single result set for all ranges. If we allow query per window a lot of the code can be simplified. - var valueGetter = new ResultSetValuesGetter(tableInfo, resultSet); + var valueGetter = new ResultSetValuesGetter(tableInfo, resultSet, queryDialect); for (int i = 0; i < wantedRanges.size(); i++) { final var isLast = i == wantedRanges.size() - 1; diff --git a/src/main/java/com/upsolver/datasources/jdbc/ResultSetValuesGetter.java b/src/main/java/com/upsolver/datasources/jdbc/ResultSetValuesGetter.java index fd51052..577ec32 100644 --- a/src/main/java/com/upsolver/datasources/jdbc/ResultSetValuesGetter.java +++ b/src/main/java/com/upsolver/datasources/jdbc/ResultSetValuesGetter.java @@ -1,23 +1,30 @@ package com.upsolver.datasources.jdbc; import com.upsolver.datasources.jdbc.metadata.TableInfo; +import com.upsolver.datasources.jdbc.querybuilders.QueryDialect; +import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; class ResultSetValuesGetter implements AutoCloseable { private final TableInfo tableInfo; private final ResultSet underlying; + private final List> valueGetters; private String[] nextValues = null; private long nextIncValue; private Timestamp nextTimestampValue; private boolean onNextValues = false; - ResultSetValuesGetter(TableInfo tableInfo, ResultSet underlying) { + ResultSetValuesGetter(TableInfo tableInfo, ResultSet underlying, QueryDialect queryDialect) { this.tableInfo = tableInfo; this.underlying = underlying; + valueGetters = initValueGetters(queryDialect); } public boolean next() throws SQLException { @@ -74,7 +81,7 @@ public String[] getValues() throws SQLException { } else { var result = new String[tableInfo.getColumnCount()]; for (int i = 0; i < tableInfo.getColumnCount(); i++) { - result[i] = underlying.getString(i + 1); // Column indices start at 1 (☉_☉) + result[i] = valueGetters.get(i).apply(underlying, i + 1); // Column indices start at 1 (☉_☉) } return result; } @@ -84,4 +91,18 @@ public String[] getValues() throws SQLException { public void close() throws Exception { underlying.close(); } + + private List> initValueGetters(QueryDialect queryDialect) { + try { + ResultSetMetaData md = underlying.getMetaData(); + int n = md.getColumnCount(); + List> valueGetters = new ArrayList<>(); + for (int i = 0; i < n; i++) { + valueGetters.add(queryDialect.getStringValueGetter(md.getColumnType(i + 1))); + } + return valueGetters; + } catch (SQLException e) { + throw new RuntimeException("Error while retrieving table metadata", e); + } + } } diff --git a/src/main/java/com/upsolver/datasources/jdbc/querybuilders/DefaultQueryDialect.java b/src/main/java/com/upsolver/datasources/jdbc/querybuilders/DefaultQueryDialect.java index 5fbcc81..e228db2 100644 --- a/src/main/java/com/upsolver/datasources/jdbc/querybuilders/DefaultQueryDialect.java +++ b/src/main/java/com/upsolver/datasources/jdbc/querybuilders/DefaultQueryDialect.java @@ -4,6 +4,7 @@ import com.upsolver.datasources.jdbc.metadata.SimpleSqlType; import com.upsolver.datasources.jdbc.metadata.TableInfo; import com.upsolver.datasources.jdbc.utils.NamedPreparedStatment; +import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,9 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; public class DefaultQueryDialect implements QueryDialect { private static final Logger logger = LoggerFactory.getLogger(DefaultQueryDialect.class); @@ -29,6 +32,17 @@ public class DefaultQueryDialect implements QueryDialect { JDBCType.TIMESTAMP_WITH_TIMEZONE )); + private static final ThrowingBiFunction getString = ResultSet::getString; + private final Map> valueGetters; + + public DefaultQueryDialect() { + this(Collections.emptyMap()); + } + + public DefaultQueryDialect(Map> valueGetters) { + this.valueGetters = valueGetters; + } + @Override public long utcOffsetSeconds(Connection connection) throws SQLException { var rs = connection.prepareStatement("SELECT TIMEDIFF(NOW(), UTC_TIMESTAMP)").executeQuery(); @@ -245,4 +259,9 @@ public Connection getConnection(String url, java.util.Properties info) throws SQ public String getDriverClassName() { return null; } + + @Override + public ThrowingBiFunction getStringValueGetter(int sqlType) { + return valueGetters.getOrDefault(sqlType, getString); + } } diff --git a/src/main/java/com/upsolver/datasources/jdbc/querybuilders/OracleQueryDialect.java b/src/main/java/com/upsolver/datasources/jdbc/querybuilders/OracleQueryDialect.java index 614bc85..a31b1d5 100644 --- a/src/main/java/com/upsolver/datasources/jdbc/querybuilders/OracleQueryDialect.java +++ b/src/main/java/com/upsolver/datasources/jdbc/querybuilders/OracleQueryDialect.java @@ -1,15 +1,20 @@ package com.upsolver.datasources.jdbc.querybuilders; +import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction; import oracle.jdbc.OracleType; +import java.math.BigInteger; import java.sql.Connection; import java.sql.JDBCType; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLType; +import java.sql.Types; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Optional; public class OracleQueryDialect extends DefaultQueryDialect { @@ -19,6 +24,13 @@ public class OracleQueryDialect extends DefaultQueryDialect { OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE )); + private static final ThrowingBiFunction blobAsString = (rs, i) -> Optional.ofNullable(rs.getBytes(i)).map(bytes -> new BigInteger(1, bytes).toString(16)).orElse(null); + private static final Map> blobValueGetters = Collections.singletonMap(Types.BLOB, blobAsString); + + public OracleQueryDialect() { + super(blobValueGetters); + } + @Override public long utcOffsetSeconds(Connection connection) throws SQLException { var rs = diff --git a/src/main/java/com/upsolver/datasources/jdbc/querybuilders/QueryDialect.java b/src/main/java/com/upsolver/datasources/jdbc/querybuilders/QueryDialect.java index 985b962..24cf804 100644 --- a/src/main/java/com/upsolver/datasources/jdbc/querybuilders/QueryDialect.java +++ b/src/main/java/com/upsolver/datasources/jdbc/querybuilders/QueryDialect.java @@ -3,9 +3,9 @@ import com.upsolver.datasources.jdbc.JDBCTaskMetadata; import com.upsolver.datasources.jdbc.metadata.TableInfo; import com.upsolver.datasources.jdbc.utils.NamedPreparedStatment; +import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction; import java.sql.Connection; -import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -70,4 +70,5 @@ NamedPreparedStatment queryFullTable(TableInfo tableInfo, String getDriverClassName(); + ThrowingBiFunction getStringValueGetter(int sqlType); } diff --git a/src/main/java/com/upsolver/datasources/jdbc/utils/ThrowingBiFunction.java b/src/main/java/com/upsolver/datasources/jdbc/utils/ThrowingBiFunction.java new file mode 100644 index 0000000..aeec2d7 --- /dev/null +++ b/src/main/java/com/upsolver/datasources/jdbc/utils/ThrowingBiFunction.java @@ -0,0 +1,6 @@ +package com.upsolver.datasources.jdbc.utils; + +@FunctionalInterface +public interface ThrowingBiFunction { + R apply(T t, U u) throws E; +}