Skip to content

Commit

Permalink
refactor: Use the same http client across project (#364)
Browse files Browse the repository at this point in the history
* refactor: Use the same http client across project

Signed-off-by: Xuanwo <github@xuanwo.io>

* Remove concurrency until we find better way

Signed-off-by: Xuanwo <github@xuanwo.io>

* Remove not needed actions

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Jun 10, 2022
1 parent f5fad2a commit 9616cca
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 59 deletions.
36 changes: 0 additions & 36 deletions .github/workflows/service_test_hdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,6 @@ concurrency:
cancel-in-progress: true

jobs:
default:
runs-on: ${{ matrix.os }}
strategy:
matrix:
hdfs-version: [ "2.10.1", "3.2.3", "3.3.2" ]
os:
- ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Checkout python env
uses: actions/setup-python@v4
with:
python-version: '3.8'
- name: Checkout java env
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: '11'
- name: Setup-hdfs env
uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: ${{ matrix.hdfs-version }}

- name: Test
shell: bash
run: cargo test hdfs --features compress,retry,testing,services-hdfs -- --nocapture
env:
RUST_BACKTRACE: full
RUST_LOG: debug
OPENDAL_HDFS_TEST: on
OPENDAL_HDFS_ROOT: /tmp/
OPENDAL_HDFS_NAME_NODE: default
LD_LIBRARY_PATH: ${{ env.JAVA_HOME }}/lib/server:${{ env.LD_LIBRARY_PATH }}


hdfs:
runs-on: ${{ matrix.os }}
strategy:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/service_test_memory.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
matrix:
os:
- ubuntu-latest
- macos-11
steps:
- uses: actions/checkout@v3
- name: Test
Expand Down
40 changes: 40 additions & 0 deletions src/io_util/http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;

/// HttpClient that used across opendal.
///
/// NOTE: we could change or support more underlying http backend.
#[derive(Debug, Clone)]
pub struct HttpClient(
hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
);

impl HttpClient {
/// Create a new http client.
pub fn new() -> Self {
HttpClient(hyper::Client::builder().build(hyper_tls::HttpsConnector::new()))
}
}

/// Forward all function to http backend.
impl Deref for HttpClient {
type Target =
hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>;

fn deref(&self) -> &Self::Target {
&self.0
}
}
3 changes: 3 additions & 0 deletions src/io_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ mod http_body;
pub(crate) use http_body::new_http_channel;
pub(crate) use http_body::HttpBodyWriter;

mod http_client;
pub(crate) use http_client::HttpClient;

mod seekable_reader;
pub use seekable_reader::seekable_read;
pub use seekable_reader::SeekableReader;
Expand Down
15 changes: 2 additions & 13 deletions src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,22 +194,11 @@ impl Operator {
#[derive(Clone, Debug)]
pub struct BatchOperator {
src: Operator,

concurrency: usize,
}

impl BatchOperator {
pub(crate) fn new(op: Operator) -> Self {
BatchOperator {
src: op,
concurrency: 4,
}
}

/// Specify the concurrency of batch operators.
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
BatchOperator { src: op }
}

/// Walk a dir in top down way: list current dir first and than list nested dir.
Expand Down Expand Up @@ -244,7 +233,7 @@ impl BatchOperator {
}

let obs = self.walk_bottom_up(path)?;
obs.try_for_each_concurrent(self.concurrency, |v| async move {
obs.try_for_each(|v| async move {
debug!("deleting {}", v.path());
v.into_object().delete().await
})
Expand Down
5 changes: 3 additions & 2 deletions src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::error::BackendError;
use crate::error::ObjectError;
use crate::io_util::new_http_channel;
use crate::io_util::HttpBodyWriter;
use crate::io_util::HttpClient;
use crate::object::ObjectMetadata;
use crate::ops::BytesRange;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -202,7 +203,7 @@ impl Builder {
("endpoint".to_string(), endpoint.to_string()),
]);

let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
let client = HttpClient::new();

let mut signer_builder = Signer::builder();
if let (Some(name), Some(key)) = (&self.account_name, &self.account_key) {
Expand All @@ -228,7 +229,7 @@ impl Builder {
#[derive(Debug, Clone)]
pub struct Backend {
container: String,
client: hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
client: HttpClient,
root: String, // root will be "/" or /abc/
endpoint: String,
signer: Arc<Signer>,
Expand Down
12 changes: 5 additions & 7 deletions src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::error::BackendError;
use crate::error::ObjectError;
use crate::io_util::new_http_channel;
use crate::io_util::HttpBodyWriter;
use crate::io_util::HttpClient;
use crate::ops::BytesRange;
use crate::ops::OpCreate;
use crate::ops::OpDelete;
Expand Down Expand Up @@ -415,10 +416,7 @@ impl Builder {
// Read RFC-0057: Auto Region for detailed behavior.
async fn detect_region(
&self,
client: &hyper::Client<
hyper_tls::HttpsConnector<hyper::client::HttpConnector>,
hyper::Body,
>,
client: &HttpClient,
bucket: &str,
context: &HashMap<String, String>,
) -> Result<(String, String)> {
Expand Down Expand Up @@ -628,7 +626,7 @@ impl Builder {
})?),
};

let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
let client = HttpClient::new();

let (endpoint, region) = self.detect_region(&client, bucket, &context).await?;
context.insert("endpoint".to_string(), endpoint.clone());
Expand Down Expand Up @@ -680,7 +678,7 @@ pub struct Backend {
bucket: String,
endpoint: String,
signer: Arc<Signer>,
client: hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
client: HttpClient,
// root will be "/" or "/abc/"
root: String,

Expand Down Expand Up @@ -1295,7 +1293,7 @@ mod tests {

#[tokio::test]
async fn test_detect_region() {
let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
let client = HttpClient::new();

let endpoint_cases = vec![
Some("s3.amazonaws.com"),
Expand Down

1 comment on commit 9616cca

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-npnps9m6p-databend.vercel.app

Built with commit 9616cca.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.