Skip to content

Add migration from old to new bookkeeping model for Delta Lake tables and paths #754

@yruslan

Description

@yruslan

Describe the bug

When updating Pramen to the version in which the 'batchId' field was introduced, the following error occurs:

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `batchId` cannot be resolved. Did you mean one of the following? [`jobStarted`, `infoDate`, `infoDateEnd`, `jobFinished`, `tableName`]. SQLSTATE: 42703;
'DeserializeToObject newInstance(class za.co.absa.pramen.core.model.DataChunk), obj#4037: za.co.absa.pramen.core.model.DataChunk
+- LocalRelation <empty>, [tableName#4011, infoDate#4012, infoDateBegin#4013, infoDateEnd#4014, inputRecordCount#4015L, outputRecordCount#4016L, jobStarted#4017L, jobFinished#4018L]

  org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:552)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:179)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$10(CheckAnalysis.scala:489)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$10$adapted(CheckAnalysis.scala:474)
  org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:329)
  org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:328)
  org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:328)
  scala.collection.immutable.Vector.foreach(Vector.scala:1895)
  org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:328)
  org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:328)
  org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:328)
  scala.collection.immutable.Vector.foreach(Vector.scala:1895)
  org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:328)
  org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:328)
  org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:328)
  scala.collection.immutable.Vector.foreach(Vector.scala:1895)
  org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:328)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$9(CheckAnalysis.scala:474)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$9$adapted(CheckAnalysis.scala:474)
  scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
  scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
  scala.collection.AbstractIterable.foreach(Iterable.scala:933)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:474)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:307)
  org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:329)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:307)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:278)
  org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:450)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:263)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:250)
  org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:250)
  org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:450)
  org.apache.spark.sql.catalyst.encoders.BaseExpressionEncoder.resolveAndBindDeserializer(ExpressionEncoder.scala:390)
  org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:554)
  org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:459)
  org.apache.spark.sql.Dataset.resolvedEnc$lzycompute(Dataset.scala:426)
  org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolvedEnc(Dataset.scala:425)
  org.apache.spark.sql.Dataset$.apply(Dataset.scala:100)
  org.apache.spark.sql.Dataset.as(Dataset.scala:671)
  za.co.absa.pramen.core.bookkeeper.BookkeeperDeltaTable.getBkDf(BookkeeperDeltaTable.scala:54)
  za.co.absa.pramen.core.bookkeeper.BookkeeperDeltaBase.getBkData(BookkeeperDeltaBase.scala:125)
  za.co.absa.pramen.core.bookkeeper.BookkeeperDeltaBase.getLatestDataChunkFromStorage(BookkeeperDeltaBase.scala:64)
  za.co.absa.pramen.core.bookkeeper.BookkeeperBase.getLatestDataChunk(BookkeeperBase.scala:96)
  za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils$.getRerun(ScheduleStrategyUtils.scala:56)
  za.co.absa.pramen.core.runner.splitter.ScheduleStrategySourcing.getDaysToRun(ScheduleStrategySourcing.scala:109)
  za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl.runEagerJob(ConcurrentJobRunnerImpl.scala:153)
  za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl.runJob(ConcurrentJobRunnerImpl.scala:134)
  za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl.$anonfun$workerLoop$1(ConcurrentJobRunnerImpl.scala:88)
  za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl.$anonfun$workerLoop$1$adapted(ConcurrentJobRunnerImpl.scala:84)
  scala.Option.foreach(Option.scala:437)
  com.github.yruslan.channel.Channel.foreach(Channel.scala:72)
  za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl.workerLoop(ConcurrentJobRunnerImpl.scala:84)
  za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl.$anonfun$startWorkerLoop$2(ConcurrentJobRunnerImpl.scala:64)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
  scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
  java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  java.base/java.lang.Thread.run(Thread.java:840)

Code and/or configuration snippet that caused the issue

--

Expected behavior

Delta Tables should auto-migrate.

Context

  • Pramen/pramen-py version: 1.13.17

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions