diff --git a/README.md b/README.md index aa7536915..95156dd7d 100644 --- a/README.md +++ b/README.md @@ -1602,6 +1602,7 @@ The output looks like this: | .option("redefine-segment-id-map:0", "REDEFINED_FIELD1 => SegmentId1,SegmentId2,...") | Specifies a mapping between redefined field names and segment id values. Each option specifies a mapping for a single segment. The numeric value for each mapping option must be incremented so the option keys are unique. | | .option("segment-children:0", "COMPANY => EMPLOYEE,DEPARTMENT") | Specifies a mapping between segment redefined fields and their children. Each option specifies a mapping for a single parent field. The numeric value for each mapping option must be incremented so the option keys are unique. If such mapping is specified hierarchical record structure will be automatically reconstructed. This require `redefine-segment-id-map` to be set. | | .option("enable_indexes", "true") | Turns on indexing of multisegment variable length files (on by default). | +| .option("enable_index_cache", "false") | When true, calculated indexes are cached in memory for later use (off by default). This improves processing performance when the same files are processed more than once. | | .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. It will be processed by Spark tasks. (The default is not set and the split will happen according to size, see the next option) | | .option("input_split_size_mb", 100) | Specify how many megabytes to allocate to each partition/split. 
(The default is 100 MB) | diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala index 1a3033baf..3a4a9ad6f 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala @@ -119,6 +119,7 @@ object CobolParametersParser extends Logging { // Indexed multisegment file processing val PARAM_ENABLE_INDEXES = "enable_indexes" + val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache" val PARAM_INPUT_SPLIT_RECORDS = "input_split_records" val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb" val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix" @@ -381,6 +382,7 @@ object CobolParametersParser extends Logging { fileEndOffset = 0, generateRecordId = false, isUsingIndex = false, + isIndexCachingAllowed = false, inputSplitRecords = None, inputSplitSizeMB = None, improveLocality = false, @@ -416,6 +418,7 @@ object CobolParametersParser extends Logging { isRdwPartRecLength = varLenParams.isRdwPartRecLength, rdwAdjustment = varLenParams.rdwAdjustment, isIndexGenerationNeeded = varLenParams.isUsingIndex, + isIndexCachingAllowed = varLenParams.isIndexCachingAllowed, inputSplitRecords = varLenParams.inputSplitRecords, inputSplitSizeMB = varLenParams.inputSplitSizeMB, hdfsDefaultBlockSize = defaultBlockSize, @@ -502,6 +505,7 @@ object CobolParametersParser extends Logging { fileEndOffset, isRecordIdGenerationEnabled, params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean, + params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean, params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt), params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt), params.getOrElse(PARAM_IMPROVE_LOCALITY, "true").toBoolean, @@ -828,6 +832,10 @@ object CobolParametersParser extends Logging { } } + if 
(!params.contains(PARAM_RECORD_LENGTH_FIELD) && params.contains(PARAM_RECORD_LENGTH_MAP)) { + throw new IllegalArgumentException(s"Option '$PARAM_RECORD_LENGTH_MAP' requires '$PARAM_RECORD_LENGTH_FIELD' to be specified.") + } + if (params.contains(PARAM_RECORD_LENGTH)) { val incorrectParameters = new ListBuffer[String] if (isText) { @@ -946,6 +954,10 @@ object CobolParametersParser extends Logging { params.contains(PARAM_STRICT_INTEGRAL_PRECISION) && params(PARAM_STRICT_INTEGRAL_PRECISION).toBoolean) throw new IllegalArgumentException(s"Options '$PARAM_DISPLAY_PIC_ALWAYS_STRING' and '$PARAM_STRICT_INTEGRAL_PRECISION' cannot be used together.") + if (params.contains(PARAM_ENABLE_INDEXES) && !params(PARAM_ENABLE_INDEXES).toBoolean && + params.contains(PARAM_ENABLE_INDEX_CACHE) && params(PARAM_ENABLE_INDEX_CACHE).toBoolean) + throw new IllegalArgumentException(s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.") + if (validateRedundantOptions && unusedKeys.nonEmpty) { val unusedKeyStr = unusedKeys.mkString(",") val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr." 
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index 73c3c8b0a..ac679e1a9 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -99,6 +99,7 @@ case class ReaderParameters( isRdwPartRecLength: Boolean = false, rdwAdjustment: Int = 0, isIndexGenerationNeeded: Boolean = false, + isIndexCachingAllowed: Boolean = false, inputSplitRecords: Option[Int] = None, inputSplitSizeMB: Option[Int] = None, hdfsDefaultBlockSize: Option[Int] = None, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala index d2d289ae5..153dd8fa2 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala @@ -34,6 +34,7 @@ package za.co.absa.cobrix.cobol.reader.parameters * @param fileEndOffset A number of bytes to skip at the end of each file * @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data * @param isUsingIndex Is indexing input file before processing is requested + * @param isIndexCachingAllowed Is caching of generated index allowed * @param inputSplitSizeMB A partition size to target. In certain circumstances this size may not be exactly that, but the library will do the best effort to target that size * @param inputSplitRecords The number of records to include in each partition. 
Notice mainframe records may have variable size, inputSplitMB is the recommended option * @param improveLocality Tries to improve locality by extracting preferred locations for variable-length records @@ -56,6 +57,7 @@ case class VariableLengthParameters( fileEndOffset: Int, generateRecordId: Boolean, isUsingIndex: Boolean, + isIndexCachingAllowed: Boolean, inputSplitRecords: Option[Int], inputSplitSizeMB: Option[Int], improveLocality: Boolean, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala index 8af67cfda..8f0763584 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala @@ -218,7 +218,7 @@ object SparkCobolProcessor { case reader: VarLenReader if reader.isIndexGenerationNeeded && allowIndexes => val orderedFiles = CobolRelation.getListFilesWithOrder(listOfFiles, spark.sqlContext, isRecursiveRetrieval = false) val filesMap = orderedFiles.map(fileWithOrder => (fileWithOrder.order, fileWithOrder.filePath)).toMap - val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext)(LocalityParameters(improveLocality = false, optimizeAllocation = false)) + val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext, readerParameters.isIndexCachingAllowed)(LocalityParameters(improveLocality = false, optimizeAllocation = false)) indexes.flatMap(indexEntry => { val filePathName = filesMap(indexEntry.fileId) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala index ca56280e8..ff7e639f5 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala +++ 
b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala @@ -65,15 +65,15 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser class CobolRelation(sourceDirs: Seq[String], cobolReader: Reader, localityParams: LocalityParameters, - debugIgnoreFileSize: Boolean - )(@transient val sqlContext: SQLContext) + debugIgnoreFileSize: Boolean) + (@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan { private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval) - private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams) + private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cobolReader.getReaderProperties.isIndexCachingAllowed)(localityParams) override def schema: StructType = { cobolReader.getSparkSchema diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala index ab4e657bd..ef334d6e5 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala @@ -34,6 +34,7 @@ import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils} +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer /** @@ -45,18 +46,24 @@ import scala.collection.mutable.ArrayBuffer * In a nutshell, ideally, there will be as many partitions as are there are indexes. 
*/ private[cobol] object IndexBuilder extends Logging { + private[cobol] val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]() + def buildIndex(filesList: Array[FileWithOrder], cobolReader: Reader, - sqlContext: SQLContext) + sqlContext: SQLContext, + cachingAllowed: Boolean) (localityParams: LocalityParameters): RDD[SparseIndexEntry] = { val fs = new Path(filesList.head.filePath).getFileSystem(sqlContext.sparkSession.sparkContext.hadoopConfiguration) cobolReader match { case reader: VarLenReader if reader.isIndexGenerationNeeded && localityParams.improveLocality && isDataLocalitySupported(fs) => + logger.info("Building indexes with data locality...") buildIndexForVarLenReaderWithFullLocality(filesList, reader, sqlContext, localityParams.optimizeAllocation) case reader: VarLenReader => - buildIndexForVarLenReader(filesList, reader, sqlContext) + logger.info("Building indexes for variable record length input files...") + buildIndexForVarLenReader(filesList, reader, sqlContext, cachingAllowed) case _ => + logger.info("Generating indexes for full files...") buildIndexForFullFiles(filesList, sqlContext) } } @@ -112,24 +119,58 @@ private[cobol] object IndexBuilder extends Logging { */ private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder], reader: VarLenReader, - sqlContext: SQLContext): RDD[SparseIndexEntry] = { + sqlContext: SQLContext, + cachingAllowed: Boolean): RDD[SparseIndexEntry] = { val conf = sqlContext.sparkContext.hadoopConfiguration val sconf = new SerializableConfiguration(conf) - if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) { - selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf) + // Splitting between files for which indexes are cached and the list of files for which indexes are not cached + val cachedFiles = if (cachingAllowed) { + filesList.filter(f => indexCache.containsKey(f.filePath)) + } else { + Array.empty[FileWithOrder] } - val filesRDD = 
sqlContext.sparkContext.parallelize(filesList, filesList.length) + val nonCachedFiles = filesList.diff(cachedFiles) - val indexRDD = filesRDD.mapPartitions( - partition => { - partition.flatMap(row => { - generateIndexEntry(row, sconf.value, reader) - }) - }).cache() + // Getting indexes for files for which indexes are not in the cache + val newIndexes = if (nonCachedFiles.length > 0) { + if (reader.getReaderProperties.enableSelfChecks) { + selfCheckForIndexCompatibility(reader, nonCachedFiles.head.filePath, conf) + } - repartitionIndexes(indexRDD) + val filesRDD = sqlContext.sparkContext.parallelize(nonCachedFiles, nonCachedFiles.length) + filesRDD.mapPartitions( + partition => { + partition.flatMap(row => { + generateIndexEntry(row, sconf.value, reader) + }) + }).collect() + } else { + Array.empty[SparseIndexEntry] + } + + // Storing new indexes in the cache + if (cachingAllowed && newIndexes.length > 0) { + newIndexes.groupBy(_.fileId).foreach { case (fileId, indexEntries) => + val filePathOpt = filesList.find(_.order == fileId).map(_.filePath) + + filePathOpt.foreach { filePath => + logger.info(s"Index stored to cache for file: $filePath.") + indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom)) + } + } + } + + // Getting indexes for files for which indexes are in the cache + val cachedIndexes = cachedFiles.flatMap { f => + logger.info("Index fetched from cache for file: " + f.filePath) + indexCache.get(f.filePath) + .map(ind => ind.copy(fileId = f.order)) + } + + // Creating the final RDD with all indexes + createIndexRDD(cachedIndexes ++ newIndexes, sqlContext) } /** @@ -336,4 +377,13 @@ private[cobol] object IndexBuilder extends Logging { logger.info(s"Index elements count: $indexCount, number of partitions = $numPartitions") indexRDD.repartition(numPartitions).cache() } + + private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = { + val indexCount = indexes.length + + val numPartitions = 
Math.max(1, Math.min(indexCount, Constants.maxNumPartitions)) + logger.info(s"Index elements count: ${indexes.length}, number of partitions = $numPartitions") + + sqlContext.sparkContext.parallelize(indexes, numPartitions) + } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala index cc8963552..dc61db2fb 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala @@ -83,6 +83,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = if (numberOfBytes <= 0) { new Array[Byte](0) } else if (actualBytesToRead <=0 || bufferedStream == null || bufferedStream.isClosed) { + logger.info(s"End of stream reached: Requested $numberOfBytes bytes, reached offset $byteIndex.") close() new Array[Byte](0) } else { @@ -97,7 +98,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = if (readBytes == numberOfBytes) { buffer } else { - logger.warn(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.") + logger.info(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.") close() if (readBytes == actualBytesToRead) { buffer diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala index b19bab7ad..4affdfefd 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala @@ -64,7 +64,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest val localityParameters = 
LocalityParameters(improveLocality = true, optimizeAllocation = true) - val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect() + val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect() assert(index.length == 3) } @@ -86,7 +86,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false) - val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect() + val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect() assert(index.length == 3) } @@ -104,7 +104,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false) - val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect() + val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect() assert(index.length == 1) } @@ -168,7 +168,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest val reader = new VarLenNestedReader(Seq(copybook), readerParameters) - val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect() + val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect() assert(index.length == 3) } @@ -188,7 +188,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest val reader = new VarLenNestedReader(Seq(copybook), readerParameters) - val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect() + val index 
= IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect() assert(index.length == 2) } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala index 38dad2fda..7c8c77fe4 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala @@ -16,10 +16,13 @@ package za.co.absa.cobrix.spark.cobol.source.integration +import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.{PARAM_ENABLE_INDEXES, PARAM_ENABLE_INDEX_CACHE} import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture +import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture { private val copybook = @@ -170,6 +173,53 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with } } + "work for data with offsets and indexes and index cache" in { + withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile => + val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}""" + + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("input_split_records", "1") + .option("enable_index_cache", "true") + .option("pedantic", "true") + .option("record_length_map", """{"A":4,"B":7,"C":8}""") + .load(tempFile) + + val 
actualInitial = df.orderBy("SEG_ID").toJSON.collect().mkString(",") + val actualCached = df.orderBy("SEG_ID").toJSON.collect().mkString(",") + + val pathNameAsCached = s"file:$tempFile" + + assert(IndexBuilder.indexCache.get(pathNameAsCached) != null) + assert(IndexBuilder.indexCache.get(pathNameAsCached).length == 2) + + assert(actualInitial == expected) + assert(actualCached == expected) + } + } + + "throw an exception when index caching is requested while indexes are turned off" in { + withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile => + val ex = intercept[IllegalArgumentException] { + spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("enable_indexes", "false") + .option("enable_index_cache", "true") + .option("pedantic", "true") + .option("record_length_map", """{"A":4,"B":7,"C":8}""") + .load(tempFile) + } + + assert(ex.getMessage == s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.") + } + } + "throw an exception for unknown mapping" in { withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile => val df = spark.read