@@ -100,13 +100,17 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
headerStream: SimpleStream,
startingFileOffset: Long,
fileNumber: Int,
startingRecordIndex: Long): Iterator[Seq[Any]] =
startingRecordIndex: Long): Iterator[Seq[Any]] = {
val recordExtractorOpt = recordExtractor(startingRecordIndex, dataStream, headerStream)
if (recordExtractorOpt.isEmpty) {
headerStream.close()
}
if (cobolSchema.copybook.isHierarchical) {
new VarLenHierarchicalIterator(cobolSchema.copybook,
dataStream,
readerProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream),
recordExtractorOpt,
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -116,13 +120,14 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
dataStream,
readerProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream),
recordExtractorOpt,
fileNumber,
startingRecordIndex,
startingFileOffset,
cobolSchema.segmentIdPrefix,
handler)
}
}

/**
* Traverses the data sequentially as fast as possible to generate record index.
@@ -166,12 +171,17 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
case None => false
}

val recordExtractorOpt = recordExtractor(0L, dataStream, headerStream)
if (recordExtractorOpt.isEmpty) {
headerStream.close()
}

segmentIdField match {
case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber,
dataStream,
readerProperties.fileStartOffset,
recordHeaderParser,
recordExtractor(0L, dataStream, headerStream),
recordExtractorOpt,
inputSplitSizeRecords,
inputSplitSizeMB,
Some(copybook),
@@ -182,7 +192,7 @@
dataStream,
readerProperties.fileStartOffset,
recordHeaderParser,
recordExtractor(0L, dataStream, headerStream),
recordExtractorOpt,
inputSplitSizeRecords,
inputSplitSizeMB,
None,
@@ -53,14 +53,19 @@ final class VarLenNestedReader(copybookContents: Seq[String],
headerStream: SimpleStream,
startingFileOffset: Long,
fileNumber: Int,
startingRecordIndex: Long): Iterator[Row] =
startingRecordIndex: Long): Iterator[Row] = {
val recordExtractorOpt = recordExtractor(startingRecordIndex, dataStream, headerStream)
if (recordExtractorOpt.isEmpty) {
headerStream.close()
}

if (cobolSchema.copybook.isHierarchical) {
new RowIterator(
new VarLenHierarchicalIterator(cobolSchema.copybook,
dataStream,
getReaderProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream),
recordExtractorOpt,
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -72,12 +77,13 @@
dataStream,
getReaderProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream),
recordExtractorOpt,
fileNumber,
startingRecordIndex,
startingFileOffset,
cobolSchema.segmentIdPrefix,
new RowHandler())
)
}
}
}
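Both readers apply the same change: the record extractor is resolved once into `recordExtractorOpt`, and when no custom extractor is configured the header stream is closed immediately, since nothing will ever read from it. A minimal sketch of that pattern, using hypothetical simplified types rather than the actual Cobrix iterators:

```scala
// Sketch of the eager header-stream cleanup shown in both readers above
// (hypothetical simplified types, not the actual Cobrix API).
trait SimpleStream {
  def close(): Unit
}

def buildIterator(headerStream: SimpleStream,
                  recordExtractorOpt: Option[Iterator[Array[Byte]]]): Iterator[Array[Byte]] = {
  // Without a custom record extractor the header stream is never consumed,
  // so it is closed right away instead of being left open for the whole read.
  if (recordExtractorOpt.isEmpty) {
    headerStream.close()
  }
  recordExtractorOpt.getOrElse(Iterator.empty)
}
```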
@@ -215,56 +215,64 @@ private[cobol] object IndexBuilder extends Logging {
readerProperties.recordExtractor.foreach { recordExtractorClass =>
val (dataStream, headerStream, _) = getStreams(filePath, startOffset, endOffset, config)

val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)
try {
val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)

var offset = -1L
var record = Array[Byte]()

extractorOpt.foreach { extractor =>
if (extractor.hasNext) {
// Getting the first record, if available
extractor.next()
offset = extractor.offset // Saving offset to jump to later
var offset = -1L
var record = Array[Byte]()

extractorOpt.foreach { extractor =>
if (extractor.hasNext) {
// Getting the second record, if available
record = extractor.next() // Saving the record to check later

dataStream.close()
headerStream.close()

// Getting new streams and record extractor that points directly to the second record
val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)

extractorOpt2.foreach { extractor2 =>
if (!extractor2.hasNext) {
// If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
throw new RuntimeException(
s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
"Please, use 'enable_indexes = false'. " +
s"File: $filePath, offset: $offset"
)
}

// Getting the second record from the extractor pointing to the second record offset at the start.
val expectedRecord = extractor2.next()

if (!expectedRecord.sameElements(record)) {
// Records should match. If they don't, the record extractor is faulty in terms of indexing support..
throw new RuntimeException(
s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " +
"Please, use 'enable_indexes = false'. " +
s"File: $filePath, offset: $offset"
)
} else {
logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
// Getting the first record, if available
extractor.next()
offset = extractor.offset // Saving offset to jump to later

if (extractor.hasNext) {
// Getting the second record, if available
record = extractor.next() // Saving the record to check later

dataStream.close()
headerStream.close()

// Getting new streams and record extractor that points directly to the second record
val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
try {
val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)

extractorOpt2.foreach { extractor2 =>
if (!extractor2.hasNext) {
// If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
throw new RuntimeException(
s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
"Please, use 'enable_indexes = false'. " +
s"File: $filePath, offset: $offset"
)
}

// Getting the second record from the extractor pointing to the second record offset at the start.
val expectedRecord = extractor2.next()

if (!expectedRecord.sameElements(record)) {
// Records should match. If they don't, the record extractor is faulty in terms of indexing support..
throw new RuntimeException(
s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " +
"Please, use 'enable_indexes = false'. " +
s"File: $filePath, offset: $offset"
)
} else {
logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
}
}
} finally {
dataStream2.close()
headerStream2.close()
}
dataStream2.close()
headerStream2.close()
}
}
}
} finally {
dataStream.close()
headerStream.close()
}
}
}
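The substance of this change is that every stream opened for the record extractor self-check is now released in a `finally` block, so a failed check no longer leaks file handles. A minimal sketch of the pattern, with hypothetical names standing in for the real streams and the self-check body:

```scala
// Sketch of the try/finally stream handling introduced above
// (hypothetical helper; the real code inlines this around the self-check).
def withStreams[A](openStreams: () => (AutoCloseable, AutoCloseable))
                  (selfCheck: (AutoCloseable, AutoCloseable) => A): A = {
  val (dataStream, headerStream) = openStreams()
  try {
    selfCheck(dataStream, headerStream)
  } finally {
    // Runs whether the self-check succeeds or throws, so streams never leak.
    dataStream.close()
    headerStream.close()
  }
}
```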
@@ -18,6 +18,8 @@ package za.co.absa.cobrix.spark.cobol.source.streaming

import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

import java.io.IOException

class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOffset: Long, bufferSizeInMegabytes: Int, maximumBytes: Long ) {
val bytesInMegabyte: Int = 1048576

@@ -38,6 +40,7 @@ class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOff
private var bufferConitainBytes = 0
private var bytesRead = 0

@throws[IOException]
def close(): Unit = {
if (!isStreamClosed) {
in.close()
@@ -22,28 +22,34 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import org.apache.hadoop.fs.ContentSummary
import za.co.absa.cobrix.cobol.reader.common.Constants

import java.io.IOException

/**
* This class provides methods for streaming bytes from an Hadoop file.
* This class provides methods for streaming bytes from a Hadoop file.
*
* It is stateful, which means that it stores the offset until which the file has been consumed.
*
* Instances of this class are not reusable, i.e. once the file is fully read it can neither be reopened nor can other
* Instances of this class are not reusable, i.e., once the file is fully read, it can neither be reopened nor can another
* file be consumed.
*
* @param filePath String contained the fully qualified path to the file.
* @param fileSystem Underlying FileSystem point of access.
* @throws IllegalArgumentException in case the file is not found in the underlying file system.
* @param filePath String containing the fully qualified path to the file.
* @param fileSystem Underlying Hadoop file system.
* @note This class is not thread-safe and should only be accessed from a single thread
*/
class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {

private val logger = Logger.getLogger(FileStreamer.this.getClass)

private var byteIndex = startOffset

// Use a buffer to read the data from Hadoop in big chunks
private var bufferedStream = new BufferedFSDataInputStream(getHadoopPath(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
// This ensures that the file is never opened if the stream is never used. This serves two purposes:
// - Safety: ensures that unused streams are closed.
// - Performance: prevents time being spent on opening unused files.
// Note: Since we are working with a network file system, opening a file is a very expensive operation.
private var wasOpened = false
private var bufferedStream: BufferedFSDataInputStream = _

private val fileSize = getHadoopFileSize(getHadoopPath(filePath))
private lazy val fileSize = getHadoopFileSize(new Path(filePath))

override def inputFileName: String = filePath

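The comment block above explains the new lazy-open strategy: the buffered Hadoop stream is only created on first read, so a `FileStreamer` that is constructed but never used does not touch the (potentially remote) file system at all. A minimal sketch of that idea with plain `java.io` types, assuming an `open` function supplied by the caller:

```scala
import java.io.InputStream

// Sketch of lazy stream initialization (simplified; not the actual FileStreamer).
class LazyByteSource(open: () => InputStream) {
  private var wasOpened = false
  private var in: InputStream = _

  private def ensureOpened(): Unit = {
    if (!wasOpened) {
      in = open()        // the expensive open happens only on the first read
      wasOpened = true
    }
  }

  def next(numberOfBytes: Int): Array[Byte] = {
    ensureOpened()
    if (in == null) return Array.emptyByteArray   // stream was already closed
    val buffer = new Array[Byte](numberOfBytes)
    val read = in.read(buffer)
    if (read <= 0) Array.emptyByteArray else buffer.take(read)
  }

  def close(): Unit = {
    wasOpened = true     // mark as opened so closing never triggers an open
    if (in != null) {
      in.close()
      in = null
    }
  }
}
```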
@@ -54,18 +60,20 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
override def offset: Long = byteIndex

/**
* Retrieves a given number of bytes.
* Retrieves a given number of bytes from the file stream.
*
* One of three situations is possible:
*
* 1. There's enough data to be read, thus, the resulting array's length will be exactly ''numberOfBytes''.
* 2. There's not enough data but at least some, thus, the resulting array's length will be the number of available bytes.
* 3. The end of the file was already reached, in which case the resulting array will be empty.
* 1. There's enough data to be read, thus, the resulting array's length will be exactly `numberOfBytes`.
* 2. There's not enough data but at least some bytes are available, thus, the resulting array's length will be less than requested.
* 3. The end of the file was already reached or the stream is closed, in which case the resulting array will be empty.
*
* @param numberOfBytes
* @return
* @param numberOfBytes The number of bytes to read from the stream
* @return An array containing the requested bytes, or fewer bytes if end of stream is reached, or empty array if no more data
*/
@throws[IOException]
override def next(numberOfBytes: Int): Array[Byte] = {
ensureOpened()
val actualBytesToRead = if (maximumBytes > 0) {
Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes).toInt
} else {
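Given the `next()` contract documented above, a caller can drain the stream by looping until an empty array is returned. A hedged usage sketch against a hypothetical trait that mirrors the documented signature (not the actual Cobrix `SimpleStream`):

```scala
// Usage sketch for the next() contract described above
// (ByteSource is a hypothetical stand-in mirroring the documented signature).
trait ByteSource {
  def next(numberOfBytes: Int): Array[Byte]
}

def countBytes(stream: ByteSource, chunkSize: Int = 4096): Long = {
  var total = 0L
  var chunk = stream.next(chunkSize)   // full chunk (case 1) or partial chunk (case 2)
  while (chunk.nonEmpty) {
    total += chunk.length
    chunk = stream.next(chunkSize)
  }
  total                                // loop exits on case 3: empty array at end of stream
}
```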
@@ -104,7 +112,9 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
}
}

@throws[IOException]
override def close(): Unit = {
wasOpened = true
if (bufferedStream != null && !bufferedStream.isClosed) {
bufferedStream.close()
bufferedStream = null
@@ -115,17 +125,12 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
}

/**
* Gets a Hadoop [[Path]] (HDFS, S3, DBFS, etc) to the file.
*
* Throws IllegalArgumentException in case the file does not exist.
*/
private def getHadoopPath(path: String) = {
val hadoopPath = new Path(path)
if (!fileSystem.exists(hadoopPath)) {
throw new IllegalArgumentException(s"File does not exist: $path")
@throws[IOException]
private def ensureOpened(): Unit = {
if (!wasOpened) {
bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
wasOpened = true
}
hadoopPath
}

private def getHadoopFileSize(hadoopPath: Path): Long = {
@@ -16,7 +16,7 @@

package za.co.absa.cobrix.spark.cobol.source.streaming

import java.io.File
import java.io.{File, FileNotFoundException}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
@@ -43,12 +43,17 @@ class FileStreamerSpec extends AnyFlatSpec with BeforeAndAfter with Matchers {
behavior of classOf[FileStreamer].getName

it should "throw if file does not exist" in {
val caught = intercept[IllegalArgumentException] {
new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
assertThrows[FileNotFoundException] {
val stream = new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
stream.size
}
assert(caught.getMessage.toLowerCase.contains("inexistent"))
}

it should "not throw if the stream is never used, even if the file does not exist" in {
noException should be thrownBy {
new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
}
}
it should "return array of same length than expected number of bytes if enough data" in {
val batchLength = 8
val iterations = 10