feat: add hdfs support in python #1359

Closed · wants to merge 4 commits · showing changes from 3 commits
15 changes: 13 additions & 2 deletions .github/workflows/python_build.yml
@@ -50,21 +50,28 @@ jobs:
          toolchain: stable
          override: true
          components: rustfmt, clippy

      - uses: actions/setup-java@v3
        with:
          distribution: 'zulu'
          java-version: '17'

      - uses: Swatinem/rust-cache@v2

      - name: Enable manylinux Python targets
        run: |
          echo "/opt/python/cp37-cp37m/bin" >> $GITHUB_PATH

      # hdfs support requires clang 3.5+, which requires an scl package on this image
      - name: Build and install deltalake
        run: |
          yum install -y llvm-toolset-7
          pip install virtualenv
          virtualenv venv
          source venv/bin/activate
          make setup
          # Install minimum PyArrow version
          pip install -e .[pandas,devel] pyarrow==7.0.0
          scl enable llvm-toolset-7 'pip install -e .[pandas,devel] pyarrow==7.0.0'

      - name: Run tests
        run: |
@@ -88,6 +95,10 @@ jobs:
          override: true
          components: rustfmt, clippy

      - uses: beyondstorage/setup-hdfs@master
        with:
          hdfs-version: '3.3.2'

      - uses: Swatinem/rust-cache@v2

      - uses: actions/setup-python@v3
@@ -110,7 +121,7 @@ jobs:
      - name: Run tests
        run: |
          source venv/bin/activate
          python -m pytest -m '((s3 or azure) and integration) or not integration'
          python -m pytest -m '((s3 or azure or hdfs) and integration) or not integration'

      - name: Test without pandas
        run: |
10 changes: 10 additions & 0 deletions dev/hdfs/core-site.xml
@@ -0,0 +1,10 @@
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.permissions.umask-mode</name>
        <value>000</value>
    </property>
</configuration>
10 changes: 10 additions & 0 deletions dev/hdfs/hdfs-site.xml
@@ -0,0 +1,10 @@
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-bind-host</name>
        <value>0.0.0.0</value>
    </property>
</configuration>
10 changes: 10 additions & 0 deletions dev/hdfs/start-hdfs.sh
@@ -0,0 +1,10 @@
#!/bin/bash

[ ! -d "/tmp/hadoop-hadoop/dfs/name" ] && hdfs namenode -format

hdfs --daemon start namenode
hdfs --daemon start datanode

hdfs dfs -chmod 777 /

exec tail -n 1000 -f /var/log/hadoop/*
14 changes: 14 additions & 0 deletions docker-compose.yml
Expand Up @@ -27,3 +27,17 @@ services:
    image: mcr.microsoft.com/azure-storage/azurite
    ports:
      - 10000:10000

  hdfs:
    image: apache/hadoop:3
    ports:
      - 9000:9000
      - 9866:9866
    volumes:
      - ./dev/hdfs/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml
      - ./dev/hdfs/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml
      - ./dev/hdfs/start-hdfs.sh:/start-hdfs.sh
    entrypoint: /start-hdfs.sh
    # Only run on a profile so we can use the github action for HDFS support during CI
    profiles:
      - hdfs
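
Because the service is gated behind the hdfs profile, a plain docker compose up will not start it; locally, something like docker compose --profile hdfs up should bring up the single-node namenode and datanode with the configuration mounted above, while CI relies on the setup-hdfs action instead, as the comment notes.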
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"]
[dependencies.deltalake]
path = "../rust"
version = "0"
features = ["azure", "gcs", "python", "datafusion", "unity-experimental"]
features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs"]
Collaborator commented:

Could we make this an optional feature of the Python library, since it has that extra Java dependency?

You can add the following:

hdfs = ["deltalake/hdfs"]

And then, in the Python release workflow, add it to the feature flags for release:

FEATURES_FLAG: --no-default-features --features native-tls


[features]
default = ["rustls"]
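
If the feature is made optional as suggested, wheels built without an explicit hdfs flag would leave out the HDFS object store and its Java dependency entirely, so only users and CI jobs that opt in would need a JVM and the Hadoop classpath at runtime.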
1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -98,6 +98,7 @@ markers = [
"integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
"s3: marks tests as integration tests with S3 (deselect with '-m \"not s3\"')",
"azure: marks tests as integration tests with Azure Blob Store",
"hdfs: marks tests as integrations tests with HDFS",
"pandas: marks tests that require pandas",
"pyspark: marks tests that require pyspark",
]
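
With this marker registered, the HDFS tests can also be selected or skipped on their own, for example python -m pytest -m hdfs to run only them, or -m "not hdfs" to leave them out when no HDFS is available.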
44 changes: 44 additions & 0 deletions python/tests/conftest.py
@@ -230,3 +230,47 @@ def existing_table(tmp_path: pathlib.Path, sample_data: pa.Table):
    path = str(tmp_path)
    write_deltalake(path, sample_data)
    return DeltaTable(path)


@pytest.fixture()
def hdfs_url(monkeypatch):
    url = "hdfs://localhost:9000"

    setup_commands = [
        [
            "hdfs",
            "dfs",
            "-mkdir",
            f"{url}/deltars",
        ],
        [
            "hdfs",
            "dfs",
            "-copyFromLocal",
            "../rust/tests/data/simple_table",
            f"{url}/deltars/simple",
        ],
    ]

    try:
        classpath = subprocess.run(["hadoop", "classpath"], capture_output=True, text=True).stdout
        monkeypatch.setenv("CLASSPATH", classpath)
        for args in setup_commands:
            subprocess.run(args)
    except OSError:
        pytest.skip("hdfs cli not installed")

    yield url

    shutdown_commands = [
        [
            "hdfs",
            "dfs",
            "-rm",
            "-r",
            f"{url}/deltars",
        ],
    ]

    for args in shutdown_commands:
        subprocess.run(args)
17 changes: 17 additions & 0 deletions python/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,20 @@ def test_pickle_roundtrip(tmp_path):

    infos = store_pkl.get_file_info(["asd.pkl"])
    assert infos[0].size > 0


@pytest.mark.hdfs
@pytest.mark.integration
@pytest.mark.timeout(timeout=30, method="thread")
def test_read_hdfs(hdfs_url):
    table_path = f"{hdfs_url}/deltars/simple"
    handler = DeltaStorageHandler(table_path)
    dt = DeltaTable(table_path)
    files = dt.file_uris()
    assert len(files) > 0
    for file in files:
        rel_path = file[len(table_path) :]
        with handler.open_input_file(rel_path) as f_:
            table = pq.read_table(f_)
            assert isinstance(table, pa.Table)
            assert table.shape > (0, 0)
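
For reference, the end-user flow this test exercises would look roughly like the sketch below. This is a hypothetical usage example rather than part of the change; it assumes a namenode reachable at hdfs://localhost:9000, a table already copied to /deltars/simple (as the conftest fixture arranges), and a CLASSPATH environment variable containing the Hadoop jars, e.g. the output of hadoop classpath.

from deltalake import DeltaTable

# Point DeltaTable at an hdfs:// URI; with the hdfs feature compiled in,
# the underlying Rust object store performs the reads.
dt = DeltaTable("hdfs://localhost:9000/deltars/simple")

# List the data files backing the table, then load it as a PyArrow table.
print(dt.file_uris())
arrow_table = dt.to_pyarrow_table()
print(arrow_table.num_rows)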