Skip to content

Commit

Permalink
feat: Support GCS filesystem for bytewax engine
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
  • Loading branch information
sudohainguyen committed Sep 28, 2023
1 parent 6a728fe commit a92e56d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Expand Up @@ -2,7 +2,6 @@

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
from bytewax.dataflow import Dataflow # type: ignore
from bytewax.execution import cluster_main
from bytewax.inputs import ManualInputConfig, distribute
Expand All @@ -29,7 +28,16 @@ def __init__(
self._run_dataflow()

def process_path(self, path):
fs = s3fs.S3FileSystem()
if path.startswith("s3://"):
import s3fs

fs = s3fs.S3FileSystem()
elif path.startswith("gs://"):
import gcsfs

fs = gcsfs.GCSFileSystem()
else:
raise NotImplementedError(f"Unsupported path: {path}")
dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
batches = []
for fragment in dataset.fragments:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -91,6 +91,7 @@
"google-cloud-datastore>=2.1.0,<3",
"google-cloud-storage>=1.34.0,<3",
"google-cloud-bigtable>=2.11.0,<3",
"gcsfs>=2023.3.0,<2024.0.0",
]

REDIS_REQUIRED = [
Expand Down

0 comments on commit a92e56d

Please sign in to comment.