From b0eeec0c426ca033801431a572be03441bde3a09 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 20 May 2026 13:19:27 +0200 Subject: [PATCH 1/3] #750 Increase the size of pramen_table varchar column. --- .../core/ExternalChannelFactoryReflect.scala | 4 ++-- .../bookkeeper/model/BookkeepingTable.scala | 2 +- .../core/bookkeeper/model/MetadataTable.scala | 2 +- .../core/bookkeeper/model/OffsetTable.scala | 2 +- .../core/bookkeeper/model/SchemaTable.scala | 2 +- .../core/journal/model/JournalTable.scala | 2 +- .../za/co/absa/pramen/core/rdb/PramenDb.scala | 23 ++++++++++++++++++- pramen/project/build.properties | 2 +- 8 files changed, 30 insertions(+), 9 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala index eb07e99d9..c879d40d0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala @@ -49,9 +49,9 @@ object ExternalChannelFactoryReflect { } factory match { - case fv2: ExternalChannelFactoryV2[T] => + case fv2: ExternalChannelFactoryV2[T @unchecked] => fv2.apply(conf, workflowConfig, parentPath, spark) - case fv1: ExternalChannelFactory[T] => + case fv1: ExternalChannelFactory[T @unchecked] => fv1.apply(conf, parentPath, spark) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala index f249ceef1..b44eac99b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala @@ -23,7 +23,7 @@ trait BookkeepingTable { import profile.api._ class BookkeepingRecords(tag: Tag) extends Table[BookkeepingRecord](tag, 
"bookkeeping") { - def pramenTableName = column[String]("watcher_table_name", O.Length(128)) + def pramenTableName = column[String]("watcher_table_name", O.Length(255)) def infoDate = column[String]("info_date", O.Length(20)) def infoDateBegin = column[String]("info_date_begin", O.Length(20)) def infoDateEnd = column[String]("info_date_end", O.Length(20)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala index d803bb869..f574f31db 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala @@ -23,7 +23,7 @@ trait MetadataTable { import profile.api._ class MetadataRecords(tag: Tag) extends Table[MetadataRecord](tag, "metadata") { - def pramenTableName = column[String]("table_name", O.Length(128)) + def pramenTableName = column[String]("table_name", O.Length(255)) def infoDate = column[String]("info_date", O.Length(20)) def key = column[String]("key", O.Length(255)) def value = column[String]("value") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala index 162b16086..4acc7a968 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala @@ -23,7 +23,7 @@ trait OffsetTable { import profile.api._ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") { - def pramenTableName = column[String]("table_name", O.Length(256)) + def pramenTableName = column[String]("table_name", O.Length(600)) def infoDate = column[String]("info_date", O.Length(20)) def dataType = column[String]("data_type", O.Length(20)) def minOffset = 
column[String]("min_offset") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala index 7ad1a60c7..4b55a144b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala @@ -23,7 +23,7 @@ trait SchemaTable { import profile.api._ class SchemaRecords(tag: Tag) extends Table[SchemaRecord](tag, "schemas") { - def pramenTableName = column[String]("watcher_table_name", O.Length(128)) + def pramenTableName = column[String]("watcher_table_name", O.Length(255)) def infoDate = column[String]("info_date", O.Length(20)) def schemaJson = column[String]("schema_json") def * = (pramenTableName, infoDate, schemaJson) <> (SchemaRecord.tupled, SchemaRecord.unapply) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala index 9d83c1194..440f69fc8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala @@ -24,7 +24,7 @@ trait JournalTable { class JournalRecords(tag: Tag) extends Table[JournalTask](tag, "journal") { def jobName = column[String]("job_name", O.Length(200)) - def pramenTableName = column[String]("watcher_table_name", O.Length(128)) + def pramenTableName = column[String]("watcher_table_name", O.Length(255)) def periodBegin = column[String]("period_begin", O.Length(20)) def periodEnd = column[String]("period_end", O.Length(20)) def informationDate = column[String]("information_date", O.Length(20)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 
cf74ad43e..0e3665e7b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -132,6 +132,14 @@ class PramenDb(val jdbcConfig: JdbcConfig, if (dbVersion == 10) { addColumn(executionsTable.records.baseTableRow.tableName, "number_of_tasks_completed", "bigint") } + + if (0 < dbVersion && dbVersion < 12) { + alterColumn(bookkeepingTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)") + alterColumn(offsetTable.records.baseTableRow.tableName, "table_name", "varchar(600)") + alterColumn(journalTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)") + alterColumn(metadataTable.records.baseTableRow.tableName, "table_name", "varchar(255)") + alterColumn(schemaTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)") + } } private def initTable(schema: slickProfile.SchemaDescription): Unit = { @@ -159,6 +167,19 @@ class PramenDb(val jdbcConfig: JdbcConfig, } } + private def alterColumn(table: String, columnName: String, columnType: String): Unit = { + try { + val quotedTable = slickProfile.quoteIdentifier(table) + val quotedColumnName = slickProfile.quoteIdentifier(columnName) + slickDb.run( + sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName TYPE #$columnType" + ).execute() + } catch { + case NonFatal(ex) => + throw new RuntimeException(s"Unable to alter column: '$columnName $columnType' in table: '$table' for the url: $activeUrl", ex) + } + } + override def close(): Unit = { try { @@ -176,7 +197,7 @@ object PramenDb { private val log = LoggerFactory.getLogger(this.getClass) private val conf = Pramen.getConfig - val MODEL_VERSION = 11 + val MODEL_VERSION = 12 val DEFAULT_RETRIES: Int = conf.getInt("pramen.internal.connection.retries.default") val BACKOFF_MIN_MS: Int = conf.getInt("pramen.internal.connection.backoff.min.ms") val BACKOFF_MAX_MS: Int = 
conf.getInt("pramen.internal.connection.backoff.max.ms") diff --git a/pramen/project/build.properties b/pramen/project/build.properties index 5443af668..7fecd5107 100644 --- a/pramen/project/build.properties +++ b/pramen/project/build.properties @@ -13,4 +13,4 @@ # limitations under the License. # -sbt.version=1.11.6 +sbt.version=1.12.11 From 037cfe63967283eb06506338d764ffacaf48d74f Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 20 May 2026 13:57:10 +0200 Subject: [PATCH 2/3] #750 Fix various bookkeeping DBs migration ALTER COLUMN steps. --- .../za/co/absa/pramen/core/rdb/PramenDb.scala | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 0e3665e7b..137343d9b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.rdb import org.slf4j.LoggerFactory import slick.jdbc.JdbcBackend.Database -import slick.jdbc.{JdbcBackend, JdbcProfile} +import slick.jdbc._ import slick.util.AsyncExecutor import za.co.absa.pramen.api.Pramen import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingTable, MetadataTable, OffsetTable, SchemaTable} @@ -171,9 +171,27 @@ class PramenDb(val jdbcConfig: JdbcConfig, try { val quotedTable = slickProfile.quoteIdentifier(table) val quotedColumnName = slickProfile.quoteIdentifier(columnName) - slickDb.run( - sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName TYPE #$columnType" - ).execute() + slickProfile match { + case _: SQLiteProfile => + log.warn(s"SQLite does not support altering column types. 
Column '$columnName' in table '$table' will remain with the original type for the url: $activeUrl") + case _: MySQLProfile => + slickDb.run( + sqlu"ALTER TABLE #$quotedTable MODIFY COLUMN #$quotedColumnName #$columnType" + ).execute() + case _: OracleProfile => + slickDb.run( + sqlu"ALTER TABLE #$quotedTable MODIFY (#$quotedColumnName #$columnType)" + ).execute() + case _: SQLServerProfile | HsqldbProfile => + slickDb.run( + sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName #$columnType" + ).execute() + case _ => + // PostgreSQL, H2, and other profiles that support ALTER COLUMN ... TYPE + slickDb.run( + sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName TYPE #$columnType" + ).execute() + } } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to alter column: '$columnName $columnType' in table: '$table' for the url: $activeUrl", ex) From 8e7a3da681fff17fe61c1e7d510e1d68dccffd50 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 20 May 2026 14:43:43 +0200 Subject: [PATCH 3/3] #750 Fix nullability in bookkeeping DBs migration ALTER COLUMN steps. 
--- .../za/co/absa/pramen/core/rdb/PramenDb.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 137343d9b..be8e6bd29 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -134,11 +134,11 @@ class PramenDb(val jdbcConfig: JdbcConfig, } if (0 < dbVersion && dbVersion < 12) { - alterColumn(bookkeepingTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)") - alterColumn(offsetTable.records.baseTableRow.tableName, "table_name", "varchar(600)") - alterColumn(journalTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)") - alterColumn(metadataTable.records.baseTableRow.tableName, "table_name", "varchar(255)") - alterColumn(schemaTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)") + alterColumn(bookkeepingTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false) + alterColumn(offsetTable.records.baseTableRow.tableName, "table_name", "varchar(600)", nullable = false) + alterColumn(journalTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false) + alterColumn(metadataTable.records.baseTableRow.tableName, "table_name", "varchar(255)", nullable = false) + alterColumn(schemaTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false) } } @@ -167,7 +167,8 @@ class PramenDb(val jdbcConfig: JdbcConfig, } } - private def alterColumn(table: String, columnName: String, columnType: String): Unit = { + private def alterColumn(table: String, columnName: String, columnType: String, nullable: Boolean): Unit = { + val nullSuffix = if (nullable) " NOT NULL" else "" try { val quotedTable = slickProfile.quoteIdentifier(table) val 
quotedColumnName = slickProfile.quoteIdentifier(columnName) @@ -176,7 +177,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, log.warn(s"SQLite does not support altering column types. Column '$columnName' in table '$table' will remain with the original type for the url: $activeUrl") case _: MySQLProfile => slickDb.run( - sqlu"ALTER TABLE #$quotedTable MODIFY COLUMN #$quotedColumnName #$columnType" + sqlu"ALTER TABLE #$quotedTable MODIFY COLUMN #$quotedColumnName #$columnType $nullSuffix" ).execute() case _: OracleProfile => slickDb.run( @@ -184,7 +185,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, ).execute() case _: SQLServerProfile | HsqldbProfile => slickDb.run( - sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName #$columnType" + sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName #$columnType $nullSuffix" ).execute() case _ => // PostgreSQL, H2, and other profiles that support ALTER COLUMN ... TYPE