## Walkthrough

Adds an opt-in in-place Hive schema replacement flow: a new …
## Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Task as TaskRunner/Job
    participant JobBase as JobBase
    participant Metastore as MetastoreImpl
    participant HiveHelper as HiveHelper (SQL/SparkCatalog)
    participant HiveConfig as HiveConfig
    Task->>JobBase: createOrRefreshHiveTable(..., updateSchema=true)
    JobBase->>Metastore: repairOrCreateHiveTable(..., updateSchema=true)
    alt table exists and updateSchema=true
        Metastore->>HiveHelper: replaceHiveTableSchema(schema, partitionBy, db, table)
        HiveHelper->>HiveConfig: read replaceSchemaTemplate
        HiveConfig-->>HiveHelper: template (ALTER TABLE ... REPLACE COLUMNS ...)
        HiveHelper->>HiveHelper: render SQL with @fullTableName/@schema
        HiveHelper->>HiveHelper: execute SQL (spark.sql / queryExecutor)
    else table exists and updateSchema=false
        Metastore->>Metastore: fall back to add partition or repair (MSCK REPAIR TABLE)
    end
    Metastore-->>JobBase: done
    JobBase-->>Task: return results
```
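The branch logic in the diagram can be sketched as a minimal, dependency-free flow. The function shape, callback parameters, and return values below are illustrative stand-ins for the real `MetastoreImpl`/`HiveHelper` calls, not the project's actual API:

```scala
// Illustrative sketch of the branch logic in the diagram above; the
// callbacks stand in for HiveHelper operations. Names are hypothetical.
def repairOrCreateHiveTable(tableExists: Boolean,
                            updateSchema: Boolean,
                            replaceSchema: () => Unit,
                            repairOrAddPartition: () => Unit,
                            createTable: () => Unit): String =
  if (tableExists && updateSchema) {
    replaceSchema()        // ALTER TABLE ... REPLACE COLUMNS path
    "schema-replaced"
  } else if (tableExists) {
    repairOrAddPartition() // add partition or MSCK REPAIR TABLE path
    "repaired"
  } else {
    createTable()
    "created"
  }
```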
**Estimated code review effort:** 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala (1)
**206-218:** ⚠️ Potential issue | 🟠 Major

**Avoid falling through to full table repair after schema replacement.**

When `updateSchema = true` and `hivePreferAddPartition` is false, this still calls `repairHiveTable`, reintroducing the expensive partition re-scan the PR is meant to avoid. Prefer registering only the current Parquet partition on the schema-update path, or explicitly skip repair when the table already exists.

🐛 Proposed adjustment
```diff
       if (updateSchema) {
-        log.info(s"Updating schema of the Hive table table '$fullTableName'")
+        log.info(s"Updating schema of the Hive table '$fullTableName'")
         hiveHelper.replaceHiveTableSchema(effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable)
       }

-      if (mt.hivePreferAddPartition && mt.format.isInstanceOf[DataFormat.Parquet]) {
+      val shouldAddSinglePartition =
+        mt.format.isInstanceOf[DataFormat.Parquet] && (mt.hivePreferAddPartition || updateSchema)
+
+      if (shouldAddSinglePartition) {
         val location = new Path(effectivePath, s"${mt.infoDateColumn}=${infoDate}")
         log.info(s"The table '$fullTableName' exists. Adding partition '$location'...")
         hiveHelper.addPartition(mt.hiveConfig.database, hiveTable, Seq(mt.infoDateColumn), Seq(infoDate.toString), location.toString)
+      } else if (updateSchema) {
+        log.info(s"The table '$fullTableName' exists and its schema was updated. Skipping full table repair.")
       } else {
         log.info(s"The table '$fullTableName' exists. Repairing it.")
         hiveHelper.repairHiveTable(mt.hiveConfig.database, hiveTable, format)
       }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala` around lines 206 - 218, The current flow in MetastoreImpl calls hiveHelper.repairHiveTable even when updateSchema is true, causing an expensive full repair; modify the logic so that after hiveHelper.replaceHiveTableSchema(effectiveSchema, ... ) you do not fall through to repairHiveTable: if mt.format is DataFormat.Parquet then register only the current partition by calling hiveHelper.addPartition(mt.hiveConfig.database, hiveTable, Seq(mt.infoDateColumn), Seq(infoDate.toString), new Path(effectivePath, s"${mt.infoDateColumn}=${infoDate}").toString), otherwise skip the repair entirely (do not call hiveHelper.repairHiveTable) when updateSchema was performed; keep the existing add-partition branch for the mt.hivePreferAddPartition=true case unchanged.
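To make the cost difference concrete, here is a hedged sketch of the two DDL statements involved. The builder functions are hypothetical helpers, not `HiveHelper`'s real API; the point is that `ADD PARTITION` touches a single directory, while `MSCK REPAIR TABLE` re-scans the entire table location:

```scala
// Hypothetical DDL builders illustrating the cheap vs. expensive path.
def addPartitionSql(fullTableName: String, partitionCol: String,
                    value: String, location: String): String =
  s"ALTER TABLE $fullTableName ADD IF NOT EXISTS " +
    s"PARTITION ($partitionCol='$value') LOCATION '$location'"

def repairTableSql(fullTableName: String): String =
  s"MSCK REPAIR TABLE $fullTableName" // scans every partition directory
```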
🧹 Nitpick comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala (1)
**123-126:** Prefer named arguments for `HiveQueryTemplates`.

Now that another string template was inserted into the case class, positional construction is easy to mis-order and hard to review.
♻️ Proposed refactor
```diff
-      templates = HiveQueryTemplates(createTableTemplate, createOnlyTableTemplate, updateSchemaTemplate, repairTableTemplate, addPartitionTableTemplate, dropTableTemplate),
+      templates = HiveQueryTemplates(
+        createTableTemplate = createTableTemplate,
+        createOnlyTableTemplate = createOnlyTableTemplate,
+        replaceSchemaTemplate = updateSchemaTemplate,
+        repairTableTemplate = repairTableTemplate,
+        addPartitionTemplate = addPartitionTableTemplate,
+        dropTableTemplate = dropTableTemplate
+      ),
```

```diff
-      HiveQueryTemplates(DEFAULT_CREATE_TABLE_TEMPLATE, DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, DEFAULT_UPDATE_SCHEMA_TEMPLATE, DEFAULT_REPAIR_TABLE_TEMPLATE, DEFAULT_ADD_PARTITION_TEMPLATE, DEFAULT_DROP_TABLE_TEMPLATE),
+      HiveQueryTemplates(
+        createTableTemplate = DEFAULT_CREATE_TABLE_TEMPLATE,
+        createOnlyTableTemplate = DEFAULT_CREATE_ONLY_TABLE_TEMPLATE,
+        replaceSchemaTemplate = DEFAULT_UPDATE_SCHEMA_TEMPLATE,
+        repairTableTemplate = DEFAULT_REPAIR_TABLE_TEMPLATE,
+        addPartitionTemplate = DEFAULT_ADD_PARTITION_TEMPLATE,
+        dropTableTemplate = DEFAULT_DROP_TABLE_TEMPLATE
+      ),
```

Also applies to: 147-150
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala` around lines 123 - 126: the construction of HiveQueryTemplates using positional arguments is fragile after a new template parameter was added; update the HiveConfig instantiations that pass templates (where `HiveQueryTemplates(...)` is used) to use named arguments for each template parameter (e.g., `createTableTemplate = ...`, `createOnlyTableTemplate = ...`, `updateSchemaTemplate = ...`, `repairTableTemplate = ...`, `addPartitionTableTemplate = ...`, `dropTableTemplate = ...`, or the exact parameter names from HiveQueryTemplates), and do the same for the other occurrence mentioned so the order cannot be mistaken.

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala (1)
**25-101:** Naming inconsistency: "replace" vs "update" schema.

The new field and config key use "replace" (`replaceSchemaTemplate`, `REPLACE_SCHEMA_TEMPLATE_KEY = "replace.schema.template"`), but the default constant and the local binding in `fromConfig` use "update" (`DEFAULT_UPDATE_SCHEMA_TEMPLATE`, `val updateSchemaTemplate = ...`). Since the SQL it emits is `ALTER TABLE ... REPLACE COLUMNS`, aligning everything on `REPLACE` would be more consistent and easier to grep.

♻️ Suggested rename
```diff
-  val DEFAULT_UPDATE_SCHEMA_TEMPLATE: String = "ALTER TABLE `@fullTableName` REPLACE COLUMNS ( `@schema` );"
+  val DEFAULT_REPLACE_SCHEMA_TEMPLATE: String = "ALTER TABLE `@fullTableName` REPLACE COLUMNS ( `@schema` );"
@@
-    val updateSchemaTemplate = ConfigUtils.getOptionString(conf, REPLACE_SCHEMA_TEMPLATE_KEY)
-      .getOrElse(DEFAULT_UPDATE_SCHEMA_TEMPLATE)
+    val replaceSchemaTemplate = ConfigUtils.getOptionString(conf, REPLACE_SCHEMA_TEMPLATE_KEY)
+      .getOrElse(DEFAULT_REPLACE_SCHEMA_TEMPLATE)
@@
-      replaceSchemaTemplate = updateSchemaTemplate,
+      replaceSchemaTemplate = replaceSchemaTemplate,
@@
-      replaceSchemaTemplate = DEFAULT_UPDATE_SCHEMA_TEMPLATE,
+      replaceSchemaTemplate = DEFAULT_REPLACE_SCHEMA_TEMPLATE,
```

Don't forget to update the reference in `HiveHelperSparkCatalog.scala` (line 86) accordingly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala` around lines 25 - 101: the codebase mixes "replace" and "update" naming for the schema template; rename `DEFAULT_UPDATE_SCHEMA_TEMPLATE` -> `DEFAULT_REPLACE_SCHEMA_TEMPLATE` and the local `val updateSchemaTemplate` -> `replaceSchemaTemplate` in `HiveQueryTemplates.fromConfig`, update any references where `DEFAULT_UPDATE_SCHEMA_TEMPLATE` or `updateSchemaTemplate` are used (including the HiveQueryTemplates constructor call that currently passes `updateSchemaTemplate` into `replaceSchemaTemplate`), and adjust `HiveHelperSparkCatalog.scala` (the usage at the line referenced in review) to use the new `DEFAULT_REPLACE_SCHEMA_TEMPLATE` / `replaceSchemaTemplate` identifiers so all symbols (`REPLACE_SCHEMA_TEMPLATE_KEY`, `replaceSchemaTemplate`, `DEFAULT_REPLACE_SCHEMA_TEMPLATE`) consistently use "replace".

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala (1)
**73-92:** Consider verifying the table exists before issuing `ALTER TABLE`.

`replaceHiveTableSchema` assumes the table already exists; if called on a missing table, the `ALTER TABLE ... REPLACE COLUMNS` will fail with a Spark/Hive parse/analysis error that is less informative than the explicit `IllegalStateException` used elsewhere in this class (see `createHiveTable` / `createOrUpdateHiveTable`). If callers always guard with `doesTableExist` first, this is fine; otherwise consider adding a guard here for symmetry with the rest of the API.

Also note: unlike `HiveHelperSql` (which reads `hiveConfig.replaceSchemaTemplate`), this implementation hardcodes `HiveQueryTemplates.DEFAULT_UPDATE_SCHEMA_TEMPLATE`, so a user-configured `replace.schema.template` will have no effect when using the Spark Catalog backend. That matches the existing behavior for other operations in this class (which bypass templates entirely), but worth confirming this is intentional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala` around lines 73 - 92, The replaceHiveTableSchema method currently runs an ALTER/REPLACE without checking existence and hardcodes HiveQueryTemplates.DEFAULT_UPDATE_SCHEMA_TEMPLATE; update replaceHiveTableSchema to first call doesTableExist(fullTableName) (or the existing doesTableExist method in this class) and throw the same IllegalStateException used by createHiveTable/createOrUpdateHiveTable if the table is missing, and modify the SQL template usage to respect hiveConfig.replaceSchemaTemplate (fall back to HiveQueryTemplates.DEFAULT_UPDATE_SCHEMA_TEMPLATE if the config value is empty) so the Spark Catalog backend honors configured templates.
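For illustration, the template substitution discussed above can be sketched as plain string replacement. The default template string is quoted from the review; the `render` helper itself is hypothetical, not the project's actual renderer:

```scala
// Sketch of rendering the replace-schema template. Only the template text
// and the @fullTableName/@schema variables come from the review.
val DefaultReplaceSchemaTemplate: String =
  "ALTER TABLE `@fullTableName` REPLACE COLUMNS ( `@schema` );"

def render(template: String, fullTableName: String, schemaDdl: String): String =
  template
    .replace("@fullTableName", fullTableName)
    .replace("@schema", schemaDdl)
```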
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 426-427: The updateSchema decision currently only checks
schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty
and thus skips Hive metadata updates when handleSchemaChange() returned (true,
Nil) for a first-time registration; modify the logic so updateSchema is true not
only when the change lists are non-empty but also when the boolean result from
handleSchemaChange() indicates a registration happened (i.e., consider the
boolean part of handleSchemaChange() for before/after schema checks) and then
call task.job.createOrRefreshHiveTable(...) when either a change list is
non-empty OR the corresponding handleSchemaChange boolean is true to ensure
first-time schema registration triggers replaceHiveTableSchema.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala`:
- Line 53: The MAX_VARCHAR_LENGTH constant in SparkUtils (val
MAX_VARCHAR_LENGTH) does not match the boundary used by
JdbcSparkUtils.addMetadata (which writes maxLength for lengths < 8192), causing
mismatched treatment of varchar(>4096 and <8192) values; fix by centralizing the
boundary constant (e.g., create a shared VarcharLengthLimit constant) or change
val MAX_VARCHAR_LENGTH to 8192 and update all usages (including
SparkUtils.MAX_VARCHAR_LENGTH and JdbcSparkUtils.addMetadata) so both producer
and consumer use the exact same exclusive bound.
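A minimal sketch of the suggested centralization, assuming the 8192 exclusive bound mentioned for `JdbcSparkUtils.addMetadata`; the object and method names here are illustrative, not the project's actual identifiers:

```scala
// Hypothetical shared constant so producer and consumer use one bound.
object VarcharLimits {
  val MaxVarcharLength: Int = 8192 // exclusive upper bound

  // Lengths strictly below the bound are kept as varchar(n).
  def keepAsVarchar(maxLength: Int): Boolean =
    maxLength > 0 && maxLength < MaxVarcharLength
}
```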
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSparkCatalogSuite.scala`:
- Around line 79-81: MetastoreImpl.repairOrCreateHiveTable currently calls
replaceHiveTableSchema unconditionally when updateSchema is true which throws
AnalysisException for partitioned Parquet tables with SparkCatalog; wrap the
call to replaceHiveTableSchema inside a try-catch that catches AnalysisException
(or Throwable), logs a warning with the exception, and falls back to recreating
the table (i.e., call the existing table creation/replacement code path used
when replacement is not possible) for SparkCatalog/partitioned tables; ensure
you check the partitioned-table condition and still respect updateSchema while
providing the fallback behavior in MetastoreImpl.repairOrCreateHiveTable.
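The try-catch-with-fallback suggested above can be expressed as a small generic helper. This is a sketch of the pattern only, not the project's code; in the real fix the caught type would be Spark's `AnalysisException` and the fallback would log a warning and recreate the table:

```scala
import scala.util.control.NonFatal

// Generic "try the in-place schema replacement, otherwise fall back
// to recreating the table" pattern.
def withFallback[A](primary: => A)(fallback: Throwable => A): A =
  try primary
  catch { case NonFatal(ex) => fallback(ex) }
```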
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f9080a27-7fd3-4b9b-bf42-138db54a4aba
📒 Files selected for processing (19)
- pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/JobBaseSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSparkCatalogSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala
…bles if Spark Catalog fails to do it.
Closes #736