diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala index eb07e99d..c879d40d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/ExternalChannelFactoryReflect.scala @@ -49,9 +49,9 @@ object ExternalChannelFactoryReflect { } factory match { - case fv2: ExternalChannelFactoryV2[T] => + case fv2: ExternalChannelFactoryV2[T @unchecked] => fv2.apply(conf, workflowConfig, parentPath, spark) - case fv1: ExternalChannelFactory[T] => + case fv1: ExternalChannelFactory[T @unchecked] => fv1.apply(conf, parentPath, spark) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala index f249ceef..b44eac99 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala @@ -23,7 +23,7 @@ trait BookkeepingTable { import profile.api._ class BookkeepingRecords(tag: Tag) extends Table[BookkeepingRecord](tag, "bookkeeping") { - def pramenTableName = column[String]("watcher_table_name", O.Length(128)) + def pramenTableName = column[String]("watcher_table_name", O.Length(255)) def infoDate = column[String]("info_date", O.Length(20)) def infoDateBegin = column[String]("info_date_begin", O.Length(20)) def infoDateEnd = column[String]("info_date_end", O.Length(20)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala index d803bb86..f574f31d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala @@ -23,7 +23,7 @@ trait MetadataTable { import profile.api._ class MetadataRecords(tag: Tag) extends Table[MetadataRecord](tag, "metadata") { - def pramenTableName = column[String]("table_name", O.Length(128)) + def pramenTableName = column[String]("table_name", O.Length(255)) def infoDate = column[String]("info_date", O.Length(20)) def key = column[String]("key", O.Length(255)) def value = column[String]("value") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala index 162b1608..4acc7a96 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala @@ -23,7 +23,7 @@ trait OffsetTable { import profile.api._ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") { - def pramenTableName = column[String]("table_name", O.Length(256)) + def pramenTableName = column[String]("table_name", O.Length(600)) def infoDate = column[String]("info_date", O.Length(20)) def dataType = column[String]("data_type", O.Length(20)) def minOffset = column[String]("min_offset") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala index 7ad1a60c..4b55a144 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala @@ -23,7 +23,7 @@ trait SchemaTable { import profile.api._ class SchemaRecords(tag: Tag) extends Table[SchemaRecord](tag, "schemas") { - def pramenTableName = column[String]("watcher_table_name", O.Length(128)) + def pramenTableName = column[String]("watcher_table_name", O.Length(255)) def infoDate = column[String]("info_date", O.Length(20)) def schemaJson = column[String]("schema_json") def * = (pramenTableName, infoDate, schemaJson) <> (SchemaRecord.tupled, SchemaRecord.unapply) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala index 9d83c119..440f69fc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala @@ -24,7 +24,7 @@ trait JournalTable { class JournalRecords(tag: Tag) extends Table[JournalTask](tag, "journal") { def jobName = column[String]("job_name", O.Length(200)) - def pramenTableName = column[String]("watcher_table_name", O.Length(128)) + def pramenTableName = column[String]("watcher_table_name", O.Length(255)) def periodBegin = column[String]("period_begin", O.Length(20)) def periodEnd = column[String]("period_end", O.Length(20)) def informationDate = column[String]("information_date", O.Length(20)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index cf74ad43..be8e6bd2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.rdb import org.slf4j.LoggerFactory import slick.jdbc.JdbcBackend.Database -import slick.jdbc.{JdbcBackend, JdbcProfile} +import slick.jdbc._ import slick.util.AsyncExecutor import za.co.absa.pramen.api.Pramen import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingTable, MetadataTable, OffsetTable, SchemaTable} @@ -132,6 +132,14 @@ class PramenDb(val jdbcConfig: JdbcConfig, if (dbVersion == 10) { addColumn(executionsTable.records.baseTableRow.tableName, "number_of_tasks_completed", "bigint") } + + if (0 < dbVersion && dbVersion < 12) { + alterColumn(bookkeepingTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false) + alterColumn(offsetTable.records.baseTableRow.tableName, "table_name", "varchar(600)", nullable = false) + alterColumn(journalTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false) + alterColumn(metadataTable.records.baseTableRow.tableName, "table_name", "varchar(255)", nullable = false) + alterColumn(schemaTable.records.baseTableRow.tableName, "watcher_table_name", "varchar(255)", nullable = false) + } } private def initTable(schema: slickProfile.SchemaDescription): Unit = { @@ -159,6 +167,38 @@ class PramenDb(val jdbcConfig: JdbcConfig, } } + private def alterColumn(table: String, columnName: String, columnType: String, nullable: Boolean): Unit = { + val nullSuffix = if (nullable) " NOT NULL" else "" + try { + val quotedTable = slickProfile.quoteIdentifier(table) + val quotedColumnName = slickProfile.quoteIdentifier(columnName) + slickProfile match { + case _: SQLiteProfile => + log.warn(s"SQLite does not support altering column types. Column '$columnName' in table '$table' will remain with the original type for the url: $activeUrl") + case _: MySQLProfile => + slickDb.run( + sqlu"ALTER TABLE #$quotedTable MODIFY COLUMN #$quotedColumnName #$columnType $nullSuffix" + ).execute() + case _: OracleProfile => + slickDb.run( + sqlu"ALTER TABLE #$quotedTable MODIFY (#$quotedColumnName #$columnType)" + ).execute() + case _: SQLServerProfile | HsqldbProfile => + slickDb.run( + sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName #$columnType $nullSuffix" + ).execute() + case _ => + // PostgreSQL, H2, and other profiles that support ALTER COLUMN ... TYPE + slickDb.run( + sqlu"ALTER TABLE #$quotedTable ALTER COLUMN #$quotedColumnName TYPE #$columnType" + ).execute() + } + } catch { + case NonFatal(ex) => + throw new RuntimeException(s"Unable to alter column: '$columnName $columnType' in table: '$table' for the url: $activeUrl", ex) + } + } + override def close(): Unit = { try { @@ -176,7 +216,7 @@ object PramenDb { private val log = LoggerFactory.getLogger(this.getClass) private val conf = Pramen.getConfig - val MODEL_VERSION = 11 + val MODEL_VERSION = 12 val DEFAULT_RETRIES: Int = conf.getInt("pramen.internal.connection.retries.default") val BACKOFF_MIN_MS: Int = conf.getInt("pramen.internal.connection.backoff.min.ms") val BACKOFF_MAX_MS: Int = conf.getInt("pramen.internal.connection.backoff.max.ms") diff --git a/pramen/project/build.properties b/pramen/project/build.properties index 5443af66..7fecd510 100644 --- a/pramen/project/build.properties +++ b/pramen/project/build.properties @@ -13,4 +13,4 @@ # limitations under the License. # -sbt.version=1.11.6 +sbt.version=1.12.11