[SEDONA-2360] Support fetching libpostal model data from HDFS/object store #2637
base: master
Conversation
Force-pushed from 7d39f23 to 18190fd
Pull request overview
Adds support for resolving libpostal model data directories from remote Hadoop-compatible filesystems by copying them into a per-node local cache before initializing jpostal.
Changes:
- Introduces `HadoopFileSystemUtils` to copy files/directories from a remote FS to local, and reuses it from GeoPackage utilities.
- Adds `LibPostalDataLoader` to detect remote URIs, download to a hashed local cache directory, and guard concurrent downloads with per-key locks.
- Updates libpostal initialization and documentation; adds unit tests using `MiniDFSCluster`.
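The behavior described above hinges on deciding whether a configured `dataDir` is remote. As a rough illustration (the object and method names here are assumptions for this sketch, not the PR's actual code), a scheme-based check might look like:

```scala
import java.net.URI
import scala.util.Try

// Hypothetical sketch of a scheme-based remote-path check; the PR's real
// detection logic may differ in naming and edge-case handling.
object RemotePathSketch {
  def isRemotePath(path: String): Boolean = {
    if (path == null || path.isEmpty) return false
    // Windows drive-letter paths like "C:\data" are local, not a "C:" scheme.
    if (path.length >= 2 && path(1) == ':' && path(0).isLetter) return false
    // No scheme (relative/absolute local path) or a file:// URI means local.
    val scheme = Try(new URI(path).getScheme).getOrElse(null)
    scheme != null && scheme != "file"
  }
}
```

Under this sketch, `hdfs://`, `s3a://`, `gs://`, and `abfs://` URIs all report remote, while plain paths and `file://` URIs stay local.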
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| spark/common/src/main/scala/org/apache/sedona/sql/utils/HadoopFileSystemUtils.scala | New shared Hadoop FS → local copy helpers for files and directory trees. |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala | New remote libpostal dataDir resolver with local caching and concurrency control. |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala | Resolves dataDir via loader and disables jpostal download for remote URIs. |
| spark/common/src/main/scala/org/apache/sedona/sql/datasources/geopackage/connection/FileSystemUtils.scala | Refactors GeoPackage copy utility to delegate to shared Hadoop FS helper. |
| spark/common/src/test/scala/org/apache/sedona/sql/HadoopFileSystemUtilsTest.scala | New tests for local/remote detection and HDFS copy behaviors. |
| spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala | New tests for remote-path detection, caching behavior, and concurrent access. |
| docs/api/sql/Function.md | Documents remote URI support for libpostal dataDir configuration. |
Force-pushed from 18190fd to 8464efc
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
```scala
val futures = (1 to numThreads).map { _ =>
  executor.submit(new Runnable {
    override def run(): Unit = {
      try {
        // All threads wait here until all are ready, then start simultaneously
        barrier.await(30, TimeUnit.SECONDS)
        val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
        resultsLock.synchronized {
          results += localPath
        }
      } catch {
        case e: Throwable =>
          resultsLock.synchronized {
            errors += e
          }
      }
    }
  })
}

// Wait for all threads to complete
futures.foreach(_.get(60, TimeUnit.SECONDS))
executor.shutdown()

// No errors should have occurred
errors shouldBe empty

// All threads should have resolved to the same local path
results.size shouldBe numThreads
results.distinct.size shouldBe 1

val localPath = results.head

// Verify the data is intact
for (subdir <- subdirs) {
  val localSubdir = new File(localPath, subdir)
  localSubdir.exists() shouldBe true
  new File(localSubdir, "model.dat").exists() shouldBe true
}

// Exactly one marker file should exist
new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true

// Clean up
deleteDirectory(new File(localPath))
```
Copilot (AI) commented on Feb 10, 2026:
If the test throws before executor.shutdown() (e.g., a get() timeout or unexpected exception), the thread pool can leak and destabilize subsequent tests. Wrap the executor lifecycle in a try/finally that always calls shutdownNow() (or shutdown() + awaitTermination) to ensure cleanup even on failure.
Suggested change:

```scala
try {
  val futures = (1 to numThreads).map { _ =>
    executor.submit(new Runnable {
      override def run(): Unit = {
        try {
          // All threads wait here until all are ready, then start simultaneously
          barrier.await(30, TimeUnit.SECONDS)
          val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
          resultsLock.synchronized {
            results += localPath
          }
        } catch {
          case e: Throwable =>
            resultsLock.synchronized {
              errors += e
            }
        }
      }
    })
  }
  // Wait for all threads to complete
  futures.foreach(_.get(60, TimeUnit.SECONDS))
  executor.shutdown()
  // No errors should have occurred
  errors shouldBe empty
  // All threads should have resolved to the same local path
  results.size shouldBe numThreads
  results.distinct.size shouldBe 1
  val localPath = results.head
  // Verify the data is intact
  for (subdir <- subdirs) {
    val localSubdir = new File(localPath, subdir)
    localSubdir.exists() shouldBe true
    new File(localSubdir, "model.dat").exists() shouldBe true
  }
  // Exactly one marker file should exist
  new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
  // Clean up
  deleteDirectory(new File(localPath))
} finally {
  executor.shutdownNow()
}
```
```scala
private def deleteDirectory(dir: File): Unit = {
  if (dir.isDirectory) {
    dir.listFiles().foreach(deleteDirectory)
```
Copilot (AI) commented on Feb 10, 2026:
dir.listFiles() can return null (I/O error, permissions), which would throw an NPE in the test cleanup path and potentially mask the real failure. Consider null-guarding (e.g., Option(dir.listFiles()).getOrElse(Array.empty).foreach(...)) similar to the approach used in LibPostalDataLoaderTest.
Suggested change:

```scala
Option(dir.listFiles()).getOrElse(Array.empty).foreach(deleteDirectory)
```
Force-pushed from 8464efc to bea9f8c
…store

- Add shared HadoopFileSystemUtils with copyFileToLocal and copyDirectoryToLocal
- Refactor GeoPackage FileSystemUtils to delegate to shared utility
- Add LibPostalDataLoader to resolve remote dataDir (HDFS, S3, GCS, ABFS) to local cache
- Update LibPostalUtils to use LibPostalDataLoader for remote path resolution
- Disable jpostal auto-download when data is fetched from remote store
- Add tests for HadoopFileSystemUtils and LibPostalDataLoader using MiniDFSCluster
- Update docs for ExpandAddress and ParseAddress with remote dataDir instructions
Force-pushed from bea9f8c to 6f7d89d
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
```scala
if (localCacheDir.exists()) {
  if (!localCacheDir.isDirectory) {
    throw new IllegalStateException(
      s"Libpostal local cache path exists but is not a directory: ${localCacheDir.getAbsolutePath}")
  }
} else if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) {
  throw new IllegalStateException(
    s"Failed to create libpostal local cache directory at ${localCacheDir.getAbsolutePath}")
}

validateCacheDirectory(localCacheDir)

val hadoopConf =
  try {
    SparkHadoopUtil.get.conf
  } catch {
    case _: Exception => new Configuration()
  }
val remoteHadoopPath = new Path(remotePath)

try {
  HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)

  // Write marker file to indicate successful completion
  markerFile.createNewFile()

  logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)
} finally {
  // Always remove the lock entry to avoid unbounded growth.
  // Use value-based remove to avoid interfering with any updated mapping.
  locks.remove(cacheKey, lock)
}
```
Copilot (AI) commented on Feb 10, 2026:
If a previous attempt left a partially-copied cache directory (marker missing), this implementation copies into the existing directory without clearing it. That can leave stale/extra files from an older/failed download, producing a mixed model directory even though the new marker gets written. Prefer copying into a fresh temp directory and atomically renaming into place, or deleting/emptying localCacheDir when markerFile is missing before starting a new copy.
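The fix the reviewer describes can be sketched like this (a minimal illustration with assumed names, not the PR's code): populate a fresh staging directory beside the final location, then rename it into place, so a crashed copy never leaves a half-filled cache directory behind.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Sketch of the suggested staging-then-rename pattern (assumed names;
// not the PR's actual implementation).
object AtomicCacheSketch {
  def populateAtomically(finalDir: Path, copyInto: Path => Unit): Path = {
    val staging = Files.createTempDirectory(finalDir.getParent, ".staging-")
    try {
      copyInto(staging) // e.g., download the model data into the staging dir
      // ATOMIC_MOVE fails if finalDir already exists or the filesystem
      // cannot rename atomically; a real implementation handles both cases.
      Files.move(staging, finalDir, StandardCopyOption.ATOMIC_MOVE)
      finalDir
    } catch {
      case e: Exception =>
        // Best-effort cleanup: delete staging contents deepest-first.
        Files.walk(staging)
          .sorted(java.util.Comparator.reverseOrder[Path]())
          .forEach(p => { Files.deleteIfExists(p); () })
        throw e
    }
  }
}
```

Because the rename is a single filesystem operation, other threads or processes either see no cache directory at all or a fully populated one, which also makes the completion-marker check simpler to reason about.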
```scala
// POSIX permissions are best-effort — on non-POSIX systems we fall back to
// java.io.File permission methods.
```
Copilot (AI) commented on Feb 10, 2026:
validateCacheDirectory verifies the base dir is not a symlink, but permissions are only tightened on cacheDir, not on the parent sedona-libpostal-cache directory (which is the predictable shared entry point under java.io.tmpdir). To reduce cache poisoning risk, also apply restrictive permissions to baseCacheDir (and consider best-effort tightening on newly created subdirectories/files after the copy, since copyDirectoryToLocal will create content with default permissions/umask).
Suggested change:

```scala
// First tighten permissions on the predictable shared base directory,
// then on the hash-specific cache directory itself. POSIX permissions are
// best-effort — on non-POSIX systems we fall back to java.io.File methods.
setRestrictivePermissions(baseCacheDir)
```
```scala
out.writeBytes(s"data for $subdir")
out.close()
```
Copilot (AI) commented on Feb 10, 2026:
Test code manually closes HDFS output streams; if writeBytes throws, the stream may leak and make failures harder to diagnose. Wrap stream usage in a try/finally (or equivalent resource-management helper) to guarantee close() runs.
Suggested change:

```scala
try {
  out.writeBytes(s"data for $subdir")
} finally {
  out.close()
}
```
Did you read the Contributor Guide?
Is this PR related to a ticket?
Yes. Closes "Enhancements to libpostal integration: Fetch model from HDFS/object store" #2360.

What changes were proposed in this PR?
This PR enables libpostal model data (`spark.sedona.libpostal.dataDir`) to be loaded from remote filesystems such as HDFS, S3 (`s3a://`), GCS (`gs://`), and ABFS (`abfs://`), in addition to local paths.

Problem

jpostal requires the ~2 GB libpostal model data to reside on the local filesystem. In cloud deployments, users must manually pre-install the data on every executor node, which is operationally cumbersome.
Solution
When the configured `dataDir` points to a remote URI, the data is automatically downloaded to a per-node local cache directory (`java.io.tmpdir/sedona-libpostal-cache/<hash>`) before initializing jpostal. Key design decisions:

- Shared `HadoopFileSystemUtils` with `isLocalFileSystem`, `copyFileToLocal`, and `copyDirectoryToLocal`. The existing GeoPackage `FileSystemUtils` is refactored to delegate to this shared utility, with zero changes to its 9 call sites.
- `LibPostalDataLoader`: a new object that resolves remote paths to cached local directories. It uses a SHA-256 hash of the remote URI as the cache key, and a `.sedona_libpostal_complete` marker file to detect partial copies.
- `ConcurrentHashMap`-based per-cache-key locking with double-checked locking prevents concurrent downloads of the same data. Lock entries are cleaned up after a successful copy.
- Uses `SparkHadoopUtil.get.conf` (with fallback to `new Configuration()`) to inherit `spark.hadoop.*` config (e.g., S3 credentials) from the Spark session.

Files changed
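The hashing and locking decisions can be sketched in isolation (illustrative only; the names here are assumptions and differ from the PR's actual `LibPostalDataLoader`):

```scala
import java.security.MessageDigest
import java.util.concurrent.ConcurrentHashMap

// Sketch of SHA-256 cache keys plus per-key double-checked locking;
// not the PR's actual code.
object CacheKeyLockingSketch {
  private val locks = new ConcurrentHashMap[String, Object]()

  // SHA-256 of the remote URI, hex-encoded, as the cache directory name.
  def cacheKey(remoteUri: String): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    digest.digest(remoteUri.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }

  // Double-checked locking: check the completion marker, take the per-key
  // lock, check again, and only then download.
  def resolve(remoteUri: String, isComplete: String => Boolean, download: String => Unit): String = {
    val key = cacheKey(remoteUri)
    if (!isComplete(key)) {
      val lock = locks.computeIfAbsent(key, _ => new Object)
      lock.synchronized {
        if (!isComplete(key)) download(key)
      }
      // Value-based removal avoids clobbering a newer mapping for this key.
      locks.remove(key, lock)
    }
    key
  }
}
```

Keying the lock map on the hash means downloads for *different* remote URIs proceed in parallel, while two threads racing on the *same* URI serialize on one lock and the second thread finds the completed cache on its re-check.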
- `spark/common/.../utils/HadoopFileSystemUtils.scala`
- `spark/common/.../expressions/LibPostalDataLoader.scala`
- `spark/common/.../expressions/LibPostalUtils.scala`: calls `LibPostalDataLoader.resolveDataDir()` before jpostal init
- `spark/common/.../geopackage/connection/FileSystemUtils.scala`: delegates to `HadoopFileSystemUtils`
- `docs/api/sql/Function.md`: documents `ExpandAddress` and `ParseAddress`
- `spark/common/.../sql/HadoopFileSystemUtilsTest.scala`
- `spark/common/.../sql/LibPostalDataLoaderTest.scala`

How was this patch tested?
- `isLocalFileSystem`: local paths, `file://` URIs
- `copyFileToLocal`: local passthrough, HDFS round-trip
- `copyDirectoryToLocal`: recursive directory tree, error handling
- `isRemotePath`: 12 URI schemes (local, relative, `file://`, `hdfs://`, `s3a://`, `s3://`, `gs://`, `abfs://`, `wasb://`, empty, Windows)
- `resolveDataDir`: local unchanged, HDFS round-trip with subdirectories
- Concurrent access: `CyclicBarrier` stress test
- Tests use `MiniDFSCluster` for real HDFS operations

Run with:

```
mvn test -pl spark/common -Dspark=3.5 -Dscala=2.12 -Dtest=none -DfailIfNoTests=false -DwildcardSuites="org.apache.sedona.sql.HadoopFileSystemUtilsTest,org.apache.sedona.sql.LibPostalDataLoaderTest"
```

Did this PR include necessary documentation updates?
Yes. Adds `!!! Note` blocks to both `ST_ExpandAddress` and `ST_ParseAddress` in `docs/api/sql/Function.md` documenting `spark.sedona.libpostal.dataDir` support for remote URIs (`hdfs://`, `s3a://`, `gs://`, `abfs://`).