Conversation

@yruslan (Collaborator) commented Nov 22, 2025

This is especially related to header streams.

Closes #803

Summary by CodeRabbit

  • New Features

    • Added size and totalSize properties for file streaming operations.
  • Bug Fixes

    • Improved resource management with proper stream closure and cleanup.
    • File streams now open lazily on first read instead of during initialization.
  • Tests

    • Updated test cases to validate lazy file opening behavior.


@coderabbitai bot (Contributor) commented Nov 22, 2025

Warning

Rate limit exceeded

@yruslan has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 16 minutes and 14 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 34d0965 and 5287283.

📒 Files selected for processing (6)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (2 hunks)

Walkthrough

The PR optimizes resource management across multiple reader and iterator classes by implementing lazy stream opening in FileStreamer, precomputing recordExtractor options for reuse, and adding proper try/finally blocks to ensure stream closure when processing files.

Changes

• FileStreamer lazy opening
  spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
  Introduces lazy stream opening via an ensureOpened() mechanism; removes eager bufferedStream initialization and replaces the getHadoopPath helper; adds a wasOpened state flag; computes fileSize lazily; adds public accessors size and totalSize.

• VarLenNestedReader recordExtractor optimization
  cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala, spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
  Precomputes recordExtractorOpt once and reuses it across iterator construction; closes headerStream when the extractor option is empty; replaces inline recordExtractor calls with the precomputed optional value.

• IndexBuilder stream closure safety
  spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
  Wraps the record-extractor self-check in try/finally so the initial streams always close; guards second-record validation with an offset check; ensures the second streams close after validation.

• BufferedFSDataInputStream annotation
  spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala
  Adds a @throws[IOException] annotation to the close() method.

• FileStreamerSpec test updates
  spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala
  Updates the non-existent file test to expect FileNotFoundException on size access; adds a test verifying that no exception is thrown when a FileStreamer is created for a non-existent file but never used.

Sequence Diagram

sequenceDiagram
    participant Client
    participant FileStreamer
    participant BufferedFSDataInputStream
    participant FileSystem as Hadoop FileSystem

    Client->>FileStreamer: new FileStreamer(filePath)
    Note over FileStreamer: wasOpened = false<br/>bufferedStream = null<br/>fileSize = lazy val
    
    Client->>FileStreamer: next() or size
    FileStreamer->>FileStreamer: ensureOpened()
    alt Stream not yet opened
        FileStreamer->>FileSystem: getHadoopPath
        FileStreamer->>BufferedFSDataInputStream: create & open
        Note over FileStreamer: wasOpened = true
    end
    
    FileStreamer->>BufferedFSDataInputStream: read data
    BufferedFSDataInputStream-->>FileStreamer: data
    
    Client->>FileStreamer: close()
    FileStreamer->>BufferedFSDataInputStream: close()
    Note over BufferedFSDataInputStream: `@throws`[IOException]
    FileStreamer->>FileStreamer: wasOpened = true
sequenceDiagram
    participant IndexBuilder
    participant RecordExtractor
    participant DataStream
    participant HeaderStream

    IndexBuilder->>DataStream: open
    IndexBuilder->>HeaderStream: open
    
    IndexBuilder->>RecordExtractor: create(dataStream, headerStream)
    
    Note over IndexBuilder: try block
    RecordExtractor->>DataStream: read first record
    IndexBuilder->>IndexBuilder: compute offset
    
    alt Second record exists
        IndexBuilder->>DataStream: open (second)
        IndexBuilder->>HeaderStream: open (second)
        IndexBuilder->>RecordExtractor: create(dataStream2, headerStream2)
        RecordExtractor->>DataStream: read second record
        IndexBuilder->>IndexBuilder: validate match
        IndexBuilder->>DataStream: close (second)
        IndexBuilder->>HeaderStream: close (second)
    end
    
    Note over IndexBuilder: finally block
    IndexBuilder->>DataStream: close (initial)
    IndexBuilder->>HeaderStream: close (initial)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • FileStreamer lazy initialization: Review ensureOpened() logic, wasOpened state management, and lazy val fileSize computation for correctness and thread-safety implications
  • Stream lifecycle in IndexBuilder: Verify try/finally block properly closes all stream paths (initial and second-record flows) and handles exceptions correctly
  • recordExtractorOpt precomputation: Confirm reuse pattern works correctly in both cobol-parser and spark-cobol VarLenNestedReader implementations and that headerStream closure only occurs when appropriate
  • Test changes: Validate that FileNotFoundException expectation and lazy-initialization test properly reflect new FileStreamer behavior


Poem

🐰 A stream opens late, just when it's called,
Resources protected when files are tall,
Try blocks ensure no handles are lost,
S3 connections flow smooth, without cost!
✨ Lazy and careful, we hop on with grace!

Pre-merge checks and finishing touches

✅ Passed checks (5 passed)
• Description Check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
• Title Check: ✅ Passed. The title accurately summarizes the main objective: fixing unclosed Hadoop file streams that cause S3 connection pool timeouts, which aligns with the core changes across multiple files.
• Linked Issues Check: ✅ Passed. The PR addresses all coding requirements from issue #803: it ensures header streams and data streams are closed in VarLenNestedReader, FileStreamer, and IndexBuilder to prevent connection pool exhaustion.
• Out of Scope Changes Check: ✅ Passed. All changes are directly aligned with fixing unclosed streams: lazy opening in FileStreamer, early computation and reuse of recordExtractorOpt in the readers, try/finally stream closure in IndexBuilder, and test updates validating the fix.
• Docstring Coverage: ✅ Passed. No functions were found in the changed files, so the docstring coverage check was skipped.


@github-actions bot commented Nov 22, 2025

JaCoCo code coverage report - 'cobol-parser'

Overall Project: 91.52% 🍏
Files changed: 100% 🍏

File Coverage
• VarLenNestedReader.scala: 70.15% 🍏

@github-actions bot commented Nov 22, 2025

JaCoCo code coverage report - 'spark-cobol'

File Coverage [96.43%] 🍏
• FileStreamer.scala: 98.79% 🍏
• IndexBuilder.scala: 97.06% 🍏
• VarLenNestedReader.scala: 95.45% 🍏
• BufferedFSDataInputStream.scala: 93.82% 🍏

Total Project Coverage: 81.1% 🍏

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

112-118: Clarify the wasOpened flag assignment in close().

Setting wasOpened = true at line 114 inside close() appears unusual. If the stream is being closed, it must have been opened first (via ensureOpened()), so wasOpened should already be true.

The guard condition bufferedStream != null && !bufferedStream.isClosed suggests the stream was opened, making this assignment redundant. If the intent is to mark that we attempted to use the stream, this should be clarified or reconsidered.

Consider this alternative:

  override def close(): Unit = {
    if (bufferedStream != null && !bufferedStream.isClosed) {
-     wasOpened = true
      bufferedStream.close()
      bufferedStream = null
    }
  }

The wasOpened flag should already be true if we're executing this branch, as ensureOpened() is the only method that initializes bufferedStream and it sets the flag.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

175-196: Close headerStream in generateIndex when recordExtractor is empty, matching the pattern in getRecordIterator.

The generateIndex method calls recordExtractor(0L, dataStream, headerStream) inline (lines 179, 186) without checking if it returns None. When it does return None, the headerStream is never closed, unlike in getRecordIterator (lines 104-106) which explicitly closes it.

Since IndexGenerator.sparseIndexGenerator does not receive headerStream as a parameter, cleanup responsibility belongs to the caller. Apply the same pattern:

val recordExtractorOpt = recordExtractor(0L, dataStream, headerStream)
if (recordExtractorOpt.isEmpty) {
  headerStream.close()
}

recordExtractorOpt match {
  case Some(field) => IndexGenerator.sparseIndexGenerator(...)
  case None => IndexGenerator.sparseIndexGenerator(...)
}
🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

124-129: Lazy opening implementation is correct for single-threaded usage.

The ensureOpened() method correctly defers stream initialization until first use.

Note that this implementation is not thread-safe (no synchronization on the wasOpened check-then-act pattern). However, based on the class documentation stating it's "stateful" and "not reusable", concurrent access doesn't appear to be an intended use case.

If thread-safety becomes a concern in the future, consider adding explicit documentation:

  * @param filePath   String containing the fully qualified path to the file.
  * @param fileSystem Underlying Hadoop file system.
+ * @note This class is not thread-safe and should only be accessed from a single thread.
  * @throws IllegalArgumentException if the file is not found in the underlying file system.
  */
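If thread-safety were ever required, a minimal change would be to synchronize the whole check-then-act. A hypothetical sketch, not part of the PR, where openBufferedStream() is a placeholder for the existing open logic:

// Hypothetical thread-safe variant of ensureOpened(), for illustration only.
// The PR does not need this: FileStreamer is documented as stateful and
// single-threaded, so synchronization is currently unnecessary overhead.
private def ensureOpened(): Unit = this.synchronized {
  if (!wasOpened) {
    bufferedStream = openBufferedStream() // placeholder for the real open logic
    wasOpened = true
  }
}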
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ae45f3 and 3738dd2.

📒 Files selected for processing (3)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • size (54-54)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • close (112-118)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (6)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)

43-50: Excellent lazy initialization pattern to prevent connection pool exhaustion.

The deferred opening of the stream until first use directly addresses the PR objective of preventing S3 HTTP connection pool timeouts. The explanatory comments clearly document the rationale.

Note that fileSize being lazy means accessing the size property will now throw FileNotFoundException for non-existent files, which is a behavioral change properly reflected in the updated tests.


73-73: LGTM - correct placement of ensureOpened().

Calling ensureOpened() at the start of next() ensures the stream is opened exactly when needed, just before the first read operation.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (2)

45-50: Test correctly validates lazy opening behavior.

The updated test properly verifies that accessing size on a non-existent file now throws FileNotFoundException (instead of during construction). This aligns with the lazy evaluation of fileSize in FileStreamer.


52-56: Excellent test coverage for the lazy opening feature.

This new test case is crucial for validating that creating a FileStreamer instance for a non-existent file doesn't throw an exception as long as the stream is never actually used. This directly validates the PR's goal of preventing unnecessary stream opening and connection pool exhaustion.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)

103-107: Stream lifecycle verification confirms safe cleanup pattern.

The verification shows that all RawRecordExtractor implementations close headerStream in their constructor—none retain references to it afterward. This means the code pattern is safe:

  • No extractor produced (isEmpty): headerStream closed by VarLenNestedReader (line 106)
  • Extractor produced (isDefined): headerStream closed by the extractor's constructor before any iteration

The dual-cleanup concern is unfounded because the stream is either handled by VarLenNestedReader or transferred to the extractor—never both. The implementation correctly addresses the PR objective of preventing stream leaks.


113-113: Verification confirms iterator signatures are correctly updated.

All constructor signatures match the Option type being passed:

  • VarLenHierarchicalIterator accepts rawRecordExtractor: Option[RawRecordExtractor] at line 47
  • VarLenNestedIterator accepts recordExtractor: Option[RawRecordExtractor] at line 47
  • VRLRecordReader accepts recordExtractor: Option[RawRecordExtractor] at line 44

Both iterators correctly pass the Option parameter downstream to VRLRecordReader. No inconsistencies found.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

44-51: Lazy-open and wasOpened semantics are sound, but next(0) can still open the stream unnecessarily

The lazy initialization via ensureOpened() and the wasOpened flag correctly ensure:

  • The underlying BufferedFSDataInputStream is opened only once.
  • After close(), the stream is not reopened (non-reusable instance behavior is preserved).
  • Unused FileStreamer instances (never calling size/totalSize/next) do not open the underlying file at all.

One small behavioral nit is that next() calls ensureOpened() unconditionally, even when numberOfBytes <= 0. In that case you:

  • Open the underlying stream (S3 connection) in ensureOpened().
  • Immediately return an empty array from the numberOfBytes <= 0 branch without closing it here.

This is unlikely on current call sites, but it slightly undermines the “never open if the stream is effectively unused” goal and can surprise callers that treat next(0) as a no-op.

A low-impact refinement would be to guard the open with the basic sanity checks, e.g.:

-  override def next(numberOfBytes: Int): Array[Byte] = {
-    ensureOpened()
-    val actualBytesToRead = if (maximumBytes > 0) {
+  override def next(numberOfBytes: Int): Array[Byte] = {
+    if (numberOfBytes <= 0) {
+      return new Array[Byte](0)
+    }
+
+    val actualBytesToRead = if (maximumBytes > 0) {
       Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes).toInt
     } else {
       numberOfBytes
     }
 
-    if (numberOfBytes <= 0) {
-      new Array[Byte](0)
-    } else if (actualBytesToRead <=0 || bufferedStream == null || bufferedStream.isClosed) {
+    if (actualBytesToRead <= 0) {
+      close()
+      return new Array[Byte](0)
+    }
+
+    ensureOpened()
+    if (bufferedStream == null || bufferedStream.isClosed) {
       close()
       new Array[Byte](0)
     } else {
       // existing read logic

This keeps the new lazy-open behavior and non-reusability guarantees while avoiding unnecessary opens on zero-byte requests.

Also applies to: 61-75, 114-130

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3738dd2 and 948c29a.

📒 Files selected for processing (1)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20

@yruslan force-pushed the bugfix/803-fix-s3-connection-pool-timeouts branch 2 times, most recently from e57a669 to 34d0965, on November 22, 2025 at 09:01.
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

25-37: Scaladoc @throws no longer matches actual behavior

The Scaladoc still documents @throws IllegalArgumentException if the file is not found, but the implementation and tests now rely on Hadoop throwing a FileNotFoundException (e.g., via getContentSummary or when opening the stream) on first size/read access, not on construction.

Consider updating the Scaladoc to:

  • Reference FileNotFoundException instead of IllegalArgumentException, and
  • Clarify that the exception is raised when size/totalSize/next are first invoked, not when the instance is created.
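One possible wording along those lines, assuming the parameter list quoted in the earlier comment (illustrative; the final Scaladoc may differ):

/**
  * ...existing class description...
  *
  * @param filePath   String containing the fully qualified path to the file.
  * @param fileSystem Underlying Hadoop file system.
  * @note The file is opened lazily: a missing file surfaces as a
  *       FileNotFoundException on the first access to size, totalSize, or
  *       next(), not when the instance is constructed.
  * @throws FileNotFoundException if the file does not exist at first use.
  */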
🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

215-273: Self-check logic is safer, but secondary streams would benefit from try/finally

The revised self-check correctly:

  • Uses the first extractor to read the first and (optionally) second record,
  • Captures the offset of the second record,
  • Re-opens new streams at that offset and validates that the second extractor returns the same record,
  • Ensures the primary dataStream/headerStream are always closed via finally.

For the secondary streams (dataStream2, headerStream2), they are only closed at the end of extractorOpt2.foreach. If extractor2.hasNext or extractor2.next() throws unexpectedly, those streams may remain open. Consider wrapping their use in a try/finally to guarantee closure:

-              val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
-              val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
-
-              extractorOpt2.foreach { extractor2 =>
-                if (!extractor2.hasNext) {
-                  // ...
-                }
-                // ...
-                dataStream2.close()
-                headerStream2.close()
-              }
+              val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
+              try {
+                val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
+
+                extractorOpt2.foreach { extractor2 =>
+                  if (!extractor2.hasNext) {
+                    // ...
+                  }
+                  // ...
+                }
+              } finally {
+                dataStream2.close()
+                headerStream2.close()
+              }

This keeps the self-check logic intact while making resource handling fully robust, including in error paths.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 948c29a and e57a669.

📒 Files selected for processing (5)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
🧰 Additional context used
🧬 Code graph analysis (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (54-89)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • close (113-119)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (41-47)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (54-89)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (3)
  • offset (59-59)
  • next (73-111)
  • close (113-119)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorNoIndex.scala (3)
  • offset (36-36)
  • hasNext (38-38)
  • next (40-55)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (41-47)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • size (55-55)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala (1)
  • next (34-48)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
🔇 Additional comments (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (1)

52-88: Precomputing recordExtractorOpt and closing unused headerStream looks correct

Computing the extractor once and closing headerStream when no extractor is created is a good way to avoid leaking header streams while keeping the iterator construction logic unchanged. Passing the cached recordExtractorOpt into both iterator variants keeps behavior consistent and avoids redundant work.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)

19-55: Tests now align with lazy-open and FileNotFoundException behavior

Expecting FileNotFoundException on the first size access and allowing construction to succeed for an unused non-existent file matches the new lazy-opening semantics in FileStreamer. The added test also guards against regressions in that behavior.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

155-171: Index generation now reliably closes both data and header streams

Wrapping reader.generateIndex(...) in a try/finally that closes both inputStream and headerStream ensures that Hadoop/S3 streams are released even when index generation fails, which is important for avoiding connection pool exhaustion.
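The same guarantee can be captured as a small generic helper; a sketch with hypothetical names, not part of the PR:

import java.io.Closeable

// Run the body, then close both streams on every path, even if the body
// or the first close() throws.
def withStreams[A <: Closeable, B <: Closeable, R](data: A, header: B)(body: (A, B) => R): R =
  try body(data, header)
  finally {
    try data.close()
    finally header.close()
  }

Here generateIndex would run as the body, with both streams released regardless of how index generation exits.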

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

44-52: Lazy opening and EOF handling in next/close look consistent with the stream contract

The combination of:

  • wasOpened + ensureOpened() for on-demand creation of BufferedFSDataInputStream,
  • lazy val fileSize for metadata-based sizing that doesn’t open the data stream,
  • next(...) honoring maximumBytes, returning:
    • empty array for non-positive requests or when no bytes remain / stream closed,
    • a shorter array and closing the stream when fewer than numberOfBytes are available,
  • close() preventing any subsequent reopen (by setting wasOpened = true),

provides predictable, non-reusable stream semantics and ensures underlying Hadoop/S3 resources are closed once the data (or the configured window via maximumBytes) has been fully consumed or close() is called explicitly.

Also applies to: 73-111, 113-130

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

234-235: Optional: Simplify by removing redundant mid-flow close.

Lines 234-235 close dataStream and headerStream, which are then closed again in the finally block (lines 271-272). While safe due to idempotent close operations, this pattern is confusing and unnecessary. The finally block already ensures these streams are closed in all execution paths.

Consider removing lines 234-235 and relying solely on the finally block for cleanup:

               record = extractor.next() // Saving the record to check later
-
-              dataStream.close()
-              headerStream.close()

               // Getting new streams and record extractor that points directly to the second record
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e57a669 and 34d0965.

📒 Files selected for processing (6)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala
🧰 Additional context used
🧬 Code graph analysis (5)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • size (56-56)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (54-89)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (3)
  • offset (60-60)
  • next (74-113)
  • close (115-122)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorNoIndex.scala (3)
  • offset (36-36)
  • hasNext (38-38)
  • next (40-55)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (43-50)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (43-50)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • close (115-122)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala (2)
  • IndexGenerator (30-171)
  • sparseIndexGenerator (32-141)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (43-50)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/common/Constants.scala (1)
  • Constants (20-44)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (54-89)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (43-50)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • close (115-122)
🔇 Additional comments (5)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

45-134: LGTM: Lazy opening correctly prevents resource leaks.

The lazy opening mechanism is well-implemented and directly addresses the PR objective:

  • Streams are only opened when next() is called via ensureOpened()
  • Unused streams never trigger expensive S3 file operations
  • The wasOpened flag correctly prevents reopening after close()
  • size and totalSize use a lazy fileSize without opening the stream

This ensures header streams and other auxiliary streams that may never be read are never opened, preventing the HTTP connection pool exhaustion described in issue #803.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)

45-56: LGTM: Tests correctly validate lazy opening behavior.

The test updates properly verify the lazy opening semantics:

  1. Lines 45-50: Accessing size on a non-existent file now throws FileNotFoundException (when the lazy fileSize is evaluated), correctly replacing the previous IllegalArgumentException expectation.

  2. Lines 52-56: The new test confirms that merely constructing a FileStreamer for a non-existent file doesn't throw an exception, validating that file access is deferred until actual use.

These tests ensure the lazy opening feature works as intended and prevent regression.
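A condensed sketch of the two behaviors under test (ScalaTest style; the actual FileStreamerSpec wording, fixtures, and constructor defaults may differ):

import java.io.FileNotFoundException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.scalatest.Assertions.intercept
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer

object LazyOpenBehaviorSketch extends App {
  val fs = FileSystem.get(new Configuration())

  // Constructing a streamer for a missing file must not throw...
  // (constructor shown with the two documented parameters; any additional
  // offset parameters are assumed to have defaults)
  val streamer = new FileStreamer("/tmp/no_such_file.dat", fs)

  // ...but touching the lazy size must.
  intercept[FileNotFoundException] {
    streamer.size
  }
}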

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala (1)

56-88: LGTM: Precomputed extractor with early header stream closure.

The refactoring optimizes resource usage and prevents leaks:

  1. Line 57: Computing recordExtractorOpt once at the start avoids redundant evaluations and ensures consistency across both iterator construction paths.

  2. Lines 58-60: Closing headerStream immediately when the extractor is None is a key improvement—if the header stream won't be used by the iterator, it's released right away rather than held open unnecessarily.

  3. Lines 68, 80: Both iterator constructors receive the precomputed extractor, maintaining consistent behavior.

This change directly addresses the PR objective of preventing S3 connection pool exhaustion by ensuring header streams are closed when not needed.
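For reference, the shape of that pattern, condensed from the review's own suggestion (iterator construction and surrounding declarations elided):

// Compute the extractor once, up front.
val recordExtractorOpt = recordExtractor(0L, dataStream, headerStream)

// No extractor means nothing will ever read the header stream,
// so release the underlying (e.g. S3) connection right away.
if (recordExtractorOpt.isEmpty) {
  headerStream.close()
}

// Both iterator variants then receive the same precomputed Option.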

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)

103-130: LGTM: Consistent extractor precomputation in getRecordIterator.

The changes mirror the pattern from the Spark module variant:

  • Lines 104-107: Precomputing recordExtractorOpt and immediately closing headerStream when empty prevents unnecessary resource retention.
  • Lines 113, 123: Both iterator types receive the precomputed extractor, ensuring consistency and efficiency.

This aligns with the broader PR goal of proper stream lifecycle management to prevent S3 connection pool exhaustion.


174-201: LGTM: Optimized extractor handling in generateIndex.

The refactoring improves efficiency and resource management in index generation:

  • Lines 174-177: Computing recordExtractorOpt once at the start (with startingRecordNumber = 0) and immediately closing headerStream when empty prevents resource leaks during index generation.

  • Lines 184, 195: Both IndexGenerator.sparseIndexGenerator call sites now receive the precomputed extractor, eliminating redundant extractor creation and ensuring consistent behavior across the segmented and non-segmented index generation paths.

This completes the consistent pattern of precomputed extractors with early header stream closure across both the cobol-parser and spark-cobol modules.

@yruslan force-pushed the bugfix/803-fix-s3-connection-pool-timeouts branch from 34d0965 to 5287283, on November 22, 2025 at 09:10.
@yruslan merged commit 5b83b1a into master on Nov 22, 2025.
7 checks passed
@yruslan deleted the bugfix/803-fix-s3-connection-pool-timeouts branch on November 22, 2025 at 09:17.
Development

Successfully merging this pull request may close these issues:
• S3 HTTP connection pool timeout when many files are processed multiple times (#803)