From ea2a460492d175a986b816c0cd1afcde85fb7c75 Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Thu, 11 May 2023 20:14:29 -0400
Subject: [PATCH] Add hdfs support in python

---
 .github/workflows/python_build.yml | 15 ++++++++--
 dev/hdfs/core-site.xml             | 10 +++++++
 dev/hdfs/hdfs-site.xml             | 10 +++++++
 dev/hdfs/start-hdfs.sh             | 10 +++++++
 docker-compose.yml                 | 14 ++++++++++
 python/Cargo.toml                  |  2 +-
 python/pyproject.toml              |  1 +
 python/tests/conftest.py           | 44 ++++++++++++++++++++++++++++++
 python/tests/test_fs.py            | 17 ++++++++++++
 9 files changed, 120 insertions(+), 3 deletions(-)
 create mode 100644 dev/hdfs/core-site.xml
 create mode 100644 dev/hdfs/hdfs-site.xml
 create mode 100755 dev/hdfs/start-hdfs.sh

diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 819c56683a..c631f2c627 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -50,6 +50,11 @@ jobs:
           toolchain: stable
           override: true
           components: rustfmt, clippy
+
+      - uses: actions/setup-java@v3
+        with:
+          distribution: 'zulu'
+          java-version: '17'
 
       - uses: Swatinem/rust-cache@v2
 
@@ -57,14 +62,16 @@ jobs:
         run: |
           echo "/opt/python/cp37-cp37m/bin" >> $GITHUB_PATH
 
+      # hdfs support requires clang 3.5+, which requires an scl package on this image
       - name: Build and install deltalake
         run: |
+          yum install -y llvm-toolset-7
           pip install virtualenv
           virtualenv venv
           source venv/bin/activate
           make setup
           # Install minimum PyArrow version
-          pip install -e .[pandas,devel] pyarrow==7.0.0
+          scl enable llvm-toolset-7 'pip install -e .[pandas,devel] pyarrow==7.0.0'
 
       - name: Run tests
         run: |
@@ -88,6 +95,10 @@ jobs:
           override: true
           components: rustfmt, clippy
 
+      - uses: beyondstorage/setup-hdfs@master
+        with:
+          hdfs-version: '3.3.2'
+
       - uses: Swatinem/rust-cache@v2
 
       - uses: actions/setup-python@v3
@@ -110,7 +121,7 @@ jobs:
       - name: Run tests
         run: |
           source venv/bin/activate
-          python -m pytest -m '((s3 or azure) and integration) or not integration'
+          python -m pytest -m '((s3 or azure or hdfs) and integration) or not integration'
 
       - name: Test without pandas
         run: |
diff --git a/dev/hdfs/core-site.xml b/dev/hdfs/core-site.xml
new file mode 100644
index 0000000000..048fdfe57f
--- /dev/null
+++ b/dev/hdfs/core-site.xml
@@ -0,0 +1,10 @@
+<configuration>
+  <property>
+    <name>fs.defaultFS</name>
+    <value>hdfs://localhost:9000</value>
+  </property>
+  <property>
+    <name>fs.permissions.umask-mode</name>
+    <value>000</value>
+  </property>
+</configuration>
\ No newline at end of file
diff --git a/dev/hdfs/hdfs-site.xml b/dev/hdfs/hdfs-site.xml
new file mode 100644
index 0000000000..b164f15739
--- /dev/null
+++ b/dev/hdfs/hdfs-site.xml
@@ -0,0 +1,10 @@
+<configuration>
+  <property>
+    <name>dfs.replication</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>dfs.namenode.rpc-bind-host</name>
+    <value>0.0.0.0</value>
+  </property>
+</configuration>
\ No newline at end of file
diff --git a/dev/hdfs/start-hdfs.sh b/dev/hdfs/start-hdfs.sh
new file mode 100755
index 0000000000..6d0821fbe9
--- /dev/null
+++ b/dev/hdfs/start-hdfs.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+[ ! -d "/tmp/hadoop-hadoop/dfs/name" ] && hdfs namenode -format
-d "/tmp/hadoop-hadoop/dfs/name" ] && hdfs namenode -format + +hdfs --daemon start namenode +hdfs --daemon start datanode + +hdfs dfs -chmod 777 / + +exec tail -n 1000 -f /var/log/hadoop/* \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 444f0edc15..bde5c00494 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,3 +27,17 @@ services: image: mcr.microsoft.com/azure-storage/azurite ports: - 10000:10000 + + hdfs: + image: apache/hadoop:3 + ports: + - 9000:9000 + - 9866:9866 + volumes: + - ./dev/hdfs/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml + - ./dev/hdfs/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml + - ./dev/hdfs/start-hdfs.sh:/start-hdfs.sh + entrypoint: /start-hdfs.sh + # Only run on a profile so we can use the github action for HDFS support during CI + profiles: + - hdfs diff --git a/python/Cargo.toml b/python/Cargo.toml index 3b86b1d663..fac21b675e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] path = "../rust" version = "0" -features = ["azure", "gcs", "python", "datafusion", "unity-experimental"] +features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs"] [features] default = ["rustls"] diff --git a/python/pyproject.toml b/python/pyproject.toml index cf7ea7c787..957d10aeac 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -98,6 +98,7 @@ markers = [ "integration: marks tests as integration tests (deselect with '-m \"not integration\"')", "s3: marks tests as integration tests with S3 (deselect with '-m \"not s3\"')", "azure: marks tests as integration tests with Azure Blob Store", + "hdfs: marks tests as integrations tests with HDFS", "pandas: marks tests that require pandas", "pyspark: marks tests that require pyspark", ] diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 2b9e349b36..cfaab97915 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -230,3 +230,47 @@ def existing_table(tmp_path: pathlib.Path, sample_data: pa.Table): path = str(tmp_path) write_deltalake(path, sample_data) return DeltaTable(path) + + +@pytest.fixture() +def hdfs_url(monkeypatch): + url = "hdfs://localhost:9000" + + setup_commands = [ + [ + "hdfs", + "dfs", + "-mkdir", + f"{url}/deltars", + ], + [ + "hdfs", + "dfs", + "-copyFromLocal", + "../rust/tests/data/simple_table", + f"{url}/deltars/simple", + ], + ] + + try: + classpath = subprocess.run(["hadoop", "classpath"], capture_output=True).stdout + monkeypatch.setenv("CLASSPATH", classpath) + for args in setup_commands: + subprocess.run(args) + except OSError: + pytest.skip("hdfs cli not installed") + + yield url + + shutdown_commands = [ + [ + "hdfs", + "dfs", + "-rm", + "-r", + f"{url}/deltars", + ], + ] + + for args in shutdown_commands: + subprocess.run(args) diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index 28f50c4393..57e16a92f5 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -254,3 +254,20 @@ def test_pickle_roundtrip(tmp_path): infos = store_pkl.get_file_info(["asd.pkl"]) assert infos[0].size > 0 + + +@pytest.mark.hdfs +@pytest.mark.integration +@pytest.mark.timeout(timeout=30, method="thread") +def test_read_hdfs(hdfs_url): + table_path = f"{hdfs_url}/deltars/simple" + handler = DeltaStorageHandler(table_path) + dt = DeltaTable(table_path) + files = dt.file_uris() + assert len(files) > 0 + for file in files: + rel_path = file[len(table_path) :] + with 
+            table = pq.read_table(f_)
+            assert isinstance(table, pa.Table)
+            assert table.shape > (0, 0)
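
A minimal usage sketch of what this patch enables from the Python package, for context only (not part of the patch). The hdfs:// URL and table layout mirror the hdfs_url fixture above, CLASSPATH is populated from `hadoop classpath` the same way conftest.py does it, and the script assumes the docker-compose `hdfs` service (or another HDFS instance) is reachable on localhost:9000:

    import os
    import subprocess

    from deltalake import DeltaTable

    # Mirror the hdfs_url fixture: the HDFS backend picks up the Hadoop jars
    # from the CLASSPATH environment variable, so populate it before the first
    # table access.
    os.environ["CLASSPATH"] = subprocess.run(
        ["hadoop", "classpath"], capture_output=True, text=True
    ).stdout.strip()

    # Table location assumed to match what the hdfs_url fixture copies into HDFS.
    dt = DeltaTable("hdfs://localhost:9000/deltars/simple")
    print(dt.file_uris())         # hdfs:// URIs of the table's data files
    print(dt.to_pyarrow_table())  # read the Parquet data through the new backend

Locally, the new integration tests can be exercised against the compose service with `docker compose --profile hdfs up -d` followed by `python -m pytest -m hdfs`, matching the `hdfs` profile and pytest marker added above.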