#797 Add support for variable-sized OCCURS when writing EBCDIC files#830
…frame format with OCCURS DEPENDING ON clause.
Walkthrough

Refactors writer AST and RDD processing to support OCCURS and OCCURS DEPENDING ON by tracking dependee fields through a dependeeMap; updates writer parameter validation and validation entry points for writing; adjusts copybook AST string rendering to use raw names; adds tests and minor CI/build tweaks.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant Writer
    participant Combiner
    participant Builder
    participant DepMap
    participant Processor
    User->>Writer: combine(rdd, copybook, schema)
    Writer->>Combiner: constructWriterAst(copybook, schema)
    Combiner->>DepMap: create empty dependeeMap
    Combiner->>Builder: buildGroupField(..., dependeeMap)
    Builder->>Builder: buildPrimitiveNode(..., dependeeMap)
    alt field has DEPENDING ON
        Builder->>DepMap: register DependingOnField(name -> spec)
        Builder-->>Combiner: emit PrimitiveDependeeField / arrays with dependingOn
    end
    Combiner-->>Writer: return root WriterAst
    Writer->>Processor: processRDD(..., variableSizeOccurs)
    Processor->>Processor: iterate rows, call writeToBytes(ast,..., variableLengthOccurs)
    rect rgba(100,200,100,0.5)
        Processor->>DepMap: consult dependeeMap for dependee values
        Processor->>Processor: compute dynamic OCCURS sizes, write arrays accordingly
    end
    Processor-->>Writer: RDD[Array[Byte]]
    Writer-->>User: output RDD
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
JaCoCo code coverage report - 'cobol-parser'
JaCoCo code coverage report - 'spark-cobol'
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala (1)
119-145: ⚠️ Potential issue | 🟠 Major

Reject `variable_size_occurs` for fixed-length writes.

Now that `variable_size_occurs` is allowed for writing, `NestedRecordCombiner.processRDD()` can emit shorter byte arrays for `record_format = F`. Without RDW or any other framing, that turns the output into an undecodable flat file as soon as one record shrinks. Please fail this combination here, or keep fixed-length writes untrimmed.

⚙️ Proposed validation
```diff
 if (readerParameters.recordFormat != RecordFormat.FixedLength && readerParameters.recordFormat != RecordFormat.VariableLength) {
   issues += s"Only '${RecordFormat.FixedLength}' and '${RecordFormat.VariableLength}' values for 'record_format' are supported for writing, " +
     s"provided value: '${readerParameters.recordFormat}'"
 }
+
+if (readerParameters.variableSizeOccurs &&
+    readerParameters.recordFormat != RecordFormat.VariableLength) {
+  issues += "'variable_size_occurs' is supported for writing only when 'record_format' is 'V'"
+}

 if (readerParameters.occursMappings.nonEmpty) {
   issues += "OCCURS mapping option ('occurs_mappings') is not supported for writing"
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala` around lines 119 - 145, The validator must reject the combination of fixed-length output and variable-size OCCURS; update validateParametersForWriting (in CobolParametersValidator) to add a check that if readerParameters.recordFormat == RecordFormat.FixedLength and the variable-size-occurs option is enabled (readerParameters.variableSizeOccurs / variable_size_occurs flag on ReaderParameters), append an error like "variable_size_occurs is not supported for fixed-length writes" to issues so the IllegalArgumentException is thrown; this ensures fixed-length writes remain untrimmed and prevents emitting shorter byte arrays.
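For context on the framing argument above, here is a minimal, self-contained sketch of RDW-style framing. The exact header convention is an assumption for illustration; this is not the Cobrix writer code:

```scala
// Hypothetical sketch of RDW-style framing, not the Cobrix writer itself.
// Assumption: a 4-byte header whose first two bytes are the big-endian
// payload length (excluding the header) and whose last two bytes are zero.
def addRdw(payload: Array[Byte]): Array[Byte] = {
  val len = payload.length
  require(len <= 0xFFFF, "record too large for a 2-byte RDW length")
  Array(((len >> 8) & 0xFF).toByte, (len & 0xFF).toByte, 0.toByte, 0.toByte) ++ payload
}

// Without such framing, a reader expecting fixed-size records loses sync
// as soon as one record is shorter than the declared record length.
val framed = Seq(Array[Byte](1, 2, 3), Array[Byte](4, 5)).map(addRdw)
```

This is why the reviewer suggests failing validation: with `record_format = F` no such header is written, so a shrunken record shifts every subsequent record boundary.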
🧹 Nitpick comments (1)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala (1)
134-160: Add a `VB` rejection regression here too.

This negative-path test currently proves `FB` is rejected, but not `VB`. Since issue #797 explicitly mentions VB, a dedicated `record_format = "VB"` assertion would make the intended writer contract much clearer and protect it from regressions.

Based on learnings, variable-block format writing will not be implemented in the writer; only VariableLength uses RDW headers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala` around lines 134 - 160, The test in VariableLengthEbcdicWriterSuite ("throw an exception on unexpected output record format") only asserts that "FB" is rejected but misses asserting that "VB" is rejected; add a second negative-path check (or extend the existing intercept) that writes with .option("record_format", "VB") using the same df/path setup and asserts the thrown IllegalArgumentException message includes the same "Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: 'VB'" text so VB regressions are caught (refer to the test block variables df, path and the intercept/exception handling used in the current test).
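A framework-free sketch of the negative-path pattern the comment asks for. Here `interceptIAE` and `validateRecordFormat` are hypothetical stand-ins for ScalaTest's `intercept` and the writer's real validator; the message text is copied from the review:

```scala
// Hypothetical helper standing in for ScalaTest's intercept.
def interceptIAE(block: => Unit): IllegalArgumentException =
  try { block; sys.error("expected IllegalArgumentException") }
  catch { case ex: IllegalArgumentException => ex }

// Stand-in for the writer's record_format validation (assumption: only
// 'F' and 'V' are accepted for writing, as the review states).
def validateRecordFormat(fmt: String): Unit =
  if (fmt != "F" && fmt != "V")
    throw new IllegalArgumentException(
      s"Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: '$fmt'")

val vbError = interceptIAE(validateRecordFormat("VB"))
```

The real test would do the same through `.option("record_format", "VB")` on the DataFrame writer, as the prompt below describes.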
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 418-426: The null-array branches in NestedRecordCombiner are
leaving DEPENDING ON fields as raw zeros in fixed-length mode because
Copybook.setPrimitiveField is only invoked when variableLengthOccurs is true;
update both null branches (around the blocks that now return 0 and
cobolField.binaryProperties.actualSize, also the similar case at lines 459-466)
to always call Copybook.setPrimitiveField(spec.cobolField, ar, 0,
fieldStartOffsetOverride = spec.baseOffset) for each spec in dependingOn before
returning, so the dependee PrimitiveDependeeField gets an encoded COBOL zero
even when the array is null and variableLengthOccurs is false.
---
Outside diff comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`:
- Around line 119-145: The validator must reject the combination of fixed-length
output and variable-size OCCURS; update validateParametersForWriting (in
CobolParametersValidator) to add a check that if readerParameters.recordFormat
== RecordFormat.FixedLength and the variable-size-occurs option is enabled
(readerParameters.variableSizeOccurs / variable_size_occurs flag on
ReaderParameters), append an error like "variable_size_occurs is not supported
for fixed-length writes" to issues so the IllegalArgumentException is thrown;
this ensures fixed-length writes remain untrimmed and prevents emitting shorter
byte arrays.
---
Nitpick comments:
In
`@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala`:
- Around line 134-160: The test in VariableLengthEbcdicWriterSuite ("throw an
exception on unexpected output record format") only asserts that "FB" is
rejected but misses asserting that "VB" is rejected; add a second negative-path
check (or extend the existing intercept) that writes with
.option("record_format", "VB") using the same df/path setup and asserts the
thrown IllegalArgumentException message includes the same "Only 'F' and 'V'
values for 'record_format' are supported for writing, provided value: 'VB'" text
so VB regressions are caught (refer to the test block variables df, path and the
intercept/exception handling used in the current test).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b88d503c-8012-4362-bbf1-6c0ad43aa41d
📒 Files selected for processing (8)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Statement.scala
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/DependingOnField.scala
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/WriterAst.scala
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala
Actionable comments posted: 1
🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala (1)
370-373: Mutable state in shared AST node is safe but fragile.

`DependingOnField.baseOffset` is mutated at line 372 for every row within the same partition. This works correctly because:

- Rows are processed sequentially within `mapPartitions`
- Each row gets a fresh byte array
- `baseOffset` is always updated before being read in the subsequent `dependingOn.foreach` calls

However, this design is fragile: if the processing model ever changes to concurrent row handling, this will silently corrupt data. Consider adding a brief comment explaining why this mutation is safe.
📝 Suggested documentation

```diff
 // ── Primitive which has an OCCURS DEPENDS ON ─────────────────────────────
 case PrimitiveDependeeField(spec) =>
+  // NOTE: baseOffset is mutated here for each row. This is safe because rows
+  // are processed sequentially within mapPartitions, and the offset is always
+  // updated before being read in subsequent array-element writes.
   spec.baseOffset = currentOffset
   spec.cobolField.binaryProperties.actualSize
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala` around lines 370 - 373, The mutation of DependingOnField.baseOffset inside the PrimitiveDependeeField match is fragile; add a concise comment near the assignment (spec.baseOffset = currentOffset) explaining that this is safe because rows are processed sequentially within mapPartitions, each row uses a fresh byte array, and baseOffset is always updated before any dependingOn.foreach reads it, and note that concurrent processing would break this invariant; reference PrimitiveDependeeField, DependingOnField.baseOffset, mapPartitions, and dependingOn.foreach in the comment so future maintainers understand the assumption.
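The safety argument above can be demonstrated without Spark. In this sketch, `OffsetSpec` is a hypothetical stand-in for `DependingOnField` and a plain iterator plays the role of one `mapPartitions` partition:

```scala
// Simplified, Spark-free analogue of the pattern the comment describes.
final class OffsetSpec { var baseOffset: Int = 0 }

val spec = new OffsetSpec
val rowStarts = Iterator(10, 20, 30) // one partition's rows, processed in order

// Safe only because processing is sequential: each write to baseOffset
// happens before the read in the same step, and no two rows interleave.
val sizes = rowStarts.map { start =>
  spec.baseOffset = start // mutate shared state for this row
  spec.baseOffset + 4     // read it back immediately afterwards
}.toList
```

If the `map` were replaced by concurrent processing, the write of one row could land between another row's write and read, which is exactly the fragility the comment warns about.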
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Line 281: Fix the comment typo inside NestedRecordCombiner.scala near the
comment above the dependee handling: change "defines" to "defined" in the
comment "// Dependee fields need not to be defines in Spark schema." so it reads
"// Dependee fields need not to be defined in Spark schema." to correct the
spelling in the NestedRecordCombiner class.
---
Nitpick comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 370-373: The mutation of DependingOnField.baseOffset inside the
PrimitiveDependeeField match is fragile; add a concise comment near the
assignment (spec.baseOffset = currentOffset) explaining that this is safe
because rows are processed sequentially within mapPartitions, each row uses a
fresh byte array, and baseOffset is always updated before any
dependingOn.foreach reads it, and note that concurrent processing would break
this invariant; reference PrimitiveDependeeField, DependingOnField.baseOffset,
mapPartitions, and dependingOn.foreach in the comment so future maintainers
understand the assumption.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7883dcea-55d1-4916-8681-7c58385e6b5c
📒 Files selected for processing (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidatorSuite.scala
```scala
}.getOrElse {
  log.error(s"Field '$path${p.name}' is not found in Spark schema. Will be replaced by filler.")
  Filler(p.binaryProperties.actualSize)
  // Dependee fields need not to be defines in Spark schema.
```
Minor typo in comment.
"defines" should be "defined".
✏️ Fix typo

```diff
- // Dependee fields need not to be defines in Spark schema.
+ // Dependee fields need not be defined in Spark schema.
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`
at line 281, Fix the comment typo inside NestedRecordCombiner.scala near the
comment above the dependee handling: change "defines" to "defined" in the
comment "// Dependee fields need not to be defines in Spark schema." so it reads
"// Dependee fields need not to be defined in Spark schema." to correct the
spelling in the NestedRecordCombiner class.
Actionable comments posted: 2
🧹 Nitpick comments (1)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala (1)
467-476: Reduce coupling to AST layout and exact exception wording.

This test depends on `children(5)` and the full message with parser line numbers, so harmless AST or wording refactors will break it even if duplicate-dependee validation still works. Prefer selecting the second `CNT1` by name and asserting only the stable part of the error.

♻️ Suggested cleanup
```diff
-    val ast = parsedCopybook.ast
-    val children = ast.children.head.asInstanceOf[Group].children
-    val cnt2 = children(5).asInstanceOf[Primitive].withUpdatedIsDependee(true)
-    children(5) = cnt2
+    val record = parsedCopybook.ast.children.head.asInstanceOf[Group]
+    val cnt1Indexes = record.children.zipWithIndex.collect {
+      case (p: Primitive, idx) if p.name.equalsIgnoreCase("CNT1") => idx
+    }
+    assert(cnt1Indexes.size == 2)
+    val secondCnt1Index = cnt1Indexes(1)
+    record.children(secondCnt1Index) =
+      record.children(secondCnt1Index).asInstanceOf[Primitive].withUpdatedIsDependee(true)

     val ex = intercept[IllegalArgumentException] {
       NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema)
     }
-    assert(ex.getMessage == "Duplicate field name 'CNT1' found in copybook. Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used. Already found a dependee field with the same name at line 4, current field line number: 10.")
+    assert(ex.getMessage.contains("Duplicate field name 'CNT1' found in copybook."))
+    assert(ex.getMessage.contains("Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used."))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala` around lines 467 - 476, The test in NestedWriterSuite is brittle because it relies on AST index children(5) and exact exception text; update the test so it finds the second CNT1 node by name (e.g., walk parsedCopybook.ast to locate the Primitive node whose name equals "CNT1" and then call withUpdatedIsDependee on that node) instead of using children(5), and change the assertion on the thrown IllegalArgumentException from exact string equality to a contains check for the stable fragment (for example verify ex.getMessage contains "Duplicate field name 'CNT1'") while keeping the call to NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`:
- Around line 153-156: The validator currently only logs a warning when
readerParameters.variableSizeOccurs is true with readerParameters.recordFormat
== RecordFormat.FixedLength; change this to reject the configuration by
throwing/returning a validation error in CobolParametersValidator (instead of
log.warn) when variableSizeOccurs is set with RecordFormat.FixedLength,
referencing the same flags (variableSizeOccurs and recordFormat) and
RecordFormat.FixedLength to locate the check; include a clear error message
stating that variable-size occurs cannot be used with fixed-length writes
because RDW framing is only added for RecordFormat.VariableLength (see
NestedRecordCombiner handling), so validation must fail.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala`:
- Around line 42-47: The code currently calls
ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH) and get(PARAM_SOURCE_PATH)
which throw NoSuchElementException before
CobolParametersValidator.validateOrThrow runs; change the parameter extraction
in CobolStreamer: replace direct get(...) calls with getOption(...) and supply a
safe default (e.g., .getOrElse("") ) when building the parameters Map for
PARAM_COPYBOOK_PATH and PARAM_SOURCE_PATH so that
CobolParametersValidator.validateOrThrow(parameters,
ssc.sparkContext.hadoopConfiguration) can run and produce its intended
validation errors instead of a raw exception.
---
Nitpick comments:
In
`@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala`:
- Around line 467-476: The test in NestedWriterSuite is brittle because it
relies on AST index children(5) and exact exception text; update the test so it
finds the second CNT1 node by name (e.g., walk parsedCopybook.ast to locate the
Primitive node whose name equals "CNT1" and then call withUpdatedIsDependee on
that node) instead of using children(5), and change the assertion on the thrown
IllegalArgumentException from exact string equality to a contains check for the
stable fragment (for example verify ex.getMessage contains "Duplicate field name
'CNT1'") while keeping the call to
NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 889a3459-b227-4f39-aa31-c97cb5aed499
📒 Files selected for processing (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala
```scala
if (readerParameters.variableSizeOccurs &&
    readerParameters.recordFormat == RecordFormat.FixedLength) {
  log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.")
}
```
Reject variable_size_occurs for fixed-length writes.
This combination can emit variable-sized records while spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala:45-76 only adds RDW framing for RecordFormat.VariableLength. With record_format=F, downstream consumers lose unambiguous record boundaries, so this should fail validation instead of warning.
🛑 Suggested validation change

```diff
-    if (readerParameters.variableSizeOccurs &&
-        readerParameters.recordFormat == RecordFormat.FixedLength) {
-      log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.")
-    }
+    if (readerParameters.variableSizeOccurs &&
+        readerParameters.recordFormat == RecordFormat.FixedLength) {
+      issues += "Option 'variable_size_occurs=true' requires 'record_format=V' for writing"
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`
around lines 153 - 156, The validator currently only logs a warning when
readerParameters.variableSizeOccurs is true with readerParameters.recordFormat
== RecordFormat.FixedLength; change this to reject the configuration by
throwing/returning a validation error in CobolParametersValidator (instead of
log.warn) when variableSizeOccurs is set with RecordFormat.FixedLength,
referencing the same flags (variableSizeOccurs and recordFormat) and
RecordFormat.FixedLength to locate the check; include a clear error message
stating that variable-size occurs cannot be used with fixed-length writes
because RDW framing is only added for RecordFormat.VariableLength (see
NestedRecordCombiner handling), so validation must fail.
```scala
val parameters: Map[String, String] = Map[String, String](
  PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH),
  PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH)
)

CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
```
🧩 Analysis chain
🌐 Web query:

In Apache Spark's Scala API, what does `SparkConf.get(key)` do when the key is missing, and how does that differ from `SparkConf.getOption(key)`?

💡 Result:

`SparkConf.get(key)` returns the String value if set, but if the key is missing it throws `java.util.NoSuchElementException(key)`. [1] `SparkConf.getOption(key)` returns an `Option[String]`: `Some(value)` if present, otherwise `None` (no exception). [1]

Sources:
[1] Apache Spark source (SparkConf.scala, v3.5.6) (github.com)
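The behavioural difference can be reproduced without a Spark dependency. `ConfLike` below is a hypothetical stand-in that mirrors the documented semantics; it is not `org.apache.spark.SparkConf`:

```scala
import java.util.NoSuchElementException

// Stand-in mirroring the documented SparkConf behaviour (an assumption,
// not the real class): get throws on a missing key, getOption does not.
final class ConfLike(settings: Map[String, String]) {
  def get(key: String): String =
    settings.getOrElse(key, throw new NoSuchElementException(key))
  def getOption(key: String): Option[String] = settings.get(key)
}

val conf = new ConfLike(Map("spark.cobol.copybook" -> "/path/to/copybook"))

val present = conf.getOption("spark.cobol.copybook")    // Some("/path/to/copybook")
val missing = conf.getOption("spark.cobol.source.path") // None, no exception
val getThrew =
  try { conf.get("spark.cobol.source.path"); false }
  catch { case _: NoSuchElementException => true }
```

This is the crux of the finding: the eager `get` calls fail before the validator ever runs.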
🏁 Script executed:
# Locate and inspect CobolStreamer.scala around lines 42-47
fd "CobolStreamer.scala" --type fRepository: AbsaOSS/cobrix
Length of output: 1855
🏁 Script executed:
# Inspect CobolParametersValidator to understand error message design
fd "CobolParametersValidator.scala" --type fRepository: AbsaOSS/cobrix
Length of output: 163
🏁 Script executed:
# Find CobolStreamer.scala
fd "CobolStreamer.scala" --path "*/spark-cobol/*"Repository: AbsaOSS/cobrix
Length of output: 273
🏁 Script executed:
# Read CobolStreamer.scala around lines 42-47
find . -path "*/spark-cobol/*" -name "CobolStreamer.scala" -exec cat -n {} \; | sed -n '35,55p'Repository: AbsaOSS/cobrix
Length of output: 1236
🏁 Script executed:
# Read CobolParametersValidator to understand error messages
cat -n spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala | head -150Repository: AbsaOSS/cobrix
Length of output: 8759
Don't call SparkConf.get() before validation.
These lookups throw NoSuchElementException before CobolParametersValidator.validateOrThrow() runs (line 47). Users see a raw config exception instead of the validator's intended error messages for missing required parameters.
Suggested fix

```diff
 implicit class Deserializer(@transient val ssc: StreamingContext) extends Serializable {
-  val parameters: Map[String, String] = Map[String, String](
-    PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH),
-    PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH)
-  )
+  private val conf = ssc.sparkContext.getConf
+  val parameters: Map[String, String] =
+    Seq(PARAM_COPYBOOK_PATH, PARAM_SOURCE_PATH)
+      .flatMap(key => conf.getOption(key).map(value => key -> value))
+      .toMap

   CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala`
around lines 42 - 47, The code currently calls
ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH) and get(PARAM_SOURCE_PATH)
which throw NoSuchElementException before
CobolParametersValidator.validateOrThrow runs; change the parameter extraction
in CobolStreamer: replace direct get(...) calls with getOption(...) and supply a
safe default (e.g., .getOrElse("") ) when building the parameters Map for
PARAM_COPYBOOK_PATH and PARAM_SOURCE_PATH so that
CobolParametersValidator.validateOrThrow(parameters,
ssc.sparkContext.hadoopConfiguration) can run and produce its intended
validation errors instead of a raw exception.
Closes #797
Summary by CodeRabbit
- New Features
- Bug Fixes
- Tests
- Chores