Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dataframe-jdbc/api/dataframe-jdbc.api
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public abstract class org/jetbrains/kotlinx/dataframe/io/db/DbType {
public fun getDefaultFetchSize ()I
public fun getDefaultQueryTimeout ()Ljava/lang/Integer;
public abstract fun getDriverClassName ()Ljava/lang/String;
public fun getTableColumnsMetadata (Ljava/sql/ResultSet;)Ljava/util/List;
public fun getTableTypes ()Ljava/util/List;
public abstract fun isSystemTable (Lorg/jetbrains/kotlinx/dataframe/io/db/TableMetadata;)Z
public fun makeCommonSqlToKTypeMapping (Lorg/jetbrains/kotlinx/dataframe/io/db/TableColumnMetadata;)Lkotlin/reflect/KType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import java.sql.NClob
import java.sql.PreparedStatement
import java.sql.Ref
import java.sql.ResultSet
import java.sql.ResultSetMetaData
import java.sql.RowId
import java.sql.SQLXML
import java.sql.Time
Expand Down Expand Up @@ -397,4 +398,101 @@ public abstract class DbType(public val dbTypeInJdbcUrl: String) {
val kType = createArrayTypeIfNeeded(kClass, tableColumnMetadata.isNullable)
return kType
}

/**
* Retrieves column metadata from a JDBC ResultSet.
*
* By default, this method reads column metadata from [ResultSetMetaData],
* which is fast and supported by most JDBC drivers.
* If the driver does not provide sufficient information (e.g., `isNullable` unknown),
* it falls back to using [DatabaseMetaData.getColumns] for affected columns.
*
* Override this method in subclasses to provide database-specific behavior
* (for example, to disable fallback for databases like Teradata or Oracle
* where `DatabaseMetaData.getColumns` is known to be slow).
*
* @param resultSet The [ResultSet] containing query results.
* @return A list of [TableColumnMetadata] objects.
*/
public open fun getTableColumnsMetadata(resultSet: ResultSet): List<TableColumnMetadata> {
val rsMetaData = resultSet.metaData
val connection = resultSet.statement.connection
val dbMetaData = connection.metaData
val catalog = connection.catalog.takeUnless { it.isNullOrBlank() }
val schema = connection.schema.takeUnless { it.isNullOrBlank() }

val columnCount = rsMetaData.columnCount
val columns = mutableListOf<TableColumnMetadata>()
val nameCounter = mutableMapOf<String, Int>()

for (index in 1..columnCount) {
val columnName = rsMetaData.getColumnName(index)
val tableName = rsMetaData.getTableName(index)

// Try to detect nullability from ResultSetMetaData
val isNullable = try {
when (rsMetaData.isNullable(index)) {
ResultSetMetaData.columnNoNulls -> false

ResultSetMetaData.columnNullable -> true

ResultSetMetaData.columnNullableUnknown -> {
// Unknown nullability: assume it nullable, may trigger fallback
true
}

else -> true
}
} catch (_: Exception) {
// Some drivers may throw for unsupported features
// In that case, fallback to DatabaseMetaData
dbMetaData.getColumns(catalog, schema, tableName, columnName).use { cols ->
if (cols.next()) !cols.getString("IS_NULLABLE").equals("NO", ignoreCase = true) else true
}
}

val columnType = rsMetaData.getColumnTypeName(index)
val jdbcType = rsMetaData.getColumnType(index)
val displaySize = rsMetaData.getColumnDisplaySize(index)
val javaClassName = rsMetaData.getColumnClassName(index)

val uniqueName = manageColumnNameDuplication(nameCounter, columnName)

columns += TableColumnMetadata(
uniqueName,
columnType,
jdbcType,
displaySize,
javaClassName,
isNullable,
)
}

return columns
}

/**
* Manages the duplication of column names by appending a unique identifier to the original name if necessary.
*
* @param columnNameCounter a mutable map that keeps track of the count for each column name.
* @param originalName the original name of the column to be managed.
* @return the modified column name that is free from duplication.
*/
internal fun manageColumnNameDuplication(columnNameCounter: MutableMap<String, Int>, originalName: String): String {
var name = originalName
val count = columnNameCounter[originalName]

if (count != null) {
var incrementedCount = count + 1
while (columnNameCounter.containsKey("${originalName}_$incrementedCount")) {
incrementedCount++
}
columnNameCounter[originalName] = incrementedCount
name = "${originalName}_$incrementedCount"
} else {
columnNameCounter[originalName] = 0
}

return name
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public fun Connection.readDataFrameSchema(sqlQueryOrTableName: String, dbType: D
* @return the schema of the [ResultSet] as a [DataFrameSchema] object.
*/
public fun DataFrameSchema.Companion.readResultSet(resultSet: ResultSet, dbType: DbType): DataFrameSchema {
val tableColumns = getTableColumnsMetadata(resultSet)
val tableColumns = getTableColumnsMetadata(resultSet, dbType)
return buildSchemaByTableColumns(tableColumns, dbType)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import java.sql.DatabaseMetaData
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.ResultSetMetaData
import javax.sql.DataSource
import kotlin.reflect.KType

Expand Down Expand Up @@ -181,7 +180,7 @@ private fun executeQueryAndBuildDataFrame(
configureStatement(statement)
logger.debug { "Executing query: $sqlQuery" }
statement.executeQuery().use { rs ->
val tableColumns = getTableColumnsMetadata(rs)
val tableColumns = getTableColumnsMetadata(rs, determinedDbType)
fetchAndConvertDataFromResultSet(tableColumns, rs, determinedDbType, limit, inferNullability)
}
}
Expand Down Expand Up @@ -562,7 +561,7 @@ public fun DataFrame.Companion.readResultSet(
inferNullability: Boolean = true,
): AnyFrame {
validateLimit(limit)
val tableColumns = getTableColumnsMetadata(resultSet)
val tableColumns = getTableColumnsMetadata(resultSet, dbType)
return fetchAndConvertDataFromResultSet(tableColumns, resultSet, dbType, limit, inferNullability)
}

Expand Down Expand Up @@ -852,71 +851,8 @@ private fun readTableAsDataFrame(
return dataFrame
}

/**
* Retrieves the metadata of the columns in the result set.
*
* @param rs the result set
* @return a mutable list of [TableColumnMetadata] objects,
* where each TableColumnMetadata object contains information such as the column type,
* JDBC type, size, and name.
*/
internal fun getTableColumnsMetadata(rs: ResultSet): MutableList<TableColumnMetadata> {
val metaData: ResultSetMetaData = rs.metaData
val numberOfColumns: Int = metaData.columnCount
val tableColumns = mutableListOf<TableColumnMetadata>()
val columnNameCounter = mutableMapOf<String, Int>()
val databaseMetaData: DatabaseMetaData = rs.statement.connection.metaData
val catalog: String? = rs.statement.connection.catalog.takeUnless { it.isNullOrBlank() }
val schema: String? = rs.statement.connection.schema.takeUnless { it.isNullOrBlank() }

for (i in 1 until numberOfColumns + 1) {
val tableName = metaData.getTableName(i)
val columnName = metaData.getColumnName(i)

// this algorithm works correctly only for SQL Table and ResultSet opened on one SQL table
val columnResultSet: ResultSet =
databaseMetaData.getColumns(catalog, schema, tableName, columnName)
val isNullable = if (columnResultSet.next()) {
columnResultSet.getString("IS_NULLABLE") == "YES"
} else {
true // we assume that it's nullable by default
}

val name = manageColumnNameDuplication(columnNameCounter, columnName)
val size = metaData.getColumnDisplaySize(i)
val type = metaData.getColumnTypeName(i)
val jdbcType = metaData.getColumnType(i)
val javaClassName = metaData.getColumnClassName(i)

tableColumns += TableColumnMetadata(name, type, jdbcType, size, javaClassName, isNullable)
}
return tableColumns
}

/**
* Manages the duplication of column names by appending a unique identifier to the original name if necessary.
*
* @param columnNameCounter a mutable map that keeps track of the count for each column name.
* @param originalName the original name of the column to be managed.
* @return the modified column name that is free from duplication.
*/
internal fun manageColumnNameDuplication(columnNameCounter: MutableMap<String, Int>, originalName: String): String {
var name = originalName
val count = columnNameCounter[originalName]

if (count != null) {
var incrementedCount = count + 1
while (columnNameCounter.containsKey("${originalName}_$incrementedCount")) {
incrementedCount++
}
columnNameCounter[originalName] = incrementedCount
name = "${originalName}_$incrementedCount"
} else {
columnNameCounter[originalName] = 0
}

return name
}
internal fun getTableColumnsMetadata(resultSet: ResultSet, dbType: DbType): MutableList<TableColumnMetadata> =
dbType.getTableColumnsMetadata(resultSet).toMutableList()

/**
* Fetches and converts data from a ResultSet into a mutable map.
Expand Down