@yruslan
Collaborator

@yruslan yruslan commented Nov 24, 2025

Closes #805

Summary by CodeRabbit

  • New Features

    • Added an index caching option to cache and reuse generated indexes, improving performance on repeated reads.
  • Improvements

    • Validation to prevent conflicting index settings (error when caching is enabled but indexing is disabled).
    • Index creation now uses a caching-aware flow and clearer logging around indexing and data locality decisions.
  • Tests

    • Added tests covering index caching behavior and the new validation.

@coderabbitai
Contributor

coderabbitai bot commented Nov 24, 2025

Walkthrough

Added an in-memory index caching option for variable-length record processing. Introduced enable_index_cache parameter, propagated it through parameter classes and parser, implemented a file-path-keyed cache in IndexBuilder with caching-aware control flow, updated call sites and tests, and added validation preventing cache usage when indexes are disabled.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Documentation: README.md | Documented new enable_index_cache option enabling in-memory caching of calculated indexes. |
| Parameter classes: cobol-parser/src/main/scala/.../ReaderParameters.scala, cobol-parser/src/main/scala/.../VariableLengthParameters.scala | Added isIndexCachingAllowed: Boolean field to ReaderParameters (default false) and to the VariableLengthParameters constructor and its Scaladoc. |
| Parameter parser & wiring: cobol-parser/src/main/scala/.../CobolParametersParser.scala | Added PARAM_ENABLE_INDEX_CACHE constant, populated isIndexCachingAllowed from params, propagated it into VariableLengthParameters and ReaderParameters, and added validation forbidding enable_index_cache=true when enable_indexes=false. |
| Index caching implementation: spark-cobol/src/main/scala/.../source/index/IndexBuilder.scala | Added global indexCache: ConcurrentHashMap[String, Array[SparseIndexEntry]], extended buildIndex and buildIndexForVarLenReader with cachingAllowed flag, split files into cached/non-cached, generate/store new indexes, retrieve cached indexes, combine results via new createIndexRDD helper, and added logging. |
| Index builder call sites: spark-cobol/src/main/scala/.../SparkCobolProcessor.scala, spark-cobol/src/main/scala/.../source/CobolRelation.scala | Updated calls to IndexBuilder.buildIndex to pass the new caching flag (sourced from reader properties). |
| Logging tweak: spark-cobol/src/main/scala/.../source/streaming/FileStreamer.scala | Changed end-of-stream log level from WARN to INFO and added an informational end-of-stream log. |
| Tests: spark-cobol/src/test/scala/.../index/IndexBuilderSpec.scala, spark-cobol/src/test/scala/.../integration/Test37RecordLengthMappingSpec.scala | Updated tests to pass cachingAllowed = false to new signatures; added integration tests verifying cache population and error when enable_index_cache is requested while enable_indexes is disabled. |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Parser as CobolParametersParser
    participant Reader
    participant IndexBuilder
    participant Cache as indexCache

    User->>Parser: parse parameters (enable_index_cache=true)
    Parser->>Parser: validate flags (enable_indexes vs enable_index_cache)
    Parser->>Reader: set isIndexCachingAllowed=true
    Reader->>IndexBuilder: buildIndex(files, reader, sqlContext, cachingAllowed=true)

    alt File indexes in cache
        IndexBuilder->>Cache: lookup by file path
        Cache-->>IndexBuilder: return cached SparseIndexEntry[]
    else Not cached
        IndexBuilder->>IndexBuilder: generate indexes for nonCachedFiles
        IndexBuilder->>Cache: store sorted SparseIndexEntry[] per file path
    end

    IndexBuilder->>IndexBuilder: createIndexRDD(combine cached + new)
    IndexBuilder-->>Reader: return RDD[SparseIndexEntry]
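
The flow in the diagram can be modelled without Spark. The following is a minimal sketch, not the PR's code: `FileWithOrder` and `SparseIndexEntry` are simplified stand-ins for the real classes, and the `generate` function is a hypothetical placeholder for the Spark job that scans a file.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-ins for the real types (assumed shapes, for illustration only).
final case class FileWithOrder(filePath: String, order: Int)
final case class SparseIndexEntry(offsetFrom: Long, offsetTo: Long, fileId: Int, recordIndex: Long)

object IndexCacheFlowSketch {
  private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()

  /** `generate` is a hypothetical stand-in for the index generation job. */
  def buildIndex(files: Array[FileWithOrder], cachingAllowed: Boolean)
                (generate: FileWithOrder => Array[SparseIndexEntry]): Array[SparseIndexEntry] = {
    // Split the input into files that already have a cached index and files that do not.
    val (cachedFiles, nonCachedFiles) =
      files.partition(f => cachingAllowed && indexCache.containsKey(f.filePath))

    // Generate indexes only for files missing from the cache, storing them sorted by offset.
    val newIndexes = nonCachedFiles.flatMap { f =>
      val entries = generate(f).sortBy(_.offsetFrom)
      if (cachingAllowed) indexCache.put(f.filePath, entries)
      entries
    }

    // Reuse cached entries, remapping fileId to the file's order in the current read.
    val cachedIndexes = cachedFiles.flatMap { f =>
      indexCache.get(f.filePath).map(_.copy(fileId = f.order))
    }

    cachedIndexes ++ newIndexes
  }
}
```

A second call for the same path skips `generate` entirely and only rewrites `fileId`, which is the behaviour the diagram's "File indexes in cache" branch describes.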

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Review concurrency and keying of indexCache in IndexBuilder (ConcurrentHashMap usage).
  • Verify that the parser validation for enable_indexes=false && enable_index_cache=true covers all input paths and produces a clear error message.
  • Check tests for deterministic cache assertions and that added logging doesn't leak sensitive info.

Poem

🐰 I hopped through params and found an index heap,
Saved offsets in memory for searches quick and deep,
Files once scanned now fetched with a wink,
No re-scan delays — performance in a blink,
My whiskers twitch — cached indexes make me leap!

Pre-merge checks and finishing touches

✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and specifically describes the main change: enabling index caching for VRL files to improve performance when processing the same files multiple times. |
| Linked Issues check | ✅ Passed | The code changes implement the core requirement from issue #805: caching calculated indexes for VRL files. The implementation includes a new enable_index_cache parameter, in-memory caching via ConcurrentHashMap, cache validation logic, and integration across parameter parsing, reader properties, and index building. |
| Out of Scope Changes check | ✅ Passed | All code changes are directly related to implementing index caching for VRL files. The one minor change (logging level in FileStreamer) is incidental and aligned with the feature scope. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |

@github-actions

JaCoCo code coverage report - 'cobol-parser'

Overall Project 91.52% 🍏

There is no coverage information present for the Files changed

@github-actions

github-actions bot commented Nov 24, 2025

JaCoCo code coverage report - 'spark-cobol'

| File | Coverage [94.99%] 🍏 |
|---|---|
| FileStreamer.scala | 98.87% 🍏 |
| SparkCobolProcessor.scala | 96.69% 🍏 |
| IndexBuilder.scala | 96.26% 🍏 |
| CobolRelation.scala | 82.8% 🍏 |
| Total Project Coverage | 81.37% 🍏 |

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

37-50: Index cache key and scope risk silently wrong indexes across readers/sessions

The global indexCache: ConcurrentHashMap[String, Array[SparseIndexEntry]] is keyed only by filePath. Index contents, however, depend on the VarLenReader configuration (copybook, RDW settings, fileStartOffset/fileEndOffset, record extractor, occurs mappings, etc.) and on the file contents. Reusing cached entries for the same path but a different reader configuration or after the file has changed will silently produce incorrect indexes, and the cache is shared JVM‑wide across Spark sessions.

Consider:

  • Including a reader/file signature in the cache key (e.g., case class IndexCacheKey(filePath: String, readerSignature: String, startOffset: Long, endOffset: Long, fileSize: Long)), where readerSignature is derived from the relevant ReaderParameters fields.
  • Optionally scoping the cache per Spark application (e.g., by including sparkContext.applicationId in the key) or providing an explicit clear/eviction mechanism so long‑lived drivers that touch many distinct files don’t accumulate unbounded index state.
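
As a rough illustration of the composite-key idea, the sketch below keys the cache by a case class instead of a bare path. `IndexCacheKey` follows the shape proposed in the comment; how `readerSignature` would be derived from `ReaderParameters` is left open, and the cached value is reduced to a vector of offsets so that the example stays self-contained. All names here are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical key, following the shape suggested in the review comment.
final case class IndexCacheKey(filePath: String,
                               readerSignature: String,
                               startOffset: Long,
                               endOffset: Long,
                               fileSize: Long)

object KeyedIndexCacheSketch {
  // Value type simplified to record offsets only (assumption for brevity).
  private val cache = new ConcurrentHashMap[IndexCacheKey, Vector[Long]]()

  def put(key: IndexCacheKey, offsets: Vector[Long]): Unit = {
    cache.put(key, offsets)
    ()
  }

  def get(key: IndexCacheKey): Option[Vector[Long]] = Option(cache.get(key))

  /** Explicit clear hook so that long-lived drivers can release index state. */
  def clear(): Unit = cache.clear()
}
```

Because case classes get structural `equals` and `hashCode`, a change in any component (a different copybook signature, a different file size) produces a cache miss rather than a silently reused index.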
🧹 Nitpick comments (7)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (1)

173-196: Good test coverage for index caching, but consider verifying cache hits

The test correctly verifies that cached and non-cached results are identical. However, it doesn't verify that caching actually occurred (e.g., that the second read used the cache).

Consider adding a verification mechanism to confirm the cache was used on the second read. This could be done by:

  • Checking execution time (second read should be faster)
  • Adding a cache hit counter that can be inspected
  • Logging cache hits and checking logs

This would provide stronger confidence that the caching mechanism is working as intended, not just producing correct results.
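
One possible shape for the hit-counter option is sketched below. It is an assumption-laden illustration rather than code from the PR: `CountingCacheSketch`, `getOrBuild`, and the counters are hypothetical names, and the cached value is simplified to a vector of offsets.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object CountingCacheSketch {
  private val cache = new ConcurrentHashMap[String, Vector[Long]]()
  private val hits = new AtomicLong(0L)
  private val misses = new AtomicLong(0L)

  /** Returns the cached value for the path, or evaluates `build` and stores the result. */
  def getOrBuild(filePath: String)(build: => Vector[Long]): Vector[Long] = {
    val cached = cache.get(filePath)
    if (cached != null) {
      hits.incrementAndGet()
      cached
    } else {
      // Two threads may both miss and build here; acceptable for a sketch.
      misses.incrementAndGet()
      val built = build
      cache.put(filePath, built)
      built
    }
  }

  def hitCount: Long = hits.get()
  def missCount: Long = misses.get()
}
```

A test could then read the file twice and assert that `hitCount` increased, which is more deterministic than comparing execution times.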

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (3)

120-174: Caching currently bypassed for locality‑aware indexing and may surprise users

buildIndex passes cachingAllowed only into the non‑local variable‑length path:

case reader: VarLenReader if reader.isIndexGenerationNeeded && localityParams.improveLocality && isDataLocalitySupported(fs) =>
  // locality path – cachingAllowed ignored
case reader: VarLenReader =>
  // non‑local path – cachingAllowed used
  buildIndexForVarLenReader(filesList, reader, sqlContext, cachingAllowed)

With defaults (improve_locality = true on HDFS), enable_index_cache will effectively be ignored and indexes will still be recomputed on every run for locality‑aware indexing.

If this is intentional, it’s worth documenting that caching only applies when locality is disabled or unsupported. Otherwise, consider extending buildIndexForVarLenReaderWithFullLocality to reuse cached SparseIndexEntrys (and only recompute block locations) so caching works for the common HDFS+locality case as well.


127-174: Minor improvements to caching path: typos and defensive guard

Two small nits and a safety tweak:

  • Comment typo on Line 127: "chached and teh" → "cached and the".
  • createIndexRDD(cachedIndexes ++ newIndexes, sqlContext) assumes there is at least one index; if that ever isn’t true, parallelize will be called with numPartitions = 0 and fail at runtime.

You can make this more robust and keep behavior unchanged for non‑empty inputs by guarding the partition count:

-  private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
-    val indexCount = indexes.length
-
-    val numPartitions = Math.min(indexCount, Constants.maxNumPartitions)
+  private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
+    val indexCount = indexes.length
+    val numPartitions = Math.max(1, Math.min(indexCount, Constants.maxNumPartitions))
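
The arithmetic behind the guard can be checked in isolation. In this small sketch, `maxNumPartitions` is a hypothetical cap standing in for `Constants.maxNumPartitions`, whose actual value is not shown in this thread.

```scala
object PartitionCountSketch {
  // Hypothetical cap standing in for Constants.maxNumPartitions.
  val maxNumPartitions: Int = 200

  /** Unguarded version: yields 0 when there are no index entries. */
  def unguarded(indexCount: Int): Int = Math.min(indexCount, maxNumPartitions)

  /** Guarded version: never drops below one partition. */
  def guarded(indexCount: Int): Int = Math.max(1, Math.min(indexCount, maxNumPartitions))
}
```

For any non-empty input the two functions agree, so the guard only changes the empty case.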

381-388: Index RDD no longer cached in Spark; confirm this is acceptable

The old non‑local path used repartitionIndexes, which did indexRDD.repartition(...).cache(). The new helper returns a plain parallelized RDD without caching:

private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
  val indexCount = indexes.length
  val numPartitions = Math.min(indexCount, Constants.maxNumPartitions)
  sqlContext.sparkContext.parallelize(indexes, numPartitions)
}

If downstream code runs multiple actions on indexes within a single read, this may re‑scan or reshuffle more than before. If you still want Spark‑level reuse for a single scan, consider either:

  • Calling .cache() here, or
  • Reusing repartitionIndexes (possibly generalized) to keep the existing behavior.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala (1)

67-107: Tests cover new signatures but not cache‑hit behavior

The updated tests correctly pass cachingAllowed = false, preserving existing behavior. However, none of the specs exercise the new caching path (e.g., repeated indexing of the same file with cachingAllowed = true).

Consider adding a focused test that:

  1. Calls buildIndexForVarLenReader twice on the same file with cachingAllowed = true.
  2. Asserts that the second call does not re‑run generateIndexEntry (via a spy/metric) and returns the same index entries, and
  3. Verifies that cached indexes are remapped to the correct fileId when FileWithOrder.order changes.

Also applies to: 171-191

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)

65-71: Derive indexCachingAllowed from reader properties instead of wiring it as a constructor arg

The new indexCachingAllowed: Boolean parameter is immediately passed through to IndexBuilder.buildIndex(...), while the same information is already conceptually part of the reader configuration (ReaderParameters.isIndexCachingAllowed).

To reduce duplication and the risk of call‑site mismatches, consider:

  • Dropping the explicit indexCachingAllowed constructor parameter, and
  • Inside CobolRelation, deriving it from the reader, e.g.:
private lazy val indexes: RDD[SparseIndexEntry] = {
  val cachingAllowed = cobolReader match {
    case v: VarLenReader => v.getReaderProperties.isIndexCachingAllowed
    case _               => false
  }
  IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cachingAllowed)(localityParams)
}

This keeps the public API leaner and tightly couples the caching decision to the actual reader config.

Also applies to: 77-78

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)

120-123: Align enable_index_cache semantics with enable_indexes

The new plumbing for enable_index_cache looks consistent and defaults are sensible (false). One edge case to watch:

isUsingIndex          = params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
isIndexCachingAllowed = params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,

This allows enable_index_cache = true while enable_indexes = false, yielding isIndexGenerationNeeded = false but isIndexCachingAllowed = true in ReaderParameters. Unless all downstream call sites explicitly gate caching on isIndexGenerationNeeded, this can lead to unnecessary index building and cache population for readers that won’t actually use indexes.

Two possible tightenings:

  • Define caching as a derived flag:
val useIndexes  = params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean
val useIndexCache = useIndexes && params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean
...
isUsingIndex          = useIndexes,
isIndexCachingAllowed = useIndexCache,
  • Optionally, fail fast or at least log a warning when enable_index_cache = true but enable_indexes = false, since caching has no effect in that configuration.

Also applies to: 385-386, 421-422, 508-509
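
A fail-fast variant of this check might look like the sketch below. The option names `enable_indexes` and `enable_index_cache` come from the PR; the `IndexFlags` holder, the exception type, and the message text are assumptions made for illustration.

```scala
object IndexCacheParamsSketch {
  // Hypothetical holder for the two flags discussed above.
  final case class IndexFlags(isUsingIndex: Boolean, isIndexCachingAllowed: Boolean)

  def parse(params: Map[String, String]): IndexFlags = {
    val useIndexes = params.getOrElse("enable_indexes", "true").toBoolean
    val useIndexCache = params.getOrElse("enable_index_cache", "false").toBoolean

    // Caching without indexing has no effect, so reject the combination early.
    if (useIndexCache && !useIndexes) {
      throw new IllegalArgumentException(
        "'enable_index_cache' = true requires 'enable_indexes' = true.")
    }
    IndexFlags(useIndexes, useIndexCache)
  }
}
```

Failing at parse time surfaces the conflict before any file is touched, whereas the derived-flag approach would silently disable caching.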

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b83b1a and 9e6ff80.

📒 Files selected for processing (12)
  • README.md (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (4 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala (3 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala (5 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (9)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1)
  • sqlContext (126-126)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
  • buildIndex (51-69)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (1)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (1)
  • withTempBinFile (60-69)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
  • buildIndex (51-69)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/LocalityParameters.scala (3)
  • LocalityParameters (21-21)
  • LocalityParameters (23-30)
  • extract (24-29)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
  • buildIndex (51-69)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/internal/Logging.scala (1)
  • logger (33-38)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • getReaderProperties (367-456)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala (1)
  • Constants (20-81)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • getOrElse (78-81)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (9)
README.md (1)

1605-1605: LGTM! Clear documentation for the new caching feature.

The documentation clearly explains when to use this option and what it does.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)

86-86: Logging level change: WARNING → INFO

The logging level for end-of-stream messages has been changed from WARNING to INFO. This is appropriate since reaching the end of a stream is normal behavior, not a warning condition. However, this change appears unrelated to the index caching feature.

Was this logging change intentional, or should it be in a separate commit?


101-101: Consistent logging level change

Similar to line 86, this end-of-stream logging is now at INFO level, which is more appropriate for normal operational messages.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala (1)

67-68: Test updated correctly for new parameter

The indexCachingAllowed = false parameter is added consistently across all test cases, maintaining existing behavior while supporting the new API.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)

102-102: LGTM! Backward-compatible API extension

The new isIndexCachingAllowed parameter is added with a safe default of false, ensuring backward compatibility. The caching feature is opt-in, which is appropriate for a performance optimization that changes runtime behavior.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala (1)

37-37: LGTM! Parameter properly documented and positioned

The new isIndexCachingAllowed parameter is well-documented and logically placed after the related isUsingIndex parameter. Note that unlike ReaderParameters, this parameter doesn't have a default value, requiring callers to explicitly provide it.

Also applies to: 60-60

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (2)

61-64: LGTM! Proper parameter extraction with safe defaults

The indexCachingAllowed flag is correctly extracted from variable-length parameters with an appropriate default of false when variable-length parameters are not present.


69-70: Constructor call updated correctly

The indexCachingAllowed parameter is properly threaded through to the CobolRelation constructor.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)

221-221: LGTM! IndexBuilder call updated for caching support

The buildIndex call is correctly updated to pass readerParameters.isIndexCachingAllowed, enabling the index caching mechanism when configured.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9e6ff80 and 6fb06ee.

📒 Files selected for processing (4)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (5 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala
🧰 Additional context used
🧬 Code graph analysis (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (2)
  • getOrElse (78-81)
  • contains (66-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2)

166-170: LGTM - Cached index retrieval correctly updates fileId.

The logic correctly retrieves cached indexes and updates the fileId to match the current file ordering. The .map(ind => ind.copy(fileId = f.order)) ensures each cached index entry is updated without modifying the original cached array.

Note: For files with very large numbers of index entries, this mapping operation could have performance implications. Consider monitoring if this becomes a bottleneck.
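
The copy-on-read behaviour described here can be shown with a small standalone sketch. `SparseIndexEntry` below is a simplified stand-in with assumed fields, and `remap` is a hypothetical helper name.

```scala
// Simplified stand-in for the real class (assumed fields, for illustration only).
final case class SparseIndexEntry(offsetFrom: Long, offsetTo: Long, fileId: Int, recordIndex: Long)

object FileIdRemapSketch {
  /** Returns a new array with fileId replaced; the cached input array is left untouched. */
  def remap(cached: Array[SparseIndexEntry], newFileId: Int): Array[SparseIndexEntry] =
    cached.map(_.copy(fileId = newFileId))
}
```

Each call allocates one new entry per cached entry, which is the linear cost the note above refers to.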


381-388: LGTM - Clean helper method for RDD creation.

The new createIndexRDD helper method is well-structured and correctly handles edge cases (empty index arrays) by ensuring at least 1 partition with Math.max(1, ...).

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (2)

122-122: LGTM - Parameter constant properly defined.

The new PARAM_ENABLE_INDEX_CACHE constant is well-placed in the "Indexed multisegment file processing" section and follows the existing naming conventions.


385-385: LGTM - Parameter correctly propagated through configuration layers.

The isIndexCachingAllowed parameter is properly threaded through the configuration hierarchy:

  • Parsed from parameters with a safe default of false (line 508)
  • Propagated from VariableLengthParameters to ReaderParameters (line 421)
  • Fallback case correctly defaults to false (line 385)

Also applies to: 421-421, 508-508

   * In a nutshell, ideally, there will be as many partitions as are there are indexes.
   */
 private[cobol] object IndexBuilder extends Logging {
+  private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
Contributor

⚠️ Potential issue | 🟠 Major

Add cache eviction policy and file modification tracking.

The global indexCache has no eviction policy and will grow unboundedly as new files are processed, potentially causing memory exhaustion in long-running Spark applications. Additionally, the cache is keyed solely by file path without tracking file modification times, so if a file is modified, stale cached indexes will be returned.

Consider these improvements:

  1. Add a bounded cache with LRU eviction (e.g., using Guava's CacheBuilder or Caffeine)
  2. Include file modification timestamp or checksum in the cache key
  3. Add a configurable maximum cache size or TTL
  4. Consider adding cache statistics logging

Example with size-bounded cache:

+import com.google.common.cache.{Cache, CacheBuilder}
+import java.util.concurrent.TimeUnit
+
-private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
+private val indexCache: Cache[String, Array[SparseIndexEntry]] = CacheBuilder.newBuilder()
+  .maximumSize(1000) // configurable
+  .expireAfterAccess(1, TimeUnit.HOURS) // configurable
+  .build()

Then update get/put operations accordingly.
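
If adding a Guava or Caffeine dependency is undesirable, a size-bounded LRU cache can also be approximated with the JDK alone. The sketch below swaps in `java.util.LinkedHashMap` in access order with `removeEldestEntry`; it is a different technique from the `CacheBuilder` example above and provides no TTL. `LruIndexCacheSketch` and `create` are hypothetical names.

```scala
import java.util.{Collections, LinkedHashMap, Map => JMap}

object LruIndexCacheSketch {
  /** Builds a synchronized, access-ordered map that evicts the least recently used entry. */
  def create[V](maxEntries: Int): JMap[String, V] =
    Collections.synchronizedMap(
      // The third constructor argument (true) selects access order instead of insertion order.
      new LinkedHashMap[String, V](16, 0.75f, true) {
        override def removeEldestEntry(eldest: JMap.Entry[String, V]): Boolean =
          size() > maxEntries
      }
    )
}
```

Every operation takes a single lock, so this trades the throughput of `ConcurrentHashMap` for bounded memory; whether that matters depends on how often indexes are looked up.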

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
around line 49, the global indexCache is an unbounded ConcurrentHashMap which
can grow indefinitely and does not account for file modifications; replace it
with a bounded LRU/TTL cache (e.g., Guava CacheBuilder or Caffeine) configured
via a new configurable max size and optional TTL, and change cache keys to
include file modification timestamp (or checksum) so modified files miss the
cache; update all get/put usages to the new cache API, add configuration hooks
for size/TTL, and optionally log basic cache statistics (hits/evictions) for
monitoring.

Comment on lines +127 to 174
+    // Splitting between files for which indexes are cached and the list of files for which indexes are not cached
+    val cachedFiles = if (cachingAllowed) {
+      filesList.filter(f => indexCache.containsKey(f.filePath))
+    } else {
+      Array.empty[FileWithOrder]
+    }

-    val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
+    val nonCachedFiles = filesList.diff(cachedFiles)

-    val indexRDD = filesRDD.mapPartitions(
-      partition => {
-        partition.flatMap(row => {
-          generateIndexEntry(row, sconf.value, reader)
-        })
-      }).cache()
+    // Getting indexes for files for which indexes are not in the cache
+    val newIndexes = if (nonCachedFiles.length > 0) {
+      if (reader.getReaderProperties.enableSelfChecks) {
+        selfCheckForIndexCompatibility(reader, nonCachedFiles.head.filePath, conf)
+      }

-    repartitionIndexes(indexRDD)
+      val filesRDD = sqlContext.sparkContext.parallelize(nonCachedFiles, nonCachedFiles.length)
+      filesRDD.mapPartitions(
+        partition => {
+          partition.flatMap(row => {
+            generateIndexEntry(row, sconf.value, reader)
+          })
+        }).collect()
+    } else {
+      Array.empty[SparseIndexEntry]
+    }
+
+    // Storing new indexes in the cache
+    if (cachingAllowed && newIndexes.length > 0) {
+      newIndexes.groupBy(_.fileId).foreach { case (fileId, indexEntries) =>
+        val filePathOpt = filesList.find(_.order == fileId).map(_.filePath)
+
+        filePathOpt.foreach { filePath =>
+          logger.info(s"Index stored to cache for file: $filePath.")
+          indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom))
+        }
+      }
+    }
+
+    // Getting indexes for files for which indexes are in the cache
+    val cachedIndexes = cachedFiles.flatMap { f =>
+      logger.info("Index fetched from cache for file: " + f.filePath)
+      indexCache.get(f.filePath)
+        .map(ind => ind.copy(fileId = f.order))
+    }
+
+    // Creating the final RDD with all indexes
+    createIndexRDD(cachedIndexes ++ newIndexes, sqlContext)
   }
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Verify cache key includes file modification tracking.

The caching logic filters files using only containsKey(f.filePath) without considering whether the file has been modified since the index was cached. This means if a VRL file is modified (records added/removed/changed), the stale cached index will be used, leading to incorrect data processing.

Run the following script to check if file modification time is tracked elsewhere in the codebase:

If modification time is not tracked, the cache key should be changed from just the file path to include modification metadata, e.g., s"${filePath}_${modificationTime}".


🏁 Script executed:

#!/bin/bash
# Search for file modification time tracking in the codebase
rg -n "getModificationTime|lastModified|FileStatus" --type scala -C 3

Length of output: 8054


Now I'll examine the IndexBuilder.scala file to understand the cache structure and identify whether modification time is tracked:


🏁 Script executed:

#!/bin/bash
# Examine the IndexBuilder.scala file around the cache logic
cat spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala | head -180 | tail -80

Length of output: 3010



🏁 Script executed:

#!/bin/bash
# Find FileWithOrder class definition
fd -t f "FileWithOrder" spark-cobol/src/main/scala/

Length of output: 89



🏁 Script executed:

#!/bin/bash
# Examine FileWithOrder class definition
cat spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala

Length of output: 778



🏁 Script executed:

#!/bin/bash
# Find indexCache declaration and initialization
rg -n "indexCache" spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala -B 5 -A 2 | head -50

Length of output: 1494



🏁 Script executed:

#!/bin/bash
# Search for any file validation or modification checks in the index building process
rg -n "getModificationTime|lastModified|getLen|FileStatus" spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/ -C 3

Length of output: 867


Add file modification time tracking to cache key to prevent stale index usage.

The cache uses only filePath as the key and does not consider whether files have been modified. If a VRL file is modified (records added/removed/changed) between index generations, the stale cached index will be returned without any validation, leading to incorrect data processing.

Suggested fix:
Include file modification time in the cache key:

// Note: `fileSystem` is assumed to be an org.apache.hadoop.fs.FileSystem
// resolved for the input path; it is not defined in the reviewed snippet.

// Line 129 - change from:
filesList.filter(f => indexCache.containsKey(f.filePath))
// to:
filesList.filter { f =>
  val fileStatus = fileSystem.getFileStatus(new Path(f.filePath))
  val cacheKey = s"${f.filePath}_${fileStatus.getModificationTime}"
  indexCache.containsKey(cacheKey)
}

// Line 160 - update cache storage to use the same key format:
val fileStatus = fileSystem.getFileStatus(new Path(filePath))
val cacheKey = s"${filePath}_${fileStatus.getModificationTime}"
indexCache.put(cacheKey, indexEntries.sortBy(_.offsetFrom))

// Line 168 - update cache retrieval to use the same key format:
val fileStatus = fileSystem.getFileStatus(new Path(f.filePath))
val cacheKey = s"${f.filePath}_${fileStatus.getModificationTime}"
indexCache.get(cacheKey)

Alternatively, store the modification time alongside cached data and validate it on retrieval.
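The alternative mentioned above could look roughly like the following. This is a minimal sketch, not the project's code: `ValidatedIndexCache`, `CachedIndex`, and `IndexEntry` are hypothetical names (the last one is a simplified stand-in for `SparseIndexEntry`), and the modification time is passed in by the caller, who is assumed to obtain it from the file status.

```scala
import java.util.concurrent.ConcurrentHashMap

object ValidatedIndexCache {
  // Hypothetical, simplified stand-in for the project's SparseIndexEntry.
  final case class IndexEntry(offsetFrom: Long, offsetTo: Long, fileId: Int)

  // The cached value remembers the modification time seen when the index was built.
  final case class CachedIndex(modificationTime: Long, entries: Array[IndexEntry])

  private val cache = new ConcurrentHashMap[String, CachedIndex]()

  // Stores the index for a file together with the file's modification time.
  def put(filePath: String, modificationTime: Long, entries: Array[IndexEntry]): Unit = {
    cache.put(filePath, CachedIndex(modificationTime, entries.sortBy(_.offsetFrom)))
    ()
  }

  // Returns the cached index only if the file has not changed since it was indexed.
  def get(filePath: String, currentModificationTime: Long): Option[Array[IndexEntry]] =
    Option(cache.get(filePath)) match {
      case Some(c) if c.modificationTime == currentModificationTime =>
        Some(c.entries)
      case Some(stale) =>
        // Drop the stale entry (only if it is still the one we looked at).
        cache.remove(filePath, stale)
        None
      case None =>
        None
    }
}
```

Compared with putting the timestamp into the key, this variant keeps one entry per path, so indexes for older versions of a file do not accumulate in the map.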

🤖 Prompt for AI Agents
In
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
around lines 127-174, the cache currently keys only by filePath which allows
stale indexes to be used after file modifications; update the cache logic to
include file modification time in the cache key (or store and validate
modification time alongside the cached entry). Specifically: when selecting
cachedFiles (line ~129) call the FileSystem to get the file status and build a
cacheKey using filePath + modificationTime and check
indexCache.containsKey(cacheKey); when storing newIndexes (around lines
~160-168) get the file status for the filePath, build the same cacheKey and put
the sorted indexEntries under that cacheKey; when fetching cachedIndexes (around
lines ~168+) compute the cacheKey from f.filePath and its modificationTime and
read indexCache.get(cacheKey) (or alternatively store modification time with
entries and validate it on retrieval). Use
org.apache.hadoop.fs.FileSystem.getFileStatus(new Path(...)).getModificationTime
for the timestamp.

@yruslan yruslan force-pushed the feature/805-enable-index-cache branch from 6fb06ee to 5e5dd76 Compare November 24, 2025 09:30

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

37-49: Cache implementation needs eviction policy and file modification tracking.

The previous review comments on this file identified two major issues with the cache implementation that remain unaddressed:

  1. Unbounded cache growth: The ConcurrentHashMap has no size limit or eviction policy, which can exhaust memory in long-running Spark applications as more unique files are processed.

  2. No file modification tracking: Cache keys are based solely on file path without checking modification times. If a VRL file is modified between reads, stale cached indexes will be returned, leading to incorrect data processing.

Please refer to the detailed suggestions in the previous review comments (lines 49 and 127-174 from past reviews) which recommend:

  • Using a bounded cache with LRU eviction (e.g., Guava's CacheBuilder or Caffeine)
  • Including file modification timestamp in the cache key
  • Adding configurable cache size limits and TTL
  • Adding cache statistics logging

Based on learnings, past review feedback should be addressed before merging.
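For illustration, a bounded cache with LRU eviction can be sketched with JDK classes only. The review recommends Guava or Caffeine; this sketch deliberately avoids extra dependencies, and `BoundedLruCache` and `maxEntries` are hypothetical names, not part of the PR. TTL and statistics are not covered here.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// A small thread-safe LRU cache. `maxEntries` is a hypothetical setting.
final class BoundedLruCache[K, V <: AnyRef](maxEntries: Int) {
  require(maxEntries > 0, "maxEntries must be positive")

  // accessOrder = true makes iteration order go from least to most recently used.
  private val underlying = new JLinkedHashMap[K, V](16, 0.75f, true) {
    // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      this.size() > maxEntries
  }

  // LinkedHashMap is not thread-safe (even get() mutates in access-order mode),
  // so every operation is guarded by the same lock.
  def get(key: K): Option[V] = synchronized { Option(underlying.get(key)) }

  def put(key: K, value: V): Unit = synchronized { underlying.put(key, value); () }

  def currentSize: Int = synchronized { underlying.size() }
}
```

A single lock is simpler than `ConcurrentHashMap` but serializes access; for an index cache that is read once per file per query, that trade-off is probably acceptable.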

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6fb06ee and 5e5dd76.

📒 Files selected for processing (4)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (5 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
🧰 Additional context used
🧬 Code graph analysis (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
  • buildIndex (51-69)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • getReaderProperties (367-456)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (5)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (1)

21-21: LGTM!

The import correctly brings in the parameter constants needed for the error message assertion in the new validation test.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)

68-69: LGTM!

Formatting adjustment improves readability by separating the implicit SQLContext parameter.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (3)

51-69: LGTM!

The signature change correctly adds the cachingAllowed parameter and appropriately propagates it to buildIndexForVarLenReader. The routing logic correctly applies caching only to the variable-length reader path where index caching is implemented.


127-173: Caching logic flow is correct, subject to addressing past review concerns.

The implementation correctly:

  • Partitions files into cached and non-cached sets (lines 128-134)
  • Generates indexes only for non-cached files (lines 137-151)
  • Stores newly generated indexes grouped by fileId and keyed by filePath (lines 154-162)
  • Retrieves cached indexes and updates fileId to match current file order (lines 166-170)
  • Combines both sets into the final RDD (line 173)

The fileId update (line 169) is particularly important because cached entries retain the fileId from their original indexing, which may differ from the current file order.
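The fileId remapping described above can be sketched as follows. The case classes are simplified, hypothetical stand-ins for `SparseIndexEntry` and `FileWithOrder`, and `remap` is an illustrative helper rather than a method from the PR.

```scala
object FileIdRemap {
  // Simplified stand-ins for the project's types (assumed shapes).
  final case class IndexEntry(offsetFrom: Long, offsetTo: Long, fileId: Int, recordIndex: Long)
  final case class FileWithOrder(filePath: String, order: Int)

  // For each file in the current listing, take its cached entries and
  // overwrite fileId with the file's position in the current read.
  def remap(cached: Map[String, Array[IndexEntry]], files: Seq[FileWithOrder]): Seq[IndexEntry] =
    files.flatMap { f =>
      cached
        .getOrElse(f.filePath, Array.empty[IndexEntry])
        .map(_.copy(fileId = f.order))
        .toSeq
    }
}
```

Because `copy` returns new entries, the arrays held in the cache are left untouched and can be reused by later reads with a different file order.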

However, the major issues flagged in previous review comments (unbounded cache growth and missing file modification tracking) still need to be addressed before this implementation is production-ready.

Based on learnings, past review feedback on cache architecture should be resolved.


381-388: LGTM!

The helper method correctly creates an RDD from an array of indexes with appropriate partition sizing (capped at maxNumPartitions with a minimum of 1) and helpful logging.
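The partition sizing rule mentioned above (cap at a maximum, never go below 1) amounts to a clamp. A minimal sketch, assuming the partition count is derived from the number of index entries; `PartitionSizing` and `numPartitions` are hypothetical names and the actual helper may compute its input differently:

```scala
object PartitionSizing {
  // Clamp the requested partition count into the range [1, maxNumPartitions].
  def numPartitions(indexCount: Int, maxNumPartitions: Int): Int =
    math.max(1, math.min(indexCount, maxNumPartitions))
}
```

The lower bound matters because an empty index array would otherwise request zero partitions.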

Comment on lines 174 to 202
"work for data with offsets and indexes and index cache" in {
  withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
    val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_format", "F")
      .option("record_length_field", "SEG-ID")
      .option("file_start_offset", 1)
      .option("file_end_offset", 2)
      .option("input_split_records", "2")
      .option("enable_index_cache", "true")
      .option("pedantic", "true")
      .option("record_length_map", """{"A":4,"B":7,"C":8}""")
      .load(tempFile)

    val actualInitial = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
    val actualCached = df.orderBy("SEG_ID").toJSON.collect().mkString(",")

    assert(actualInitial == expected)
    assert(actualCached == expected)
  }
}

⚠️ Potential issue | 🟠 Major

Test may not properly verify index caching behavior.

The test runs both collect() calls against the same DataFrame instance (lines 191-192). The relation behind a DataFrame is resolved once, when load() is called, so both actions likely reuse that relation and whatever index it has already built instead of going through index generation a second time. If so, the test does not actually verify that cached indexes are retrieved and used on a subsequent read.

To properly test caching, create a new DataFrame for the second read:

     val actualInitial = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
-    val actualCached = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
+    
+    // Create a new DataFrame to trigger index retrieval from cache
+    val df2 = spark.read
+      .format("cobol")
+      .option("copybook_contents", copybook)
+      .option("record_format", "F")
+      .option("record_length_field", "SEG-ID")
+      .option("file_start_offset", 1)
+      .option("file_end_offset", 2)
+      .option("input_split_records", "2")
+      .option("enable_index_cache", "true")
+      .option("pedantic", "true")
+      .option("record_length_map", """{"A":4,"B":7,"C":8}""")
+      .load(tempFile)
+    
+    val actualCached = df2.orderBy("SEG_ID").toJSON.collect().mkString(",")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"work for data with offsets and indexes and index cache" in {
  withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
    val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_format", "F")
      .option("record_length_field", "SEG-ID")
      .option("file_start_offset", 1)
      .option("file_end_offset", 2)
      .option("input_split_records", "2")
      .option("enable_index_cache", "true")
      .option("pedantic", "true")
      .option("record_length_map", """{"A":4,"B":7,"C":8}""")
      .load(tempFile)

    val actualInitial = df.orderBy("SEG_ID").toJSON.collect().mkString(",")

    // Create a new DataFrame to trigger index retrieval from cache
    val df2 = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_format", "F")
      .option("record_length_field", "SEG-ID")
      .option("file_start_offset", 1)
      .option("file_end_offset", 2)
      .option("input_split_records", "2")
      .option("enable_index_cache", "true")
      .option("pedantic", "true")
      .option("record_length_map", """{"A":4,"B":7,"C":8}""")
      .load(tempFile)

    val actualCached = df2.orderBy("SEG_ID").toJSON.collect().mkString(",")

    assert(actualInitial == expected)
    assert(actualCached == expected)
  }
}
🤖 Prompt for AI Agents
In
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala
around lines 174 to 197, the test uses the same DataFrame instance for both
reads so both collect() calls use the same planned execution and do not verify
index caching behavior; change the test to create a new DataFrame (re-run
spark.read with the same options and tempFile) for the second read (or otherwise
materialize and unpersist then re-create the DataFrame) so the second collect()
runs a fresh read that will exercise retrieving from the index cache and allow
valid assertions.

Comment on lines 199 to 221
"throw an exception when index caching is requested while indexes are turned off" in {
  withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
    val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

    val ex = intercept[IllegalArgumentException] {
      spark.read
        .format("cobol")
        .option("copybook_contents", copybook)
        .option("record_format", "F")
        .option("record_length_field", "SEG-ID")
        .option("enable_indexes", "false")
        .option("enable_index_cache", "true")
        .option("pedantic", "true")
        .option("record_length_map", """{"A":4,"B":7,"C":8}""")
        .load(tempFile)
    }

    assert(ex.getMessage == s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
  }
}

⚠️ Potential issue | 🟡 Minor

Remove unused variable.

Line 201 declares expected but it's never used in the test. The test only validates the exception message, making this variable unnecessary.

Apply this diff:

   "throw an exception when index caching is requested while indexes are turned off" in {
     withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
-      val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""
-
       val ex = intercept[IllegalArgumentException] {
📝 Committable suggestion

Suggested change
"throw an exception when index caching is requested while indexes are turned off" in {
  withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
    val ex = intercept[IllegalArgumentException] {
      spark.read
        .format("cobol")
        .option("copybook_contents", copybook)
        .option("record_format", "F")
        .option("record_length_field", "SEG-ID")
        .option("enable_indexes", "false")
        .option("enable_index_cache", "true")
        .option("pedantic", "true")
        .option("record_length_map", """{"A":4,"B":7,"C":8}""")
        .load(tempFile)
    }

    assert(ex.getMessage == s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
  }
}
🤖 Prompt for AI Agents
In
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala
around lines 199 to 218, there is an unused local val expected declared (line
~201) that is never referenced; remove that declaration to clean up the test and
avoid dead code, keeping the rest of the test unchanged so it still only
intercepts and asserts the IllegalArgumentException message.

@yruslan yruslan force-pushed the feature/805-enable-index-cache branch from 5e5dd76 to e4fec96 Compare November 24, 2025 10:03

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)

76-76: LGTM - Critical issue from previous review has been resolved.

The call now correctly passes isIndexCachingAllowed to control caching behavior, rather than the previously incorrect isIndexGenerationNeeded. This ensures the enable_index_cache parameter works as intended.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)

957-959: LGTM - Error message corrected from previous review.

The validation correctly prevents enabling index caching when indexes are disabled, and the error message now properly references both PARAM_ENABLE_INDEXES and PARAM_ENABLE_INDEX_CACHE.
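The check described here can be sketched in isolation as below. This is an illustration only: `IndexCacheValidation` and `validate` are hypothetical, the options are modelled as a plain `Map`, and `enable_indexes` is assumed to default to `true`. The error message is the one asserted by the test in this PR.

```scala
object IndexCacheValidation {
  val PARAM_ENABLE_INDEXES = "enable_indexes"
  val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache"

  // Throws if index caching is requested while indexes are turned off.
  def validate(options: Map[String, String]): Unit = {
    // Assumption: indexes are enabled unless explicitly disabled.
    val indexesEnabled = options.get(PARAM_ENABLE_INDEXES).forall(_.toBoolean)
    // Caching is opt-in, so a missing option means "false".
    val cachingEnabled = options.get(PARAM_ENABLE_INDEX_CACHE).exists(_.toBoolean)

    if (!indexesEnabled && cachingEnabled) {
      throw new IllegalArgumentException(
        s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
    }
  }
}
```

Failing at option-parsing time surfaces the conflict on `load()` rather than silently ignoring the cache setting later.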

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2)

49-49: Cache lacks eviction policy and will grow unboundedly.

The global indexCache has no size limit or TTL, which can cause memory exhaustion in long-running Spark applications as more files are processed. Consider using a bounded cache with LRU eviction (e.g., Guava's CacheBuilder or Caffeine) or adding a configurable maximum cache size.


127-174: Cache keys lack file modification tracking, risking stale index usage.

The cache uses only filePath as the key (lines 129, 160, 168) without considering file modification time. If a VRL file is modified between runs, the cached index will be returned without validation, potentially causing incorrect data processing.

Include file modification time in the cache key or validate modification time on cache retrieval:

// Note: `conf` is assumed to be a Hadoop Configuration already in scope,
// and filesList is assumed to be non-empty (filesList.head fails otherwise).

// When filtering cached files (around line 129):
val fs = new Path(filesList.head.filePath).getFileSystem(conf)
val cachedFiles = if (cachingAllowed) {
  filesList.filter { f =>
    val fileStatus = fs.getFileStatus(new Path(f.filePath))
    val cacheKey = s"${f.filePath}_${fileStatus.getModificationTime}"
    indexCache.containsKey(cacheKey)
  }
} else {
  Array.empty[FileWithOrder]
}

// When storing (around line 160):
val fileStatus = fs.getFileStatus(new Path(filePath))
val cacheKey = s"${filePath}_${fileStatus.getModificationTime}"
indexCache.put(cacheKey, indexEntries.sortBy(_.offsetFrom))

// When retrieving (around line 168):
val fileStatus = fs.getFileStatus(new Path(f.filePath))
val cacheKey = s"${f.filePath}_${fileStatus.getModificationTime}"
indexCache.get(cacheKey).map(ind => ind.copy(fileId = f.order))
🧹 Nitpick comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)

835-837: Validation added appears unrelated to PR objectives.

This validation ensures record_length_map requires record_length_field, which is reasonable but doesn't appear to be part of the index caching feature described in the PR objectives. Was this intentionally included?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e5dd76 and e4fec96.

📒 Files selected for processing (4)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (6 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala
🧰 Additional context used
🧬 Code graph analysis (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
  • buildIndex (51-69)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • getReaderProperties (367-456)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala (1)
  • Constants (20-81)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (2)
  • getOrElse (78-81)
  • contains (66-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (5)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

381-388: LGTM - Clean helper method for index RDD creation.

The createIndexRDD helper centralizes index RDD assembly with proper partitioning logic, making the code more maintainable.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (4)

122-122: LGTM - New parameter constant added.

The PARAM_ENABLE_INDEX_CACHE constant is properly defined and follows the existing naming conventions.


385-385: LGTM - Correct default for fallback case.

Setting isIndexCachingAllowed = false in the fallback VariableLengthParameters is the safe default.


421-421: LGTM - Parameter propagation is correct.

The isIndexCachingAllowed field is properly propagated from varLenParams to ReaderParameters.


508-508: LGTM - Parameter parsing implemented correctly.

The enable_index_cache option is parsed with a safe default of false, ensuring caching is opt-in.

@yruslan yruslan merged commit cd292b4 into master Nov 24, 2025
7 checks passed
@yruslan yruslan deleted the feature/805-enable-index-cache branch November 24, 2025 10:17
Development

Successfully merging this pull request may close these issues.

Cache calculated indexes for VRL files to increase performance of processing same files multiple times

2 participants