feat(services/hdfs_native): Add read,write,list implementation for hdfs_native #4505

Open · wants to merge 22 commits into base: main
Changes from 4 commits (22 commits total)

Commits:
8cfcd8b
implementation for hdfs-native read,write.list
shbhmrzd Apr 19, 2024
5b8bce6
add behvaviour tests for hdfs_native
shbhmrzd Apr 19, 2024
1a709df
Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write
shbhmrzd Apr 19, 2024
e9e417b
fix doc build
shbhmrzd Apr 19, 2024
af6e959
review comments
shbhmrzd Apr 22, 2024
42848ea
Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write
shbhmrzd Apr 22, 2024
a8a6d17
review comments
shbhmrzd Apr 22, 2024
5b80bec
use url
shbhmrzd Apr 22, 2024
75ef053
Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write
shbhmrzd Apr 22, 2024
bcc4aaf
use localhost url
shbhmrzd Apr 22, 2024
c23d576
java bindings hdfs-native
shbhmrzd Apr 22, 2024
76ea03d
add hdfs-native scheme
shbhmrzd Apr 22, 2024
a274223
restrict node and python bindings for hdfs-native
shbhmrzd Apr 22, 2024
b3b4435
remove hdfs-native from java binding
shbhmrzd Apr 22, 2024
77acd20
revert scheme and test plan changes
shbhmrzd Apr 23, 2024
c3178c0
Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write
shbhmrzd Apr 23, 2024
fb8965d
Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write
shbhmrzd Apr 28, 2024
914ead7
update supporting list, read and write
shbhmrzd Apr 28, 2024
63c2919
add read, write, list capability in accessor info
shbhmrzd Apr 28, 2024
3346eef
Merge remote-tracking branch 'upstream/main' into native_hdfs_read_write
shbhmrzd May 26, 2024
e795ffb
use read instead of read_range as it panics
shbhmrzd May 26, 2024
7be3838
revert changes
shbhmrzd May 26, 2024
46 changes: 46 additions & 0 deletions .github/services/hdfs/hdfs_native/action.yml
Member comment: This test should be placed in hdfs_native/hdfs.

@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: hdfs_native
description: 'Behavior test for hdfs_native'

runs:
  using: "composite"
  steps:
    - name: Setup java env
      uses: actions/setup-java@v4
      with:
        distribution: temurin
        java-version: "11"
    - name: Setup
      shell: bash
      run: |
        curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner

        export HADOOP_HOME="/home/runner/hadoop-3.3.5"
        export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)

        cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml

        cat << EOF >> $GITHUB_ENV
        HADOOP_HOME=${HADOOP_HOME}
        CLASSPATH=${CLASSPATH}
        LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
        OPENDAL_HDFS_ROOT=/tmp/opendal/
        OPENDAL_HDFS_NAME_NODE=default
        OPENDAL_HDFS_ENABLE_APPEND=false
        EOF

Member comment (on the OPENDAL_HDFS_ROOT line): They should be OPENDAL_HDFS_NATIVE_ROOT.
3 changes: 2 additions & 1 deletion core/src/services/hdfs_native/backend.rs
@@ -281,7 +281,8 @@ impl Accessor for HdfsNativeBackend {
 
     async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
         let p = build_rooted_abs_path(&self.root, path);
-        let l = HdfsNativeLister::new(p, self.client.clone());
+        let list_status_iterator = self.client.list_status_iter(&p, false);
+        let l = HdfsNativeLister::new(&self.root, list_status_iterator);
         Ok((RpList::default(), Some(l)))
     }
 }
52 changes: 43 additions & 9 deletions core/src/services/hdfs_native/lister.rs
@@ -15,28 +15,62 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use chrono::DateTime;
+use hdfs_native::client::{FileStatus, ListStatusIterator};
 
-use crate::raw::oio;
 use crate::raw::oio::Entry;
+use crate::raw::{build_rel_path, oio};
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeLister {
-    _path: String,
-    _client: Arc<hdfs_native::Client>,
+    root: String,
+    lsi: ListStatusIterator,
 }
 
 impl HdfsNativeLister {
-    pub fn new(path: String, client: Arc<hdfs_native::Client>) -> Self {
-        HdfsNativeLister {
-            _path: path,
-            _client: client,
+    pub fn new(root: &str, lsi: ListStatusIterator) -> Self {
+        Self {
+            root: root.to_string(),
+            lsi,
         }
     }
 }
 
 impl oio::List for HdfsNativeLister {
     async fn next(&mut self) -> Result<Option<Entry>> {
-        todo!()
+        let de: FileStatus = match self.lsi.next().await {
Member comment: use code like the following for better reading:

    let Ok(de) = self
        .lsi
        .next()
        .await
        .transpose()
        .map_err(parse_hdfs_error)?
    else {
        return Ok(None);
    };
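As a self-contained illustration of that transpose-plus-let-else pattern, here is a small sketch built around a hypothetical pager type (PagerStub and next_item are made up for illustration; they are not hdfs_native or OpenDAL APIs):

    // Hypothetical async source whose next() yields Option<Result<Item, Error>>,
    // like ListStatusIterator::next() in the diff above.
    struct PagerStub {
        items: Vec<Result<u32, String>>,
    }

    impl PagerStub {
        async fn next(&mut self) -> Option<Result<u32, String>> {
            self.items.pop()
        }
    }

    async fn next_item(p: &mut PagerStub) -> Result<Option<u32>, String> {
        // transpose() turns Option<Result<T, E>> into Result<Option<T>, E>, so the
        // error can be mapped and propagated with `?`; after `?` the remaining value
        // is an Option, so let-else binds the Some case and returns Ok(None) otherwise.
        let Some(item) = p
            .next()
            .await
            .transpose()
            .map_err(|e| format!("list failed: {e}"))?
        else {
            return Ok(None);
        };
        Ok(Some(item))
    }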

+            Some(res) => match res {
+                Ok(fs) => fs,
+                Err(e) => return Err(parse_hdfs_error(e)),
+            },
+            None => return Ok(None),
+        };
+
+        let path = build_rel_path(&self.root, &de.path);
+
+        let entry = if !de.isdir {
+            let odt = DateTime::from_timestamp(de.modification_time as i64, 0);
+            let dt = match odt {
Member comment: Please don't match an Option when the None case returns an error; use let-else instead.
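For reference, a minimal self-contained sketch of the let-else form being asked for, using chrono's DateTime::from_timestamp as in the diff but a plain String error instead of OpenDAL's Error type (to_datetime is a hypothetical helper name):

    use chrono::{DateTime, Utc};

    // Convert epoch seconds, erroring out when chrono rejects the timestamp,
    // without an explicit Some/None match.
    fn to_datetime(modification_time: u64, path: &str) -> Result<DateTime<Utc>, String> {
        let Some(dt) = DateTime::from_timestamp(modification_time as i64, 0) else {
            return Err(format!("Failure in extracting modified_time for {path}"));
        };
        Ok(dt)
    }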

+                Some(dt) => dt,
+                None => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("Failure in extracting modified_time for {}", path),
+                    ))
+                }
+            };
+            let meta = Metadata::new(EntryMode::FILE)
+                .with_content_length(de.length as u64)
+                .with_last_modified(dt);
+            oio::Entry::new(&path, meta)
+        } else if de.isdir {
+            // Make sure we are returning the correct path.
+            oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
+        } else {
+            oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
+        };
+
+        Ok(Some(entry))
     }
 }
28 changes: 24 additions & 4 deletions core/src/services/hdfs_native/reader.rs
@@ -18,20 +18,40 @@
 use hdfs_native::file::FileReader;
 
 use crate::raw::*;
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeReader {
-    _f: FileReader,
+    f: FileReader,
 }
 
 impl HdfsNativeReader {
     pub fn new(f: FileReader) -> Self {
-        HdfsNativeReader { _f: f }
+        HdfsNativeReader { f }
     }
 }
 
 impl oio::Read for HdfsNativeReader {
-    async fn read_at(&self, _offset: u64, _limit: usize) -> Result<Buffer> {
-        todo!()
+    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
+        // Check for offset being too large for usize on 32-bit systems
+        if offset > usize::MAX as u64 {
Member comment: Please don't run checks in services.

Contributor Author (shbhmrzd): Where can we put this check?

Member: We don't need this check for now. It's more of an upstream issue, in that it can't handle ranges in i32::MAX..i64::MAX. You can create an opendal issue to track this.

Contributor Author (shbhmrzd): Sure, created issue #4506.
+            return Err(Error::new(
+                ErrorKind::InvalidInput,
+                "Offset is too large for this platform",
+            ));
+        }
+
+        // Perform the read operation using read_range
+        let bytes = match self
+            .f
+            .read_range(offset as usize, limit)
+            .await
+            .map_err(parse_hdfs_error)
Member comment: Use ? instead of matching a Result.
+        {
+            Ok(data) => data,
+            Err(e) => return Err(e),
+        };
+
+        Ok(Buffer::from(bytes))
     }
 }
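For reference, a sketch of how the read_at body above reads once the match is replaced with ?, using the same read_range call from the diff (a sketch against this PR's code, not necessarily the merged version):

    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
        // `?` propagates the mapped error; no need to match the Result by hand.
        let bytes = self
            .f
            .read_range(offset as usize, limit)
            .await
            .map_err(parse_hdfs_error)?;
        Ok(Buffer::from(bytes))
    }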
14 changes: 9 additions & 5 deletions core/src/services/hdfs_native/writer.rs
@@ -18,25 +18,29 @@
 use hdfs_native::file::FileWriter;
 
 use crate::raw::oio;
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeWriter {
-    _f: FileWriter,
+    f: FileWriter,
 }
 
 impl HdfsNativeWriter {
     pub fn new(f: FileWriter) -> Self {
-        HdfsNativeWriter { _f: f }
+        HdfsNativeWriter { f }
     }
 }
 
 impl oio::Write for HdfsNativeWriter {
-    async fn write(&mut self, _bs: Buffer) -> Result<usize> {
-        todo!()
+    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+        let bytes = bs.to_bytes();
+        let total_bytes = bytes.len();
+        self.f.write(bytes).await.map_err(parse_hdfs_error)?;
+        Ok(total_bytes)
Member comment: Does self.f.write() make sure that all bytes are written?
Contributor Author (shbhmrzd, Apr 22, 2024): Yes, based on its definition it keeps writing until the complete buf has been written:

    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
        let bytes_to_write = buf.len();
        // Create a shallow copy of the bytes instance to mutate and track what's been read
        while !buf.is_empty() {
            let block_writer = self.get_block_writer().await?;

            block_writer.write(&mut buf).await?;
        }

        self.bytes_written += bytes_to_write;

        Ok(bytes_to_write)
    }
Member comment: Please don't rely on this behavior. Return the number of bytes actually written, like this:

    let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
    Ok(n)
     }
 
     async fn close(&mut self) -> Result<()> {
-        todo!()
+        self.f.close().await.map_err(parse_hdfs_error)
     }
 
     async fn abort(&mut self) -> Result<()> {
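The write discussion above comes down to reporting the count that was actually written and letting the caller loop. A self-contained sketch of such a caller, with a hypothetical PartialWrite trait standing in for a writer that may accept fewer bytes than offered (this is not OpenDAL's oio::Write API):

    use bytes::{Buf, Bytes};

    // Hypothetical writer that may accept only part of the buffer per call.
    trait PartialWrite {
        fn write(&mut self, bs: &[u8]) -> std::io::Result<usize>;
    }

    // Keep writing until the buffer is consumed, advancing only by the number of
    // bytes each call reports as written.
    fn write_all<W: PartialWrite>(w: &mut W, mut bs: Bytes) -> std::io::Result<()> {
        while bs.has_remaining() {
            let n = w.write(bs.chunk())?;
            if n == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "writer accepted no bytes",
                ));
            }
            bs.advance(n);
        }
        Ok(())
    }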