Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public CompletionStage<LoadedData> getSample() {
Connection connection = getConnection();
var result = queryData(sampleMetadata, 100, connection);
var rowReader =
new RowReader(tableInfo, new ResultSetValuesGetter(tableInfo, result), sampleMetadata, connection, true);
new RowReader(tableInfo, new ResultSetValuesGetter(tableInfo, result, queryDialect), sampleMetadata, connection, true);
var inputStream = new ResultSetInputStream(new CsvRowConverter(tableInfo), rowReader, true);
var loadedData = new LoadedData(inputStream, Instant.now());
return CompletableFuture.completedFuture(loadedData);
Expand Down Expand Up @@ -462,7 +462,7 @@ private CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> splitData(Result

// Value getter + Some of the code in RowReader are needed only because we insist on running a single query
// and using a single result set for all ranges. If we allow query per window a lot of the code can be simplified.
var valueGetter = new ResultSetValuesGetter(tableInfo, resultSet);
var valueGetter = new ResultSetValuesGetter(tableInfo, resultSet, queryDialect);

for (int i = 0; i < wantedRanges.size(); i++) {
final var isLast = i == wantedRanges.size() - 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
package com.upsolver.datasources.jdbc;

import com.upsolver.datasources.jdbc.metadata.TableInfo;
import com.upsolver.datasources.jdbc.querybuilders.QueryDialect;
import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

class ResultSetValuesGetter implements AutoCloseable {
private final TableInfo tableInfo;
private final ResultSet underlying;
private final List<ThrowingBiFunction<ResultSet, Integer, String, SQLException>> valueGetters;

private String[] nextValues = null;
private long nextIncValue;
private Timestamp nextTimestampValue;
private boolean onNextValues = false;

ResultSetValuesGetter(TableInfo tableInfo, ResultSet underlying) {
ResultSetValuesGetter(TableInfo tableInfo, ResultSet underlying, QueryDialect queryDialect) {
this.tableInfo = tableInfo;
this.underlying = underlying;
valueGetters = initValueGetters(queryDialect);
}

public boolean next() throws SQLException {
Expand Down Expand Up @@ -74,7 +81,7 @@ public String[] getValues() throws SQLException {
} else {
var result = new String[tableInfo.getColumnCount()];
for (int i = 0; i < tableInfo.getColumnCount(); i++) {
result[i] = underlying.getString(i + 1); // Column indices start at 1 (☉_☉)
result[i] = valueGetters.get(i).apply(underlying, i + 1); // Column indices start at 1 (☉_☉)
}
return result;
}
Expand All @@ -84,4 +91,18 @@ public String[] getValues() throws SQLException {
public void close() throws Exception {
underlying.close();
}

private List<ThrowingBiFunction<ResultSet, Integer, String, SQLException>> initValueGetters(QueryDialect queryDialect) {
try {
ResultSetMetaData md = underlying.getMetaData();
int n = md.getColumnCount();
List<ThrowingBiFunction<ResultSet, Integer, String, SQLException>> valueGetters = new ArrayList<>();
for (int i = 0; i < n; i++) {
valueGetters.add(queryDialect.getStringValueGetter(md.getColumnType(i + 1)));
}
return valueGetters;
} catch (SQLException e) {
throw new RuntimeException("Error while retrieving table metadata", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.upsolver.datasources.jdbc.metadata.SimpleSqlType;
import com.upsolver.datasources.jdbc.metadata.TableInfo;
import com.upsolver.datasources.jdbc.utils.NamedPreparedStatment;
import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,7 +18,9 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

public class DefaultQueryDialect implements QueryDialect {
private static final Logger logger = LoggerFactory.getLogger(DefaultQueryDialect.class);
Expand All @@ -29,6 +32,17 @@ public class DefaultQueryDialect implements QueryDialect {
JDBCType.TIMESTAMP_WITH_TIMEZONE
));

private static final ThrowingBiFunction<ResultSet, Integer, String, SQLException> getString = ResultSet::getString;
private final Map<Integer, ThrowingBiFunction<ResultSet, Integer, String, SQLException>> valueGetters;

public DefaultQueryDialect() {
this(Collections.emptyMap());
}

public DefaultQueryDialect(Map<Integer, ThrowingBiFunction<ResultSet, Integer, String, SQLException>> valueGetters) {
this.valueGetters = valueGetters;
}

@Override
public long utcOffsetSeconds(Connection connection) throws SQLException {
var rs = connection.prepareStatement("SELECT TIMEDIFF(NOW(), UTC_TIMESTAMP)").executeQuery();
Expand Down Expand Up @@ -245,4 +259,9 @@ public Connection getConnection(String url, java.util.Properties info) throws SQ
public String getDriverClassName() {
return null;
}

@Override
public ThrowingBiFunction<ResultSet, Integer, String, SQLException> getStringValueGetter(int sqlType) {
return valueGetters.getOrDefault(sqlType, getString);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package com.upsolver.datasources.jdbc.querybuilders;

import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction;
import oracle.jdbc.OracleType;

import java.math.BigInteger;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLType;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;

public class OracleQueryDialect extends DefaultQueryDialect {
Expand All @@ -19,6 +24,13 @@ public class OracleQueryDialect extends DefaultQueryDialect {
OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE
));

private static final ThrowingBiFunction<ResultSet, Integer, String, SQLException> blobAsString = (rs, i) -> Optional.ofNullable(rs.getBytes(i)).map(bytes -> new BigInteger(1, bytes).toString(16)).orElse(null);
private static final Map<Integer, ThrowingBiFunction<ResultSet, Integer, String, SQLException>> blobValueGetters = Collections.singletonMap(Types.BLOB, blobAsString);

public OracleQueryDialect() {
super(blobValueGetters);
}

@Override
public long utcOffsetSeconds(Connection connection) throws SQLException {
var rs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import com.upsolver.datasources.jdbc.JDBCTaskMetadata;
import com.upsolver.datasources.jdbc.metadata.TableInfo;
import com.upsolver.datasources.jdbc.utils.NamedPreparedStatment;
import com.upsolver.datasources.jdbc.utils.ThrowingBiFunction;

import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -70,4 +70,5 @@ NamedPreparedStatment queryFullTable(TableInfo tableInfo,

String getDriverClassName();

ThrowingBiFunction<ResultSet, Integer, String, SQLException> getStringValueGetter(int sqlType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.upsolver.datasources.jdbc.utils;

@FunctionalInterface
public interface ThrowingBiFunction<T, U, R, E extends Throwable> {
R apply(T t, U u) throws E;
}