Add hdfs support in python
Kimahriman committed May 12, 2023
1 parent 37a0f09 commit ea2a460
Showing 9 changed files with 120 additions and 3 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/python_build.yml
@@ -50,21 +50,28 @@ jobs:
toolchain: stable
override: true
components: rustfmt, clippy

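# hdfs support needs a JDK available in this job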
- uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: '17'

- uses: Swatinem/rust-cache@v2

- name: Enable manylinux Python targets
run: |
echo "/opt/python/cp37-cp37m/bin" >> $GITHUB_PATH
# hdfs support requires clang 3.5+, which requires an scl package on this image
- name: Build and install deltalake
run: |
yum install -y llvm-toolset-7
pip install virtualenv
virtualenv venv
source venv/bin/activate
make setup
# Install minimum PyArrow version
-pip install -e .[pandas,devel] pyarrow==7.0.0
+scl enable llvm-toolset-7 'pip install -e .[pandas,devel] pyarrow==7.0.0'
- name: Run tests
run: |
@@ -88,6 +95,10 @@ jobs:
override: true
components: rustfmt, clippy

- uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: '3.3.2'

- uses: Swatinem/rust-cache@v2

- uses: actions/setup-python@v3
@@ -110,7 +121,7 @@ jobs:
- name: Run tests
run: |
source venv/bin/activate
-python -m pytest -m '((s3 or azure) and integration) or not integration'
+python -m pytest -m '((s3 or azure or hdfs) and integration) or not integration'
- name: Test without pandas
run: |
10 changes: 10 additions & 0 deletions dev/hdfs/core-site.xml
@@ -0,0 +1,10 @@
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
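<!-- A umask of 000 creates new files and directories with open permissions, so the test user can write anywhere without being the HDFS superuser -->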
<property>
<name>fs.permissions.umask-mode</name>
<value>000</value>
</property>
</configuration>
10 changes: 10 additions & 0 deletions dev/hdfs/hdfs-site.xml
@@ -0,0 +1,10 @@
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
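<!-- Bind the namenode RPC to all interfaces so it is reachable from outside the container -->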
<property>
<name>dfs.namenode.rpc-bind-host</name>
<value>0.0.0.0</value>
</property>
</configuration>
10 changes: 10 additions & 0 deletions dev/hdfs/start-hdfs.sh
@@ -0,0 +1,10 @@
#!/bin/bash

[ ! -d "/tmp/hadoop-hadoop/dfs/name" ] && hdfs namenode -format

hdfs --daemon start namenode
hdfs --daemon start datanode

hdfs dfs -chmod 777 /

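# The daemons background themselves, so tail their logs to keep the container in the foreground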
exec tail -n 1000 -f /var/log/hadoop/*
14 changes: 14 additions & 0 deletions docker-compose.yml
@@ -27,3 +27,17 @@ services:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- 10000:10000

hdfs:
image: apache/hadoop:3
ports:
- 9000:9000
- 9866:9866
volumes:
- ./dev/hdfs/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml
- ./dev/hdfs/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml
- ./dev/hdfs/start-hdfs.sh:/start-hdfs.sh
entrypoint: /start-hdfs.sh
# Only run on a profile so we can use the github action for HDFS support during CI
profiles:
- hdfs
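For local runs outside CI, the cluster can be started through the new compose profile, e.g. docker compose --profile hdfs up -d hdfs, and is then reachable at hdfs://localhost:9000 (the fs.defaultFS value set above). A minimal sketch, not part of this commit, of pointing the Python bindings at it; the /deltars/simple path is assumed here and mirrors what the conftest fixture below copies in:

import os
import subprocess

from deltalake import DeltaTable

# The hdfs client resolves the Hadoop jars through CLASSPATH, mirroring what the conftest fixture does.
os.environ["CLASSPATH"] = subprocess.run(
    ["hadoop", "classpath"], capture_output=True, text=True
).stdout.strip()

# Assumed path: the integration tests copy ../rust/tests/data/simple_table here.
dt = DeltaTable("hdfs://localhost:9000/deltars/simple")
print(dt.version())
print(dt.file_uris())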
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"]
[dependencies.deltalake]
path = "../rust"
version = "0"
-features = ["azure", "gcs", "python", "datafusion", "unity-experimental"]
+features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs"]

[features]
default = ["rustls"]
1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -98,6 +98,7 @@ markers = [
"integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
"s3: marks tests as integration tests with S3 (deselect with '-m \"not s3\"')",
"azure: marks tests as integration tests with Azure Blob Store",
"hdfs: marks tests as integration tests with HDFS",
"pandas: marks tests that require pandas",
"pyspark: marks tests that require pyspark",
]
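With the marker registered, the HDFS integration tests can be selected on their own, for example with python -m pytest -m "hdfs and integration", or deselected in the same way as the existing s3 and azure markers.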
44 changes: 44 additions & 0 deletions python/tests/conftest.py
@@ -230,3 +230,47 @@ def existing_table(tmp_path: pathlib.Path, sample_data: pa.Table):
path = str(tmp_path)
write_deltalake(path, sample_data)
return DeltaTable(path)


@pytest.fixture()
def hdfs_url(monkeypatch):
url = "hdfs://localhost:9000"

setup_commands = [
[
"hdfs",
"dfs",
"-mkdir",
f"{url}/deltars",
],
[
"hdfs",
"dfs",
"-copyFromLocal",
"../rust/tests/data/simple_table",
f"{url}/deltars/simple",
],
]

try:
# capture `hadoop classpath` as text (not bytes) so it can be exported via CLASSPATH below
classpath = subprocess.run(["hadoop", "classpath"], capture_output=True, text=True).stdout.strip()
monkeypatch.setenv("CLASSPATH", classpath)
for args in setup_commands:
subprocess.run(args)
except OSError:
pytest.skip("hdfs cli not installed")

yield url

shutdown_commands = [
[
"hdfs",
"dfs",
"-rm",
"-r",
f"{url}/deltars",
],
]

for args in shutdown_commands:
subprocess.run(args)
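The fixture only stages a read-only copy of the simple table. A write-path check is not part of this commit, but a hypothetical test against the same cluster could reuse the hdfs_url and sample_data fixtures along these lines (assuming write_deltalake works over hdfs:// the same way reads do):

import pyarrow as pa
import pytest
from deltalake import DeltaTable, write_deltalake


@pytest.mark.hdfs
@pytest.mark.integration
def test_write_hdfs(hdfs_url, sample_data: pa.Table):
    # Hypothetical round-trip: write to a fresh path on the fixture's cluster and read it back.
    table_path = f"{hdfs_url}/deltars/write_test"
    write_deltalake(table_path, sample_data)
    dt = DeltaTable(table_path)
    assert dt.to_pyarrow_table().num_rows == sample_data.num_rows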
17 changes: 17 additions & 0 deletions python/tests/test_fs.py
@@ -254,3 +254,20 @@ def test_pickle_roundtrip(tmp_path):

infos = store_pkl.get_file_info(["asd.pkl"])
assert infos[0].size > 0


@pytest.mark.hdfs
@pytest.mark.integration
@pytest.mark.timeout(timeout=30, method="thread")
def test_read_hdfs(hdfs_url):
table_path = f"{hdfs_url}/deltars/simple"
handler = DeltaStorageHandler(table_path)
dt = DeltaTable(table_path)
files = dt.file_uris()
assert len(files) > 0
for file in files:
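# DeltaStorageHandler is rooted at the table URI, so strip the table prefix to get handler-relative paths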
rel_path = file[len(table_path) :]
with handler.open_input_file(rel_path) as f_:
table = pq.read_table(f_)
assert isinstance(table, pa.Table)
assert table.shape > (0, 0)
