diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala index 580a9572e6f..37d009d2601 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.sedona.sql.datasources.osmpbf.{HeaderFinder, StartEndStream} import org.apache.sedona.sql.datasources.osmpbf.iterators.PbfIterator import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity -import org.apache.spark.SerializableWritable +import org.apache.spark.{SerializableWritable, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -46,16 +46,39 @@ case class OsmPartitionReader( val path = new Path(new URI(file.filePath.toString())) val fs = path.getFileSystem(broadcastedHadoopConf.value.value) val status = fs.getFileStatus(path) - val f = fs.open(status.getPath) val offset = findOffset(fs, status, file.start) - f.seek(file.start + offset) + if (offset < 0) { + Iterator.empty + } else { + val f = fs.open(status.getPath) + f.seek(file.start + offset) - new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map( - record => { - resolveEntity(record, requiredSchema) - }) + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => f.close())) + + val iter = + new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map( + record => { + resolveEntity(record, requiredSchema) + }) + + new Iterator[InternalRow] { + private var closed = false + private def closeIfNeeded(): Unit = { + if (!closed) { + closed = true + f.close() + } + } + override def hasNext: Boolean = { + val has = iter.hasNext + if (!has) closeIfNeeded() + has + } + override def next(): InternalRow = iter.next() + } + } } def findOffset(fs: FileSystem, status: FileStatus, start: Long): Long = { diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala index 09c083b3457..5c82acf51d7 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala @@ -339,6 +339,19 @@ class OsmReaderTest extends TestBaseScala with Matchers { fieldNames should contain("visible") } + it("should handle file splits where last partition has no block boundary (GH-2781)") { + // Force small splits so the last partition starts inside the final PBF block, + // where no OSMData header exists. Without the fix, this causes EOFException. + withConf(Map("spark.sql.files.maxPartitionBytes" -> "100000")) { + val df = sparkSession.read + .format("osmpbf") + .load(monacoPath) + + assert(df.rdd.getNumPartitions > 1) + assert(df.count() > 0) + } + } + it("should not lose precision due to float to double conversion") { // Test for accuracy loss bug in NodeExtractor and DenseNodeExtractor val node = sparkSession.read