Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -437,18 +437,27 @@ object Copybook {
* Nested field names can contain '.' to identify the exact field.
* If the field name is unique, '.' is not required.
*
* @param field The AST object of the field
* @param recordBytes Binary encoded data of the record
* @param startOffset An offset to the beginning of the field in the data (in bytes).
* @param field The AST object of the field
* @param recordBytes Binary encoded data of the record
* @param configuredStartOffset An offset to the beginning of the field in the data (in bytes).
* @param fieldStartOffsetOverride If this offset is 0 or negative, the field offset defined by the copybook is used.
*                                 Otherwise, the specified offset is used.
* @return Nothing is returned; the encoded value is written into 'recordBytes' in place.
*
*/
def setPrimitiveField(field: Primitive, recordBytes: Array[Byte], value: Any, startOffset: Int = 0): Unit = {
def setPrimitiveField(field: Primitive, recordBytes: Array[Byte], value: Any, configuredStartOffset: Int = 0, fieldStartOffsetOverride: Int = 0): Unit = {
field.encode match {
case Some(encode) =>
val fieldBytes = encode(value)
val startByte = field.binaryProperties.offset + startOffset
val endByte = field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize

val startByte = if (fieldStartOffsetOverride > 0)
fieldStartOffsetOverride
else
field.binaryProperties.offset + configuredStartOffset
val endByte = if (fieldStartOffsetOverride > 0)
fieldStartOffsetOverride + field.binaryProperties.actualSize
else
field.binaryProperties.offset + configuredStartOffset + field.binaryProperties.actualSize

if (startByte < 0 || endByte > recordBytes.length) {
throw new IllegalArgumentException(s"Cannot set value for field '${field.name}' because the field is out of bounds of the record.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,29 @@ package za.co.absa.cobrix.cobol.parser.recordformats
sealed trait RecordFormat

object RecordFormat {
case object FixedLength extends RecordFormat
case object FixedBlock extends RecordFormat
case object VariableLength extends RecordFormat
case object VariableBlock extends RecordFormat
case object AsciiText extends RecordFormat
case object CobrixAsciiText extends RecordFormat
case object FixedLength extends RecordFormat {
override def toString: String = "F"
}

case object FixedBlock extends RecordFormat {
override def toString: String = "FB"
}

case object VariableLength extends RecordFormat {
override def toString: String = "V"
}

case object VariableBlock extends RecordFormat {
override def toString: String = "VB"
}

case object AsciiText extends RecordFormat {
override def toString: String = "D"
}

case object CobrixAsciiText extends RecordFormat {
override def toString: String = "D2"
}

def withNameOpt(s: String): Option[RecordFormat] = {
s match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class DefaultSource
CobolParametersValidator.checkSanity(cobolParameters)

val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
CobolParametersValidator.validateParametersForWriting(readerParameters)

val outputPath = new Path(path)
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,24 @@

package za.co.absa.cobrix.spark.cobol.source.parameters

import java.io.FileNotFoundException
import java.nio.file.{Files, Paths}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
import za.co.absa.cobrix.spark.cobol.utils.ResourceUtils.getClass
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, ReaderParameters}
import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType}

import java.io.FileNotFoundException
import java.nio.file.{Files, Paths}
import scala.collection.mutable.ListBuffer

/**
* This object provides methods for checking the Spark job options after they are parsed.
*/
object CobolParametersValidator {

def checkSanity(params: CobolParameters) = {
def checkSanity(params: CobolParameters): Unit = {
if (params.sourcePaths.isEmpty) {
throw new IllegalArgumentException("Data source path must be specified.")
}
Expand Down Expand Up @@ -113,4 +115,29 @@ object CobolParametersValidator {
validatePath(fileName)
}
}
}

/**
  * Checks that the reader parameters are compatible with the COBOL writer and
  * throws a single `IllegalArgumentException` listing every detected problem.
  *
  * Only 'F' (fixed-length) and 'V' (variable-length) record formats are writable;
  * variable-size OCCURS and record/file start/end offsets are read-only features.
  *
  * @param readerParameters the parameters derived from the Spark job options
  */
def validateParametersForWriting(readerParameters: ReaderParameters): Unit = {
  val format = readerParameters.recordFormat

  // Collect every violated constraint so the user sees all issues at once.
  val formatIssues =
    if (format != RecordFormat.FixedLength && format != RecordFormat.VariableLength)
      Seq(s"Only '${RecordFormat.FixedLength}' and '${RecordFormat.VariableLength}' values for 'record_format' are supported for writing, " +
        s"provided value: '$format'")
    else
      Seq.empty

  val occursIssues =
    if (readerParameters.variableSizeOccurs)
      Seq("Variable size OCCURS ('variable_size_occurs = true') is not supported for writing")
    else
      Seq.empty

  val recordOffsetIssues =
    if (readerParameters.startOffset != 0 || readerParameters.endOffset != 0)
      Seq("'record_start_offset' and 'record_end_offset' are not supported for writing")
    else
      Seq.empty

  val fileOffsetIssues =
    if (readerParameters.fileStartOffset != 0 || readerParameters.fileEndOffset != 0)
      Seq("'file_start_offset' and 'file_end_offset' are not supported for writing")
    else
      Seq.empty

  val issues = formatIssues ++ occursIssues ++ recordOffsetIssues ++ fileOffsetIssues

  if (issues.nonEmpty) {
    throw new IllegalArgumentException(s"Writer validation issues: ${issues.mkString("; ")}")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.datatype.{Decimal, Integral}
import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema

Expand Down Expand Up @@ -61,16 +62,62 @@ class BasicRecordCombiner extends RecordCombiner {
(idx, position)
}

val size = cobolSchema.getRecordSize
val hasRdw = readerParameters.recordFormat == RecordFormat.VariableLength
Copy link
Contributor

@coderabbitai coderabbitai bot Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the RecordFormat enum definition
fd -e scala -x grep -l "enum.*RecordFormat\|object RecordFormat" {} \;

Repository: AbsaOSS/cobrix

Length of output: 151


🏁 Script executed:

#!/bin/bash
# Search for VariableBlock usage in the codebase
rg "VariableBlock" --type scala -B2 -A2

Repository: AbsaOSS/cobrix

Length of output: 13006


🏁 Script executed:

#!/bin/bash
# Check how RDW is handled in reader parameters across the codebase
rg "hasRdw|rdw" --type scala -i -B2 -A2 | head -100

Repository: AbsaOSS/cobrix

Length of output: 13042


🏁 Script executed:

cat -n spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala | sed -n '55,85p'

Repository: AbsaOSS/cobrix

Length of output: 1375


🏁 Script executed:

# Search for how VariableBlock is handled in BasicRecordCombiner context
rg "VariableBlock|VariableLength|hasRdw" spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala -B2 -A2

Repository: AbsaOSS/cobrix

Length of output: 681


🏁 Script executed:

# Check if there are other uses of RDW checks for VariableBlock vs VariableLength
rg "recordFormat.*==.*Variable" spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/ -A1

Repository: AbsaOSS/cobrix

Length of output: 372


Add VariableBlock to the RDW check.

Line 65 should also check for RecordFormat.VariableBlock since VB files use RDW headers for individual records within blocks, similar to V format files. Change:

val hasRdw = readerParameters.recordFormat == RecordFormat.VariableLength

to:

val hasRdw = readerParameters.recordFormat == RecordFormat.VariableLength || readerParameters.recordFormat == RecordFormat.VariableBlock

Without this, VariableBlock files will be written without proper RDW headers, causing downstream reading failures.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala`
at line 65, Update the RDW detection in BasicRecordCombiner by expanding the
hasRdw condition to include RecordFormat.VariableBlock; specifically modify the
logic that computes hasRdw from checking only readerParameters.recordFormat ==
RecordFormat.VariableLength to also consider readerParameters.recordFormat ==
RecordFormat.VariableBlock so VariableBlock files get RDW headers just like
VariableLength files (look for the hasRdw variable and the
readerParameters.recordFormat / RecordFormat enum references).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not applicable here. The cobol writer is not going to support variable-block headers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yruslan, thank you for the clarification! I understand that variable-block format support is intentionally not included in the writer implementation. I've noted this design decision for future reference.


✏️ Learnings added
Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 827
File: spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala:65-65
Timestamp: 2026-03-03T08:34:52.931Z
Learning: In the COBOL writer (BasicRecordCombiner), variable-block (VB) format is not supported for writing. Only VariableLength format uses RDW headers in the writer; VariableBlock format will not be implemented.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 799
File: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala:83-103
Timestamp: 2025-11-13T14:28:55.864Z
Learning: In Cobrix, RDW (Record Descriptor Word) length field contains only the payload size, not including the 4-byte RDW header itself. This is a historical convention. When reading files with RDW that includes the header size, users should use `option("rdw_adjustment", "-4")`.

Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 820
File: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala:435-436
Timestamp: 2026-02-11T08:01:53.018Z
Learning: In `cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala`, the `extractHierarchicalRecord` method hardcodes `generateCorruptedFields = false` because validation in `CobolParametersParser` throws an `IllegalArgumentException` when `generate_corrupted_fields` is used with segment-children (hierarchical data), preventing users from enabling this unsupported combination.

Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 820
File: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala:50-50
Timestamp: 2026-02-11T08:15:51.206Z
Learning: Rename the field from _corrupted_fields to _corrupt_fields to align with Apache Spark's naming convention (e.g., _corrupt_record). Apply this change consistently across the codebase where the field is defined or referenced (including cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala) and update any tests or serialization logic that rely on the old name.

val isRdwBigEndian = readerParameters.isRdwBigEndian
val adjustment1 = if (readerParameters.isRdwPartRecLength) 4 else 0
val adjustment2 = readerParameters.rdwAdjustment

val size = if (hasRdw) {
cobolSchema.getRecordSize + 4
} else {
cobolSchema.getRecordSize
}

val startOffset = if (hasRdw) 4 else 0

val recordLengthLong = cobolSchema.getRecordSize.toLong + adjustment1.toLong + adjustment2.toLong
if (recordLengthLong < 0) {
throw new IllegalArgumentException(
s"Invalid RDW length $recordLengthLong. Check 'is_rdw_part_of_record_length' and 'rdw_adjustment'."
)
}
if (isRdwBigEndian && recordLengthLong > 0xFFFFL) {
throw new IllegalArgumentException(
s"RDW length $recordLengthLong exceeds 65535 and cannot be encoded in big-endian mode."
)
}
if (!isRdwBigEndian && recordLengthLong > Int.MaxValue.toLong) {
throw new IllegalArgumentException(
s"RDW length $recordLengthLong exceeds ${Int.MaxValue} and cannot be encoded safely."
)
}
val recordLength = recordLengthLong.toInt

df.rdd.map { row =>
val ar = new Array[Byte](size)

if (hasRdw) {
if (isRdwBigEndian) {
ar(0) = ((recordLength >> 8) & 0xFF).toByte
ar(1) = (recordLength & 0xFF).toByte
// The last two bytes are reserved and defined by IBM as binary zeros on all platforms.
ar(2) = 0
ar(3) = 0
} else {
ar(0) = (recordLength & 0xFF).toByte
ar(1) = ((recordLength >> 8) & 0xFF).toByte
// This is non-standard. But so are little-endian RDW headers.
// As an advantage, it has no effect for small records but adds support for big records (> 64KB).
ar(2) = ((recordLength >> 16) & 0xFF).toByte
ar(3) = ((recordLength >> 24) & 0xFF).toByte
}
}

sparkFieldPositions.foreach { case (cobolIdx, sparkIdx) =>
if (!row.isNullAt(sparkIdx)) {
val fieldStr = row.get(sparkIdx)
val cobolField = cobolFields(cobolIdx)
Copybook.setPrimitiveField(cobolField, ar, fieldStr, 0)
Copybook.setPrimitiveField(cobolField, ar, fieldStr, startOffset)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.spark.cobol.writer

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.scalatest.Assertion
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}

/**
  * Integration tests for the COBOL writer producing variable-record-length ('V' format)
  * EBCDIC files. Each test writes a small DataFrame through the "cobol" data source and
  * compares the resulting part file byte-for-byte against a hand-computed expected array,
  * covering big-endian RDWs, little-endian RDWs with the RDW counted as part of the record
  * length, and validation failures for unsupported writer options.
  */
class VariableLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {

import spark.implicits._

// Copybook describing each record: a 1-byte field A followed by a 5-byte field B
// (6 data bytes per record in total).
private val copybookContents =
""" 01 RECORD.
05 A PIC X(1).
05 B PIC X(5).
"""

"cobol writer" should {
"write simple variable -record-length EBCDIC data files with big-endian RDWs" in {
withTempDirectory("cobol_writer1") { tempDir =>
val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")

val path = new Path(tempDir, "writer1")

// coalesce(1) + orderBy gives a single part file with a deterministic record order,
// so the output can be compared against a fixed byte array.
df.coalesce(1)
.orderBy("A")
.write
.format("cobol")
.mode(SaveMode.Overwrite)
.option("copybook_contents", copybookContents)
.option("record_format", "V")
.option("is_rdw_big_endian", "true")
.save(path.toString)

val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)

assert(fs.exists(path), "Output directory should exist")
val files = fs.listStatus(path)
.filter(_.getPath.getName.startsWith("part-"))
assert(files.nonEmpty, "Output directory should contain part files")

// Read the whole part file back as raw bytes for the exact comparison below.
val partFile = files.head.getPath
val data = fs.open(partFile)
val bytes = new Array[Byte](files.head.getLen.toInt)
data.readFully(bytes)
data.close()

// Expected EBCDIC data for sample test data
// Each record: a 4-byte big-endian RDW (payload length 6, last 2 bytes zero)
// followed by 6 EBCDIC data bytes; short values are right-padded with 0x40 (EBCDIC space).
val expected = Array[Byte](
0x00.toByte, 0x06.toByte, 0x00.toByte, 0x00.toByte, // RDW1
0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
0x00.toByte, 0x06.toByte, 0x00.toByte, 0x00.toByte, // RDW2
0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
0x00.toByte, 0x06.toByte, 0x00.toByte, 0x00.toByte, // RDW3
0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_
)

// Dump both arrays in hex on mismatch to make failures easy to diagnose.
if (!bytes.sameElements(expected)) {
println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}")

assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
}
}
}

"write simple variable -record-length EBCDIC data files with little-endian RDWs and RDW being part of record length" in {
withTempDirectory("cobol_writer1") { tempDir =>
val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")

val path = new Path(tempDir, "writer1")

// Same data as above, but the RDW is little-endian and includes its own 4 bytes
// in the record length (6 data bytes + 4 RDW bytes = 10 = 0x0A).
df.coalesce(1)
.orderBy("A")
.write
.format("cobol")
.mode(SaveMode.Overwrite)
.option("copybook_contents", copybookContents)
.option("record_format", "V")
.option("is_rdw_big_endian", "false")
.option("is_rdw_part_of_record_length", "true")
.save(path.toString)

val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)

assert(fs.exists(path), "Output directory should exist")
val files = fs.listStatus(path)
.filter(_.getPath.getName.startsWith("part-"))
assert(files.nonEmpty, "Output directory should contain part files")

val partFile = files.head.getPath
val data = fs.open(partFile)
val bytes = new Array[Byte](files.head.getLen.toInt)
data.readFully(bytes)
data.close()

// Expected EBCDIC data for sample test data
val expected = Array[Byte](
0x0A.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, // RDW1
0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First
0x0A.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, // RDW2
0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_
0x0A.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, // RDW3
0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_
)

if (!bytes.sameElements(expected)) {
println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}")

assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
}
}
}

"throw an exception on unexpected output record format" in {
withTempDirectory("cobol_writer2") { tempDir =>
val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B")

val path = new Path(tempDir, "writer2")

// Several unsupported writer options at once — the validator is expected to
// collect all of them into a single IllegalArgumentException message.
val exception = intercept[IllegalArgumentException] {
df.coalesce(1)
.orderBy("A")
.write
.format("cobol")
.mode(SaveMode.Overwrite)
.option("copybook_contents", copybookContents)
.option("record_format", "FB") // Not supported
.option("variable_size_occurs", "true") // Not supported
.option("file_start_offset", "2") // Not supported
.option("record_end_offset", "4") // Not supported
.save(path.toString)
}

assert(exception.getMessage.contains("Writer validation issues: Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: 'FB';"))
assert(exception.getMessage.contains("Variable size OCCURS ('variable_size_occurs = true') is not supported for writing"))
assert(exception.getMessage.contains("'record_start_offset' and 'record_end_offset' are not supported for writing"))
assert(exception.getMessage.contains("'file_start_offset' and 'file_end_offset' are not supported for writing"))
}
}
}

/**
  * Fails with a hex dump of both arrays when they differ; succeeds otherwise.
  *
  * @param actual   the bytes produced by the writer
  * @param expected the hand-computed reference bytes
  */
def assertArraysEqual(actual: Array[Byte], expected: Array[Byte]): Assertion = {
if (!actual.sameElements(expected)) {
val actualHex = actual.map(b => f"0x$b%02X").mkString(", ")
val expectedHex = expected.map(b => f"0x$b%02X").mkString(", ")
fail(s"Actual: $actualHex\nExpected: $expectedHex")
} else {
succeed
}
}
}
Loading