README.md — 1 change: 1 addition & 0 deletions
@@ -1602,6 +1602,7 @@ The output looks like this:
| .option("redefine-segment-id-map:0", "REDEFINED_FIELD1 => SegmentId1,SegmentId2,...") | Specifies a mapping between redefined field names and segment id values. Each option specifies a mapping for a single segment. The numeric value for each mapping option must be incremented so the option keys are unique. |
| .option("segment-children:0", "COMPANY => EMPLOYEE,DEPARTMENT") | Specifies a mapping between segment redefined fields and their children. Each option specifies a mapping for a single parent field. The numeric value for each mapping option must be incremented so the option keys are unique. If such mapping is specified hierarchical record structure will be automatically reconstructed. This require `redefine-segment-id-map` to be set. |
| .option("enable_indexes", "true") | Turns on indexing of multisegment variable length files (on by default). |
| .option("enable_index_cache", "false") | When true, calculated indexes are cached in memory for later use. This improves performance of processing when same files are processed more than once. |
| .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. It will be processed by Spark tasks. (The default is not set and the split will happen according to size, see the next option) |
| .option("input_split_size_mb", 100) | Specify how many megabytes to allocate to each partition/split. (The default is 100 MB) |

CobolParametersParser.scala
@@ -119,6 +119,7 @@ object CobolParametersParser extends Logging {

// Indexed multisegment file processing
val PARAM_ENABLE_INDEXES = "enable_indexes"
val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache"
val PARAM_INPUT_SPLIT_RECORDS = "input_split_records"
val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb"
val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix"
@@ -381,6 +382,7 @@ object CobolParametersParser extends Logging {
fileEndOffset = 0,
generateRecordId = false,
isUsingIndex = false,
isIndexCachingAllowed = false,
inputSplitRecords = None,
inputSplitSizeMB = None,
improveLocality = false,
@@ -416,6 +418,7 @@ object CobolParametersParser extends Logging {
isRdwPartRecLength = varLenParams.isRdwPartRecLength,
rdwAdjustment = varLenParams.rdwAdjustment,
isIndexGenerationNeeded = varLenParams.isUsingIndex,
isIndexCachingAllowed = varLenParams.isIndexCachingAllowed,
inputSplitRecords = varLenParams.inputSplitRecords,
inputSplitSizeMB = varLenParams.inputSplitSizeMB,
hdfsDefaultBlockSize = defaultBlockSize,
@@ -502,6 +505,7 @@ object CobolParametersParser extends Logging {
fileEndOffset,
isRecordIdGenerationEnabled,
params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,
params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt),
params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt),
params.getOrElse(PARAM_IMPROVE_LOCALITY, "true").toBoolean,
@@ -828,6 +832,10 @@ object CobolParametersParser extends Logging {
}
}

if (!params.contains(PARAM_RECORD_LENGTH_FIELD) && params.contains(PARAM_RECORD_LENGTH_MAP)) {
throw new IllegalArgumentException(s"Option '$PARAM_RECORD_LENGTH_MAP' requires '$PARAM_RECORD_LENGTH_FIELD' to be specified.")
}

if (params.contains(PARAM_RECORD_LENGTH)) {
val incorrectParameters = new ListBuffer[String]
if (isText) {
@@ -946,6 +954,10 @@ object CobolParametersParser extends Logging {
params.contains(PARAM_STRICT_INTEGRAL_PRECISION) && params(PARAM_STRICT_INTEGRAL_PRECISION).toBoolean)
throw new IllegalArgumentException(s"Options '$PARAM_DISPLAY_PIC_ALWAYS_STRING' and '$PARAM_STRICT_INTEGRAL_PRECISION' cannot be used together.")

if (params.contains(PARAM_ENABLE_INDEXES) && !params(PARAM_ENABLE_INDEXES).toBoolean &&
params.contains(PARAM_ENABLE_INDEX_CACHE) && params(PARAM_ENABLE_INDEX_CACHE).toBoolean)
throw new IllegalArgumentException(s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")

if (validateRedundantOptions && unusedKeys.nonEmpty) {
val unusedKeyStr = unusedKeys.mkString(",")
val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
ReaderParameters.scala
@@ -99,6 +99,7 @@ case class ReaderParameters(
isRdwPartRecLength: Boolean = false,
rdwAdjustment: Int = 0,
isIndexGenerationNeeded: Boolean = false,
isIndexCachingAllowed: Boolean = false,
inputSplitRecords: Option[Int] = None,
inputSplitSizeMB: Option[Int] = None,
hdfsDefaultBlockSize: Option[Int] = None,
VariableLengthParameters.scala
@@ -34,6 +34,7 @@ package za.co.absa.cobrix.cobol.reader.parameters
* @param fileEndOffset A number of bytes to skip at the end of each file
* @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data
* @param isUsingIndex Is indexing the input file before processing requested
* @param isIndexCachingAllowed Is caching of the generated index allowed
* @param inputSplitSizeMB A partition size to target. In certain circumstances this size may not be achieved exactly, but the library will make a best effort to target it
* @param inputSplitRecords The number of records to include in each partition. Note that mainframe records may have variable size, so inputSplitSizeMB is the recommended option
* @param improveLocality Tries to improve locality by extracting preferred locations for variable-length records
@@ -56,6 +57,7 @@ case class VariableLengthParameters(
fileEndOffset: Int,
generateRecordId: Boolean,
isUsingIndex: Boolean,
isIndexCachingAllowed: Boolean,
inputSplitRecords: Option[Int],
inputSplitSizeMB: Option[Int],
improveLocality: Boolean,
SparkCobolProcessor.scala
@@ -218,7 +218,7 @@ object SparkCobolProcessor {
case reader: VarLenReader if reader.isIndexGenerationNeeded && allowIndexes =>
val orderedFiles = CobolRelation.getListFilesWithOrder(listOfFiles, spark.sqlContext, isRecursiveRetrieval = false)
val filesMap = orderedFiles.map(fileWithOrder => (fileWithOrder.order, fileWithOrder.filePath)).toMap
val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext)(LocalityParameters(improveLocality = false, optimizeAllocation = false))
val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext, readerParameters.isIndexCachingAllowed)(LocalityParameters(improveLocality = false, optimizeAllocation = false))

indexes.flatMap(indexEntry => {
val filePathName = filesMap(indexEntry.fileId)
CobolRelation.scala
@@ -65,15 +65,15 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
class CobolRelation(sourceDirs: Seq[String],
cobolReader: Reader,
localityParams: LocalityParameters,
debugIgnoreFileSize: Boolean
)(@transient val sqlContext: SQLContext)
debugIgnoreFileSize: Boolean)
(@transient val sqlContext: SQLContext)
extends BaseRelation
with Serializable
with TableScan {

private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)

private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams)
private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cobolReader.getReaderProperties.isIndexCachingAllowed)(localityParams)

override def schema: StructType = {
cobolReader.getSparkSchema
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
@@ -34,6 +34,7 @@ import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils}

import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

/**
@@ -45,18 +46,24 @@ import scala.collection.mutable.ArrayBuffer
* In a nutshell, ideally, there will be as many partitions as there are indexes.
*/
private[cobol] object IndexBuilder extends Logging {
private[cobol] val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()

def buildIndex(filesList: Array[FileWithOrder],
cobolReader: Reader,
sqlContext: SQLContext)
sqlContext: SQLContext,
cachingAllowed: Boolean)
(localityParams: LocalityParameters): RDD[SparseIndexEntry] = {
val fs = new Path(filesList.head.filePath).getFileSystem(sqlContext.sparkSession.sparkContext.hadoopConfiguration)

cobolReader match {
case reader: VarLenReader if reader.isIndexGenerationNeeded && localityParams.improveLocality && isDataLocalitySupported(fs) =>
logger.info("Building indexes with data locality...")
buildIndexForVarLenReaderWithFullLocality(filesList, reader, sqlContext, localityParams.optimizeAllocation)
case reader: VarLenReader =>
buildIndexForVarLenReader(filesList, reader, sqlContext)
logger.info("Building indexes for variable record length input files...")
buildIndexForVarLenReader(filesList, reader, sqlContext, cachingAllowed)
case _ =>
logger.info("Generating indexes for full files...")
buildIndexForFullFiles(filesList, sqlContext)
}
}
@@ -112,24 +119,58 @@ private[cobol] object IndexBuilder extends Logging {
*/
private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder],
reader: VarLenReader,
sqlContext: SQLContext): RDD[SparseIndexEntry] = {
sqlContext: SQLContext,
cachingAllowed: Boolean): RDD[SparseIndexEntry] = {
val conf = sqlContext.sparkContext.hadoopConfiguration
val sconf = new SerializableConfiguration(conf)

if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) {
selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf)
// Split the file list into files whose indexes are already cached and files whose indexes are not
val cachedFiles = if (cachingAllowed) {
filesList.filter(f => indexCache.containsKey(f.filePath))
} else {
Array.empty[FileWithOrder]
}

val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
val nonCachedFiles = filesList.diff(cachedFiles)

val indexRDD = filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
generateIndexEntry(row, sconf.value, reader)
})
}).cache()
// Getting indexes for files for which indexes are not in the cache
val newIndexes = if (nonCachedFiles.length > 0) {
if (reader.getReaderProperties.enableSelfChecks) {
selfCheckForIndexCompatibility(reader, nonCachedFiles.head.filePath, conf)
}

repartitionIndexes(indexRDD)
val filesRDD = sqlContext.sparkContext.parallelize(nonCachedFiles, nonCachedFiles.length)
filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
generateIndexEntry(row, sconf.value, reader)
})
}).collect()
} else {
Array.empty[SparseIndexEntry]
}

// Storing new indexes in the cache
if (cachingAllowed && newIndexes.length > 0) {
newIndexes.groupBy(_.fileId).foreach { case (fileId, indexEntries) =>
val filePathOpt = filesList.find(_.order == fileId).map(_.filePath)

filePathOpt.foreach { filePath =>
logger.info(s"Index stored to cache for file: $filePath.")
indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom))
}
}
}

// Getting indexes for files for which indexes are in the cache
val cachedIndexes = cachedFiles.flatMap { f =>
logger.info("Index fetched from cache for file: " + f.filePath)
indexCache.get(f.filePath)
.map(ind => ind.copy(fileId = f.order))
}

// Creating the final RDD with all indexes
createIndexRDD(cachedIndexes ++ newIndexes, sqlContext)
}
Comment on lines +127 to 174 (Contributor):
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Verify cache key includes file modification tracking.

The caching logic filters files using only containsKey(f.filePath) without considering whether the file has been modified since the index was cached. This means if a VRL file is modified (records added/removed/changed), the stale cached index will be used, leading to incorrect data processing.

Run the following script to check if file modification time is tracked elsewhere in the codebase:

If modification time is not tracked, the cache key should be changed from just the file path to include modification metadata, e.g., s"${filePath}_${modificationTime}".


🏁 Script executed:

#!/bin/bash
# Search for file modification time tracking in the codebase
rg -n "getModificationTime|lastModified|FileStatus" --type scala -C 3

Length of output: 8054


Now I'll examine the IndexBuilder.scala file to understand the cache structure and identify whether modification time is tracked:


🏁 Script executed:

#!/bin/bash
# Examine the IndexBuilder.scala file around the cache logic
cat spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala | head -180 | tail -80

Length of output: 3010



🏁 Script executed:

#!/bin/bash
# Find FileWithOrder class definition
fd -t f "FileWithOrder" spark-cobol/src/main/scala/

Length of output: 89



🏁 Script executed:

#!/bin/bash
# Examine FileWithOrder class definition
cat spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala

Length of output: 778



🏁 Script executed:

#!/bin/bash
# Find indexCache declaration and initialization
rg -n "indexCache" spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala -B 5 -A 2 | head -50

Length of output: 1494



🏁 Script executed:

#!/bin/bash
# Search for any file validation or modification checks in the index building process
rg -n "getModificationTime|lastModified|getLen|FileStatus" spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/ -C 3

Length of output: 867


Add file modification time tracking to cache key to prevent stale index usage.

The cache uses only filePath as the key and does not consider whether files have been modified. If a VRL file is modified (records added/removed/changed) between index generations, the stale cached index will be returned without any validation, leading to incorrect data processing.

Suggested fix:
Include file modification time in the cache key:

// Line 129 - change from:
filesList.filter(f => indexCache.containsKey(f.filePath))
// to:
filesList.filter(f => {
  val fileStatus = fileSystem.getFileStatus(new Path(f.filePath))
  val cacheKey = s"${f.filePath}_${fileStatus.getModificationTime}"
  indexCache.containsKey(cacheKey)
})

// Line 160 - update cache storage to use same key format:
val fileStatus = fileSystem.getFileStatus(new Path(filePath))
val cacheKey = s"${filePath}_${fileStatus.getModificationTime}"
indexCache.put(cacheKey, indexEntries.sortBy(_.offsetFrom))

// Line 168 - update cache retrieval to use same key format:
val fileStatus = fileSystem.getFileStatus(new Path(f.filePath))
val cacheKey = s"${f.filePath}_${fileStatus.getModificationTime}"
indexCache.get(cacheKey)

Alternatively, store the modification time alongside cached data and validate it on retrieval.
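
For illustration, a compact Scala sketch of that alternative — storing the modification time alongside the cached entries and validating it on retrieval. The object name is hypothetical and the SparseIndexEntry import path is assumed from the diff above; Hadoop's FileSystem.getFileStatus/FileStatus.getModificationTime are real APIs.

import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry

object ModTimeValidatedIndexCache {
  // Each cached entry remembers the modification time observed when the index was built.
  private case class CachedIndex(modificationTime: Long, entries: Array[SparseIndexEntry])

  private val cache = new ConcurrentHashMap[String, CachedIndex]()

  def put(fs: FileSystem, filePath: String, entries: Array[SparseIndexEntry]): Unit = {
    val modTime = fs.getFileStatus(new Path(filePath)).getModificationTime
    cache.put(filePath, CachedIndex(modTime, entries))
  }

  // Returns the cached index only if the file has not been modified since indexing.
  def get(fs: FileSystem, filePath: String): Option[Array[SparseIndexEntry]] = {
    Option(cache.get(filePath)).filter { cached =>
      val currentModTime = fs.getFileStatus(new Path(filePath)).getModificationTime
      cached.modificationTime == currentModTime
    }.map(_.entries)
  }
}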

🤖 Prompt for AI Agents
In
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
around lines 127-174, the cache currently keys only by filePath which allows
stale indexes to be used after file modifications; update the cache logic to
include file modification time in the cache key (or store and validate
modification time alongside the cached entry). Specifically: when selecting
cachedFiles (line ~129) call the FileSystem to get the file status and build a
cacheKey using filePath + modificationTime and check
indexCache.containsKey(cacheKey); when storing newIndexes (around lines
~160-168) get the file status for the filePath, build the same cacheKey and put
the sorted indexEntries under that cacheKey; when fetching cachedIndexes (around
lines ~168+) compute the cacheKey from f.filePath and its modificationTime and
read indexCache.get(cacheKey) (or alternatively store modification time with
entries and validate it on retrieval). Use
org.apache.hadoop.fs.FileSystem.getFileStatus(new Path(...)).getModificationTime
for the timestamp.


/**
@@ -336,4 +377,13 @@ private[cobol] object IndexBuilder extends Logging {
logger.info(s"Index elements count: $indexCount, number of partitions = $numPartitions")
indexRDD.repartition(numPartitions).cache()
}

private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
val indexCount = indexes.length

val numPartitions = Math.max(1, Math.min(indexCount, Constants.maxNumPartitions))
logger.info(s"Index elements count: ${indexes.length}, number of partitions = $numPartitions")

sqlContext.sparkContext.parallelize(indexes, numPartitions)
}
}
FileStreamer.scala
@@ -83,6 +83,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
if (numberOfBytes <= 0) {
new Array[Byte](0)
} else if (actualBytesToRead <=0 || bufferedStream == null || bufferedStream.isClosed) {
logger.info(s"End of stream reached: Requested $numberOfBytes bytes, reached offset $byteIndex.")
close()
new Array[Byte](0)
} else {
Expand All @@ -97,7 +98,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
if (readBytes == numberOfBytes) {
buffer
} else {
logger.warn(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
logger.info(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
close()
if (readBytes == actualBytesToRead) {
buffer
IndexBuilderSpec.scala
@@ -64,7 +64,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val localityParameters = LocalityParameters(improveLocality = true, optimizeAllocation = true)

val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()

assert(index.length == 3)
}
@@ -86,7 +86,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false)

val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()

assert(index.length == 3)
}
@@ -104,7 +104,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false)

val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()

assert(index.length == 1)
}
@@ -168,7 +168,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val reader = new VarLenNestedReader(Seq(copybook), readerParameters)

val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect()
val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect()

assert(index.length == 3)
}
@@ -188,7 +188,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest

val reader = new VarLenNestedReader(Seq(copybook), readerParameters)

val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect()
val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect()

assert(index.length == 2)
}