From 916788ef3ecbd975dda9557c29f8b1a9a52e9688 Mon Sep 17 00:00:00 2001
From: Chimey Rock
Date: Fri, 26 Sep 2025 10:52:30 +0700
Subject: [PATCH 1/2] feat: support hdfs staging for Spark offline store

Signed-off-by: Chimey Rock
---
 .../contrib/spark_offline_store/spark.py | 25 +++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index b81ca5ab1bb..b927b954394 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -442,6 +442,7 @@ def persist(
             self.to_spark_df().write.format(file_format).saveAsTable(table_name)
         else:
             self.to_spark_df().createOrReplaceTempView(table_name)
+
 
     def _has_remote_warehouse_in_config(self) -> bool:
         """
@@ -497,15 +498,35 @@ def to_remote_storage(self) -> List[str]:
                 return aws_utils.list_s3_files(
                     self._config.offline_store.region, output_uri
                 )
-
+            elif self._config.offline_store.staging_location.startswith("hdfs://"):
+                output_uri = os.path.join(
+                    self._config.offline_store.staging_location, str(uuid.uuid4())
+                )
+                sdf.write.parquet(output_uri)
+                spark_session = get_spark_session_or_start_new_with_repoconfig(
+                    store_config=self._config.offline_store
+                )
+                return self._list_hdfs_files(spark_session, output_uri)
             else:
                 raise NotImplementedError(
-                    "to_remote_storage is only implemented for file:// and s3:// uri schemes"
+                    "to_remote_storage is only implemented for file://, s3:// and hdfs:// uri schemes"
                 )
 
         else:
             raise NotImplementedError()
 
+    def _list_hdfs_files(self, spark_session: SparkSession, uri: str) -> List[str]:
+        jvm = spark_session._jvm
+        conf = spark_session._jsc.hadoopConfiguration()
+        path = jvm.org.apache.hadoop.fs.Path(uri)
+        fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
+        statuses = fs.listStatus(path)
+        files = []
+        for f in statuses:
+            if f.isFile():
+                files.append(f.getPath().toString())
+        return files
+
     @property
     def metadata(self) -> Optional[RetrievalMetadata]:
         """

From df17b70c78a52c285190ce340a2bac369565dd2b Mon Sep 17 00:00:00 2001
From: Chimey Rock
Date: Fri, 26 Sep 2025 15:56:39 +0700
Subject: [PATCH 2/2] fix: add safety check for _jvm and _jsc in _list_hdfs_files

Signed-off-by: Chimey Rock
---
 .../contrib/spark_offline_store/spark.py | 31 ++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index b927b954394..f1ba4baa939 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -442,7 +442,6 @@ def persist(
             self.to_spark_df().write.format(file_format).saveAsTable(table_name)
         else:
             self.to_spark_df().createOrReplaceTempView(table_name)
-
 
     def _has_remote_warehouse_in_config(self) -> bool:
         """
@@ -506,7 +505,7 @@ def to_remote_storage(self) -> List[str]:
                 spark_session = get_spark_session_or_start_new_with_repoconfig(
                     store_config=self._config.offline_store
                 )
-                return self._list_hdfs_files(spark_session, output_uri)
+                return _list_hdfs_files(spark_session, output_uri)
             else:
                 raise NotImplementedError(
                     "to_remote_storage is only implemented for file://, s3:// and hdfs:// uri schemes"
@@ -515,18 +514,6 @@ def to_remote_storage(self) -> List[str]:
         else:
             raise NotImplementedError()
 
-    def _list_hdfs_files(self, spark_session: SparkSession, uri: str) -> List[str]:
-        jvm = spark_session._jvm
-        conf = spark_session._jsc.hadoopConfiguration()
-        path = jvm.org.apache.hadoop.fs.Path(uri)
-        fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
-        statuses = fs.listStatus(path)
-        files = []
-        for f in statuses:
-            if f.isFile():
-                files.append(f.getPath().toString())
-        return files
-
     @property
     def metadata(self) -> Optional[RetrievalMetadata]:
         """
@@ -650,6 +637,22 @@ def _list_files_in_folder(folder):
     return files
 
 
+def _list_hdfs_files(spark_session: SparkSession, uri: str) -> List[str]:
+    jvm = spark_session._jvm
+    jsc = spark_session._jsc
+    if jvm is None or jsc is None:
+        raise RuntimeError("Spark JVM or JavaSparkContext is not available")
+    conf = jsc.hadoopConfiguration()
+    path = jvm.org.apache.hadoop.fs.Path(uri)
+    fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
+    statuses = fs.listStatus(path)
+    files = []
+    for f in statuses:
+        if f.isFile():
+            files.append(f.getPath().toString())
+    return files
+
+
 def _cast_data_frame(
     df_new: pyspark.sql.DataFrame, df_existing: pyspark.sql.DataFrame
 ) -> pyspark.sql.DataFrame:
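
Usage note (a sketch, not part of either patch): the snippet below shows one way the new
hdfs:// branch could be exercised through Feast's public API, assuming the repo's
feature_store.yaml configures the contrib Spark offline store with a staging_location
such as hdfs://namenode:8020/tmp/feast_staging. The entity dataframe and feature
reference names are hypothetical placeholders.

    # Sketch only: assumes offline_store is the Spark contrib store and
    # offline_store.staging_location starts with "hdfs://".
    import pandas as pd
    from feast import FeatureStore

    store = FeatureStore(repo_path=".")

    # Hypothetical entity rows for a point-in-time join.
    entity_df = pd.DataFrame(
        {"driver_id": [1001], "event_timestamp": [pd.Timestamp.utcnow()]}
    )

    job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_hourly_stats:conv_rate"],  # hypothetical feature reference
    )

    # With these patches, the retrieval job writes parquet files under
    # <staging_location>/<uuid4> on HDFS and returns their URIs, as listed by
    # the module-level _list_hdfs_files() helper.
    hdfs_file_uris = job.to_remote_storage()
    print(hdfs_file_uris)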