diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
index 5c327cf18..caf568364 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
@@ -100,13 +100,17 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
                                        headerStream: SimpleStream,
                                        startingFileOffset: Long,
                                        fileNumber: Int,
-                                       startingRecordIndex: Long): Iterator[Seq[Any]] =
+                                       startingRecordIndex: Long): Iterator[Seq[Any]] = {
+    val recordExtractorOpt = recordExtractor(startingRecordIndex, dataStream, headerStream)
+    if (recordExtractorOpt.isEmpty) {
+      headerStream.close()
+    }
     if (cobolSchema.copybook.isHierarchical) {
       new VarLenHierarchicalIterator(cobolSchema.copybook,
         dataStream,
         readerProperties,
         recordHeaderParser,
-        recordExtractor(startingRecordIndex, dataStream, headerStream),
+        recordExtractorOpt,
         fileNumber,
         startingRecordIndex,
         startingFileOffset,
@@ -116,13 +120,14 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
         dataStream,
         readerProperties,
         recordHeaderParser,
-        recordExtractor(startingRecordIndex, dataStream, headerStream),
+        recordExtractorOpt,
         fileNumber,
         startingRecordIndex,
         startingFileOffset,
         cobolSchema.segmentIdPrefix,
         handler)
     }
+  }
 
   /**
     * Traverses the data sequentially as fast as possible to generate record index.
@@ -166,12 +171,17 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
       case None => false
     }
+    val recordExtractorOpt = recordExtractor(0L, dataStream, headerStream)
+    if (recordExtractorOpt.isEmpty) {
+      headerStream.close()
+    }
+
     segmentIdField match {
       case Some(field) =>
         IndexGenerator.sparseIndexGenerator(fileNumber,
           dataStream,
           readerProperties.fileStartOffset,
           recordHeaderParser,
-          recordExtractor(0L, dataStream, headerStream),
+          recordExtractorOpt,
           inputSplitSizeRecords,
           inputSplitSizeMB,
           Some(copybook),
@@ -182,7 +192,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
          dataStream,
          readerProperties.fileStartOffset,
          recordHeaderParser,
-         recordExtractor(0L, dataStream, headerStream),
+         recordExtractorOpt,
          inputSplitSizeRecords,
          inputSplitSizeMB,
          None,
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
index d9aa80367..98710ace1 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
@@ -53,14 +53,19 @@ final class VarLenNestedReader(copybookContents: Seq[String],
                               headerStream: SimpleStream,
                               startingFileOffset: Long,
                               fileNumber: Int,
-                              startingRecordIndex: Long): Iterator[Row] =
+                              startingRecordIndex: Long): Iterator[Row] = {
+    val recordExtractorOpt = recordExtractor(startingRecordIndex, dataStream, headerStream)
+    if (recordExtractorOpt.isEmpty) {
+      headerStream.close()
+    }
+
     if (cobolSchema.copybook.isHierarchical) {
       new RowIterator(
         new VarLenHierarchicalIterator(cobolSchema.copybook,
           dataStream,
           getReaderProperties,
           recordHeaderParser,
-          recordExtractor(startingRecordIndex, dataStream, headerStream),
+          recordExtractorOpt,
           fileNumber,
           startingRecordIndex,
           startingFileOffset,
@@ -72,7 +77,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
           dataStream,
           getReaderProperties,
           recordHeaderParser,
-          recordExtractor(startingRecordIndex, dataStream, headerStream),
+          recordExtractorOpt,
           fileNumber,
           startingRecordIndex,
           startingFileOffset,
@@ -80,4 +85,5 @@ final class VarLenNestedReader(copybookContents: Seq[String],
         new RowHandler())
       )
     }
+  }
 }
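Both VarLenNestedReader variants above now build the record extractor once, pass the resulting Option to whichever iterator gets constructed, and close the header stream immediately when no extractor is produced, since nothing will ever read it. A minimal sketch of that pattern, with SimpleStream and the extractor factory reduced to placeholders rather than the actual Cobrix interfaces:

```scala
object RecordExtractorHandling {
  // Sketch only: simplified stand-in for Cobrix's SimpleStream.
  trait SimpleStream extends AutoCloseable

  // Hypothetical factory: returns a raw-record iterator only when a custom extractor is configured.
  def recordExtractor(startingRecordIndex: Long,
                      dataStream: SimpleStream,
                      headerStream: SimpleStream): Option[Iterator[Array[Byte]]] = None

  def getRecordIterator(dataStream: SimpleStream, headerStream: SimpleStream): Iterator[Array[Byte]] = {
    // Build the extractor once and reuse the same Option for whichever iterator is constructed.
    val recordExtractorOpt = recordExtractor(0L, dataStream, headerStream)

    // If no extractor is used, the header stream will never be read, so close it right away
    // instead of leaking the underlying file handle.
    if (recordExtractorOpt.isEmpty) {
      headerStream.close()
    }

    recordExtractorOpt.getOrElse(Iterator.empty)
  }
}
```

Calling the factory once also avoids constructing two extractors over the same data stream, which was the previous behaviour when `recordExtractor(...)` was evaluated separately for each constructor argument.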
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
index 339dcb2d5..ab4e657bd 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
@@ -215,56 +215,64 @@ private[cobol] object IndexBuilder extends Logging {
 
     readerProperties.recordExtractor.foreach { recordExtractorClass =>
       val (dataStream, headerStream, _) = getStreams(filePath, startOffset, endOffset, config)
-      val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)
+      try {
+        val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)
 
-      var offset = -1L
-      var record = Array[Byte]()
-
-      extractorOpt.foreach { extractor =>
-        if (extractor.hasNext) {
-          // Getting the first record, if available
-          extractor.next()
-          offset = extractor.offset // Saving offset to jump to later
+        var offset = -1L
+        var record = Array[Byte]()
+        extractorOpt.foreach { extractor =>
           if (extractor.hasNext) {
-            // Getting the second record, if available
-            record = extractor.next() // Saving the record to check later
-
-            dataStream.close()
-            headerStream.close()
-
-            // Getting new streams and record extractor that points directly to the second record
-            val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
-            val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
-
-            extractorOpt2.foreach { extractor2 =>
-              if (!extractor2.hasNext) {
-                // If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
-                throw new RuntimeException(
-                  s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
-                    "Please, use 'enable_indexes = false'. " +
-                    s"File: $filePath, offset: $offset"
-                )
-              }
-
-              // Getting the second record from the extractor pointing to the second record offset at the start.
-              val expectedRecord = extractor2.next()
-
-              if (!expectedRecord.sameElements(record)) {
-                // Records should match. If they don't, the record extractor is faulty in terms of indexing support..
-                throw new RuntimeException(
-                  s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " +
-                    "Please, use 'enable_indexes = false'. " +
-                    s"File: $filePath, offset: $offset"
-                )
-              } else {
-                logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
+            // Getting the first record, if available
+            extractor.next()
+            offset = extractor.offset // Saving offset to jump to later
+
+            if (extractor.hasNext) {
+              // Getting the second record, if available
+              record = extractor.next() // Saving the record to check later
+
+              dataStream.close()
+              headerStream.close()
+
+              // Getting new streams and record extractor that points directly to the second record
+              val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
+              try {
+                val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
+
+                extractorOpt2.foreach { extractor2 =>
+                  if (!extractor2.hasNext) {
+                    // If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
+                    throw new RuntimeException(
+                      s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
+                        "Please, use 'enable_indexes = false'. " +
+                        s"File: $filePath, offset: $offset"
+                    )
+                  }
+
+                  // Getting the second record from the extractor pointing to the second record offset at the start.
+                  val expectedRecord = extractor2.next()
+
+                  if (!expectedRecord.sameElements(record)) {
+                    // Records should match. If they don't, the record extractor is faulty in terms of indexing support.
+                    throw new RuntimeException(
+                      s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " +
+                        "Please, use 'enable_indexes = false'. " +
+                        s"File: $filePath, offset: $offset"
+                    )
+                  } else {
+                    logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
+                  }
+                }
+              } finally {
+                dataStream2.close()
+                headerStream2.close()
              }
-              dataStream2.close()
-              headerStream2.close()
            }
          }
        }
+      } finally {
+        dataStream.close()
+        headerStream.close()
      }
    }
  }
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala
index a9489df3e..b6d3cf653 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala
@@ -18,6 +18,8 @@ package za.co.absa.cobrix.spark.cobol.source.streaming
 
 import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
 
+import java.io.IOException
+
 class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOffset: Long, bufferSizeInMegabytes: Int, maximumBytes: Long ) {
   val bytesInMegabyte: Int = 1048576
@@ -38,6 +40,7 @@
   private var bufferConitainBytes = 0
   private var bytesRead = 0
 
+  @throws[IOException]
   def close(): Unit = {
     if (!isStreamClosed) {
       in.close()
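The IndexBuilder change wraps the record-extractor self-check in try/finally blocks so that the data and header streams are released even when the check throws, instead of only on the happy path. A condensed sketch of the same resource-safety pattern, using plain java.io streams and hypothetical helper names (openStreams, selfCheck) in place of the Cobrix ones:

```scala
import java.io.{ByteArrayInputStream, InputStream}

object SelfCheckSketch {
  // Hypothetical stand-in for IndexBuilder.getStreams(): opens the data and header streams.
  def openStreams(): (InputStream, InputStream) =
    (new ByteArrayInputStream(Array[Byte](1, 2, 3)), new ByteArrayInputStream(Array[Byte]()))

  // Hypothetical stand-in for the record extractor self-check; may throw.
  def selfCheck(data: InputStream): Unit =
    if (data.read() < 0) throw new RuntimeException("Record extractor self-check failed")

  def runCheckedSelfTest(): Unit = {
    val (dataStream, headerStream) = openStreams()
    try {
      // Whatever the check does, the finally block guarantees both streams are closed,
      // which is what the IndexBuilder change above adds around the real self-check.
      selfCheck(dataStream)
    } finally {
      dataStream.close()
      headerStream.close()
    }
  }
}
```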
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
index 5a1dbd8a6..cc8963552 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
@@ -22,17 +22,19 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
 import org.apache.hadoop.fs.ContentSummary
 import za.co.absa.cobrix.cobol.reader.common.Constants
 
+import java.io.IOException
+
 /**
- * This class provides methods for streaming bytes from an Hadoop file.
+ * This class provides methods for streaming bytes from a Hadoop file.
  *
  * It is stateful, which means that it stores the offset until which the file has been consumed.
  *
- * Instances of this class are not reusable, i.e. once the file is fully read it can neither be reopened nor can other
+ * Instances of this class are not reusable, i.e., once the file is fully read, it can neither be reopened nor can another
  * file be consumed.
  *
- * @param filePath   String contained the fully qualified path to the file.
- * @param fileSystem Underlying FileSystem point of access.
- * @throws IllegalArgumentException in case the file is not found in the underlying file system.
+ * @param filePath   String containing the fully qualified path to the file.
+ * @param fileSystem Underlying Hadoop file system.
+ * @note This class is not thread-safe and should only be accessed from a single thread
  */
 class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {
@@ -40,10 +42,14 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
 
   private var byteIndex = startOffset
 
-  // Use a buffer to read the data from Hadoop in big chunks
-  private var bufferedStream = new BufferedFSDataInputStream(getHadoopPath(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+  // This ensures that the file is never opened if the stream is never used. This serves two purposes:
+  // - Safety: ensures that unused streams are closed.
+  // - Performance: prevents time being spent on opening unused files.
+  // Note: Since we are working with a network file system, opening a file is a very expensive operation.
+  private var wasOpened = false
+  private var bufferedStream: BufferedFSDataInputStream = _
 
-  private val fileSize = getHadoopFileSize(getHadoopPath(filePath))
+  private lazy val fileSize = getHadoopFileSize(new Path(filePath))
 
   override def inputFileName: String = filePath
 
@@ -54,18 +60,20 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
   override def offset: Long = byteIndex
 
   /**
-    * Retrieves a given number of bytes.
+    * Retrieves a given number of bytes from the file stream.
     *
     * One of three situations is possible:
     *
-    * 1. There's enough data to be read, thus, the resulting array's length will be exactly ''numberOfBytes''.
-    * 2. There's not enough data but at least some, thus, the resulting array's length will be the number of available bytes.
-    * 3. The end of the file was already reached, in which case the resulting array will be empty.
+    * 1. There's enough data to be read, thus, the resulting array's length will be exactly `numberOfBytes`.
+    * 2. There's not enough data but at least some bytes are available, thus, the resulting array's length will be less than requested.
+    * 3. The end of the file was already reached or the stream is closed, in which case the resulting array will be empty.
     *
-    * @param numberOfBytes
-    * @return
+    * @param numberOfBytes The number of bytes to read from the stream
+    * @return An array containing the requested bytes, or fewer bytes if end of stream is reached, or empty array if no more data
     */
+  @throws[IOException]
   override def next(numberOfBytes: Int): Array[Byte] = {
+    ensureOpened()
     val actualBytesToRead = if (maximumBytes > 0) {
       Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes).toInt
     } else {
@@ -104,7 +112,9 @@
       }
     }
 
+  @throws[IOException]
   override def close(): Unit = {
+    wasOpened = true
     if (bufferedStream != null && !bufferedStream.isClosed) {
       bufferedStream.close()
       bufferedStream = null
@@ -115,17 +125,12 @@
     new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
   }
 
-  /**
-    * Gets a Hadoop [[Path]] (HDFS, S3, DBFS, etc) to the file.
-    *
-    * Throws IllegalArgumentException in case the file does not exist.
-    */
-  private def getHadoopPath(path: String) = {
-    val hadoopPath = new Path(path)
-    if (!fileSystem.exists(hadoopPath)) {
-      throw new IllegalArgumentException(s"File does not exist: $path")
+  @throws[IOException]
+  private def ensureOpened(): Unit = {
+    if (!wasOpened) {
+      bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+      wasOpened = true
     }
-    hadoopPath
   }
 
   private def getHadoopFileSize(hadoopPath: Path): Long = {
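The FileStreamer rewrite defers opening the underlying Hadoop file until next() actually needs data: constructing a streamer that is never read no longer opens (or leaks) a file handle, and it avoids an expensive open on network file systems. A simplified sketch of the same lazy-open idea over java.io, where openUnderlying() stands in for the BufferedFSDataInputStream construction in the real class:

```scala
import java.io.{FileInputStream, InputStream}

// Sketch of the lazy-open pattern: the underlying stream is created on first read,
// not when the streamer is constructed.
class LazyStreamer(filePath: String) {
  private var wasOpened = false
  private var underlying: InputStream = _

  // Placeholder for the expensive open (opening files on a network file system is costly).
  private def openUnderlying(): InputStream = new FileInputStream(filePath)

  private def ensureOpened(): Unit = {
    if (!wasOpened) {
      underlying = openUnderlying()
      wasOpened = true
    }
  }

  def next(numberOfBytes: Int): Array[Byte] = {
    ensureOpened()
    val buffer = new Array[Byte](numberOfBytes)
    val bytesRead = underlying.read(buffer)
    if (bytesRead <= 0) Array.empty[Byte] else buffer.take(bytesRead)
  }

  def close(): Unit = {
    // Mark as opened so a later next() does not reopen an already closed streamer,
    // mirroring the wasOpened = true assignment in FileStreamer.close() above.
    wasOpened = true
    if (underlying != null) {
      underlying.close()
      underlying = null
    }
  }
}
```

Making fileSize a lazy val follows the same reasoning: the (potentially remote) file system is only queried when the size is actually requested.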
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala
index 5745f487b..0e476db64 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala
@@ -16,7 +16,7 @@
 
 package za.co.absa.cobrix.spark.cobol.source.streaming
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
@@ -43,12 +43,17 @@
   behavior of classOf[FileStreamer].getName
 
   it should "throw if file does not exist" in {
-    val caught = intercept[IllegalArgumentException] {
-      new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+    assertThrows[FileNotFoundException] {
+      val stream = new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+      stream.size
     }
-    assert(caught.getMessage.toLowerCase.contains("inexistent"))
   }
+  it should "not throw if the stream is never used, even if the file does not exist" in {
+    noException should be thrownBy {
+      new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
+    }
+  }
 
   it should "return array of same length than expected number of bytes if enough data" in {
     val batchLength = 8
     val iterations = 10
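The updated test reflects the new laziness: constructing a FileStreamer over a missing file no longer throws, and the FileNotFoundException only surfaces once the stream is actually used (here via size, which forces the lazy file-size lookup). The same behaviour can be illustrated with a plain lazy java.io stream; this is only an illustration, not the Cobrix API:

```scala
import java.io.FileInputStream
import scala.util.Try

// Illustration only: with deferred opening, construction cannot fail for a missing file;
// the FileNotFoundException surfaces on the first actual use instead.
object LazyFailureExample extends App {
  lazy val in = new FileInputStream("/path/that/does/not/exist") // nothing is opened yet

  val firstRead = Try(in.read()) // forces the lazy val: Failure(java.io.FileNotFoundException)
  println(firstRead.isFailure)   // prints: true
}
```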