Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ object ExternalChannelFactoryReflect {
}

factory match {
case fv2: ExternalChannelFactoryV2[T] =>
case fv2: ExternalChannelFactoryV2[T @unchecked] =>
fv2.apply(conf, workflowConfig, parentPath, spark)
case fv1: ExternalChannelFactory[T] =>
case fv1: ExternalChannelFactory[T @unchecked] =>
fv1.apply(conf, parentPath, spark)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait BookkeepingTable {
import profile.api._

class BookkeepingRecords(tag: Tag) extends Table[BookkeepingRecord](tag, "bookkeeping") {
def pramenTableName = column[String]("watcher_table_name", O.Length(128))
def pramenTableName = column[String]("watcher_table_name", O.Length(255))
def infoDate = column[String]("info_date", O.Length(20))
def infoDateBegin = column[String]("info_date_begin", O.Length(20))
def infoDateEnd = column[String]("info_date_end", O.Length(20))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait MetadataTable {
import profile.api._

class MetadataRecords(tag: Tag) extends Table[MetadataRecord](tag, "metadata") {
def pramenTableName = column[String]("table_name", O.Length(128))
def pramenTableName = column[String]("table_name", O.Length(255))
def infoDate = column[String]("info_date", O.Length(20))
def key = column[String]("key", O.Length(255))
def value = column[String]("value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait OffsetTable {
import profile.api._

class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
def pramenTableName = column[String]("table_name", O.Length(256))
def pramenTableName = column[String]("table_name", O.Length(600))
def infoDate = column[String]("info_date", O.Length(20))
def dataType = column[String]("data_type", O.Length(20))
def minOffset = column[String]("min_offset")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait SchemaTable {
import profile.api._

class SchemaRecords(tag: Tag) extends Table[SchemaRecord](tag, "schemas") {
def pramenTableName = column[String]("watcher_table_name", O.Length(128))
def pramenTableName = column[String]("watcher_table_name", O.Length(255))
def infoDate = column[String]("info_date", O.Length(20))
def schemaJson = column[String]("schema_json")
def * = (pramenTableName, infoDate, schemaJson) <> (SchemaRecord.tupled, SchemaRecord.unapply)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ trait JournalTable {

class JournalRecords(tag: Tag) extends Table[JournalTask](tag, "journal") {
def jobName = column[String]("job_name", O.Length(200))
def pramenTableName = column[String]("watcher_table_name", O.Length(128))
def pramenTableName = column[String]("watcher_table_name", O.Length(255))
def periodBegin = column[String]("period_begin", O.Length(20))
def periodEnd = column[String]("period_end", O.Length(20))
def informationDate = column[String]("information_date", O.Length(20))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.rdb

import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.{JdbcBackend, JdbcProfile}
import slick.jdbc._
import slick.util.AsyncExecutor
import za.co.absa.pramen.api.Pramen
import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingTable, MetadataTable, OffsetTable, SchemaTable}
Expand Down Expand Up @@ -132,6 +132,14 @@ class PramenDb(val jdbcConfig: JdbcConfig,
if (dbVersion == 10) {
addColumn(executionsTable.records.baseTableRow.tableName, "number_of_tasks_completed", "bigint")
}

if (0 < dbVersion && dbVersion < 12) {
alterColumn(bookkeepingTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false)
alterColumn(offsetTable.records.baseTableRow.tableName, "table_name", "varchar(600)", nullable = false)
alterColumn(journalTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false)
alterColumn(metadataTable.records.baseTableRow.tableName, "table_name", "varchar(255)", nullable = false)
alterColumn(schemaTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false)
}
}

private def initTable(schema: slickProfile.SchemaDescription): Unit = {
Expand Down Expand Up @@ -159,6 +167,38 @@ class PramenDb(val jdbcConfig: JdbcConfig,
}
}

private def alterColumn(table: String, columnName: String, columnType: String, nullable: Boolean): Unit = {
val nullSuffix = if (nullable) " NOT NULL" else ""
try {
val quotedTable = slickProfile.quoteIdentifier(table)
val quotedColumnName = slickProfile.quoteIdentifier(columnName)
slickProfile match {
case _: SQLiteProfile =>
log.warn(s"SQLite does not support altering column types. Column '$columnName' in table '$table' will remain with the original type for the url: $activeUrl")
case _: MySQLProfile =>
slickDb.run(
sqlu"ALTER TABLE #$quotedTable MODIFY COLUMN #$quotedColumnName #$columnType $nullSuffix"
).execute()
case _: OracleProfile =>
slickDb.run(
sqlu"ALTER TABLE #$quotedTable MODIFY (#$quotedColumnName #$columnType)"
).execute()
case _: SQLServerProfile | HsqldbProfile =>
slickDb.run(
sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName #$columnType $nullSuffix"
).execute()
case _ =>
// PostgreSQL, H2, and other profiles that support ALTER COLUMN ... TYPE
slickDb.run(
sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName TYPE #$columnType"
).execute()
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} catch {
case NonFatal(ex) =>
throw new RuntimeException(s"Unable to alter column: '$columnName $columnType' in table: '$table' for the url: $activeUrl", ex)
}
}


override def close(): Unit = {
try {
Expand All @@ -176,7 +216,7 @@ object PramenDb {
private val log = LoggerFactory.getLogger(this.getClass)
private val conf = Pramen.getConfig

val MODEL_VERSION = 11
val MODEL_VERSION = 12
val DEFAULT_RETRIES: Int = conf.getInt("pramen.internal.connection.retries.default")
val BACKOFF_MIN_MS: Int = conf.getInt("pramen.internal.connection.backoff.min.ms")
val BACKOFF_MAX_MS: Int = conf.getInt("pramen.internal.connection.backoff.max.ms")
Expand Down
2 changes: 1 addition & 1 deletion pramen/project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
#

sbt.version=1.11.6
sbt.version=1.12.11
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

What is the latest stable version of sbt (Scala Build Tool) and are there any known issues with sbt version 1.12.11?

💡 Result:

Latest stable version of sbt (as of 2026-05-20): sbt 1.12.10. [1][2] Known issues with sbt 1.12.11: 1) The sbt project announced sbt 1.12.11 and its headline change was rolling back an “eviction error in Test configuration” because it broke Scala Native builds. [3] 2) No specific widely-documented security advisories or release-blocking bugs for sbt 1.12.11 were found in the sources I checked. However, sbt 1.12.7 (and earlier affected versions) had a Windows command-injection issue (CVE-2026-32948) fixed in sbt 1.12.7/1.12.8 depending on branch; this means you should ensure your sbt is at least at a version that includes that fix rather than relying on 1.12.11 alone. [4][5][6] 3) Separately, there is a known regression in sbt 1.12.6 where a specific sbt plugin configuration (sbt-site 1.4.1) could cause resolution to hang; this is not about 1.12.11, but it’s an example of known issues around nearby 1.12.x versions. [7] If you want, tell me your OS (Windows/Linux/macOS) and whether you use sbt “source dependencies” (VCS/project refs). I can map those facts to the most relevant advisory/upgrade guidance for your exact risk profile.

Citations:


Consider using sbt 1.12.10 instead of 1.12.11.

As of May 2026, sbt 1.12.10 is the latest stable release, while 1.12.11 appears to be a newer post-stable version. While 1.12.11 does exist and includes a rollback of an eviction error change that broke Scala Native builds, using a non-stable version may introduce unexpected issues. If Scala Native compatibility is not a concern for this project, 1.12.10 is the safer choice.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pramen/project/build.properties` at line 16, Update the sbt.version property
value from 1.12.11 to 1.12.10 in the build properties so the project uses the
stable sbt release; locate the sbt.version entry (the key "sbt.version") and
change its value to "1.12.10", then run a quick build/test to confirm no
regressions.

Loading