Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.sedona.sql.datasources.osmpbf.{HeaderFinder, StartEndStream}
import org.apache.sedona.sql.datasources.osmpbf.iterators.PbfIterator
import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity
import org.apache.spark.SerializableWritable
import org.apache.spark.{SerializableWritable, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -46,16 +46,39 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
val f = fs.open(status.getPath)

val offset = findOffset(fs, status, file.start)

f.seek(file.start + offset)
if (offset < 0) {
Iterator.empty
} else {
val f = fs.open(status.getPath)
f.seek(file.start + offset)

new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
record => {
resolveEntity(record, requiredSchema)
})
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => f.close()))

val iter =
new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
record => {
resolveEntity(record, requiredSchema)
})

new Iterator[InternalRow] {
private var closed = false
private def closeIfNeeded(): Unit = {
if (!closed) {
closed = true
f.close()
}
}
override def hasNext: Boolean = {
val has = iter.hasNext
if (!has) closeIfNeeded()
has
}
override def next(): InternalRow = iter.next()
}
}
}

def findOffset(fs: FileSystem, status: FileStatus, start: Long): Long = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,19 @@ class OsmReaderTest extends TestBaseScala with Matchers {
fieldNames should contain("visible")
}

it("should handle file splits where last partition has no block boundary (GH-2781)") {
  // Regression scenario for GH-2781: cap the split size so the input is cut into several
  // partitions and the final one begins inside the last PBF block, where there is no
  // OSMData header to locate. Before the fix this surfaced as an EOFException.
  withConf(Map("spark.sql.files.maxPartitionBytes" -> "100000")) {
    val osmFrame = sparkSession.read.format("osmpbf").load(monacoPath)

    // Guard: the scenario is only exercised if the input really was split.
    osmFrame.rdd.getNumPartitions should be > 1

    // Scanning every split must complete and produce at least one record.
    osmFrame.count() should be > 0L
  }
}

it("should not lose precision due to float to double conversion") {
// Test for accuracy loss bug in NodeExtractor and DenseNodeExtractor
val node = sparkSession.read
Expand Down
Loading