diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
index bf5229303a..272b190c47 100644
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
+++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
@@ -2,7 +2,6 @@
 
 import pyarrow as pa
 import pyarrow.parquet as pq
-import s3fs
 from bytewax.dataflow import Dataflow  # type: ignore
 from bytewax.execution import cluster_main
 from bytewax.inputs import ManualInputConfig, distribute
@@ -29,7 +28,16 @@ def __init__(
         self._run_dataflow()
 
     def process_path(self, path):
-        fs = s3fs.S3FileSystem()
+        if path.startswith("s3://"):
+            import s3fs
+
+            fs = s3fs.S3FileSystem()
+        elif path.startswith("gs://"):
+            import gcsfs
+
+            fs = gcsfs.GCSFileSystem()
+        else:
+            raise NotImplementedError(f"Unsupported path: {path}")
         dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
         batches = []
         for fragment in dataset.fragments:
diff --git a/setup.py b/setup.py
index f7b1ff0417..16a6462cc2 100644
--- a/setup.py
+++ b/setup.py
@@ -91,6 +91,7 @@
     "google-cloud-datastore>=2.1.0,<3",
     "google-cloud-storage>=1.34.0,<3",
     "google-cloud-bigtable>=2.11.0,<3",
+    "gcsfs>=2023.3.0,<2024.0.0",
 ]
 
 REDIS_REQUIRED = [
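Note on the approach (not part of the patch): the change dispatches on the path's URI scheme and defers the `s3fs`/`gcsfs` imports until a matching path is seen, so neither library becomes a hard dependency of the module; the new `gcsfs` pin is added only to the GCP extra in setup.py. The sketch below shows the same dispatch expressed through `fsspec`, the common interface that both `s3fs` and `gcsfs` implement. It is a minimal illustration, assuming `fsspec` is installed; `read_parquet_batches` is a hypothetical helper name, not a Feast API.

```python
# Minimal sketch: scheme-based filesystem dispatch via fsspec.
# Assumes fsspec is installed; not part of the patch above.
from urllib.parse import urlparse

import fsspec
import pyarrow.parquet as pq


def read_parquet_batches(path: str):
    scheme = urlparse(path).scheme  # "s3", "gs", or "" for local paths
    if scheme not in ("s3", "gs"):
        raise NotImplementedError(f"Unsupported path: {path}")
    # fsspec lazily imports the backend registered for the scheme
    # (s3fs or gcsfs), mirroring the deferred imports in the patch.
    fs = fsspec.filesystem(scheme)
    dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
    # Flatten every fragment of the dataset into a list of record batches.
    return [
        batch
        for fragment in dataset.fragments
        for batch in fragment.to_batches()
    ]
```

Going through `fsspec.filesystem()` would pick up any backend registered for a scheme without further branching, at the cost of a less explicit error when the backend package is missing; the patch's explicit `if`/`elif` keeps the supported schemes and their error message obvious.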