From 9616cca9e60e6d8c269bab2dbb8f9fb69bd7a074 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Fri, 10 Jun 2022 18:11:21 +0800
Subject: [PATCH] refactor: Use the same http client across project (#364)

* refactor: Use the same http client across project

Signed-off-by: Xuanwo

* Remove concurrency until we find better way

Signed-off-by: Xuanwo

* Remove not needed actions

Signed-off-by: Xuanwo
---
 .github/workflows/service_test_hdfs.yml   | 36 --------------------
 .github/workflows/service_test_memory.yml |  1 -
 src/io_util/http_client.rs                | 40 +++++++++++++++++++++++
 src/io_util/mod.rs                        |  3 ++
 src/operator.rs                           | 15 ++-------
 src/services/azblob/backend.rs            |  5 +--
 src/services/s3/backend.rs                | 12 +++----
 7 files changed, 53 insertions(+), 59 deletions(-)
 create mode 100644 src/io_util/http_client.rs

diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index a9bc48090c4..15f19e241af 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -7,42 +7,6 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  default:
-    runs-on: ${{ matrix.os }}
-    strategy:
-      matrix:
-        hdfs-version: [ "2.10.1", "3.2.3", "3.3.2" ]
-        os:
-          - ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
-
-      - name: Checkout python env
-        uses: actions/setup-python@v4
-        with:
-          python-version: '3.8'
-      - name: Checkout java env
-        uses: actions/setup-java@v3
-        with:
-          distribution: temurin
-          java-version: '11'
-      - name: Setup-hdfs env
-        uses: beyondstorage/setup-hdfs@master
-        with:
-          hdfs-version: ${{ matrix.hdfs-version }}
-
-      - name: Test
-        shell: bash
-        run: cargo test hdfs --features compress,retry,testing,services-hdfs -- --nocapture
-        env:
-          RUST_BACKTRACE: full
-          RUST_LOG: debug
-          OPENDAL_HDFS_TEST: on
-          OPENDAL_HDFS_ROOT: /tmp/
-          OPENDAL_HDFS_NAME_NODE: default
-          LD_LIBRARY_PATH: ${{ env.JAVA_HOME }}/lib/server:${{ env.LD_LIBRARY_PATH }}
-
-
   hdfs:
     runs-on: ${{ matrix.os }}
     strategy:
diff --git a/.github/workflows/service_test_memory.yml b/.github/workflows/service_test_memory.yml
index a2cc71dd09b..8dd00bc70a8 100644
--- a/.github/workflows/service_test_memory.yml
+++ b/.github/workflows/service_test_memory.yml
@@ -13,7 +13,6 @@ jobs:
       matrix:
         os:
           - ubuntu-latest
-          - macos-11
     steps:
       - uses: actions/checkout@v3
       - name: Test
diff --git a/src/io_util/http_client.rs b/src/io_util/http_client.rs
new file mode 100644
index 00000000000..5487d16b51e
--- /dev/null
+++ b/src/io_util/http_client.rs
@@ -0,0 +1,40 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::ops::Deref;
+
+/// HttpClient that used across opendal.
+///
+/// NOTE: we could change or support more underlying http backend.
+#[derive(Debug, Clone)]
+pub struct HttpClient(
+    hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
+);
+
+impl HttpClient {
+    /// Create a new http client.
+    pub fn new() -> Self {
+        HttpClient(hyper::Client::builder().build(hyper_tls::HttpsConnector::new()))
+    }
+}
+
+/// Forward all function to http backend.
+impl Deref for HttpClient {
+    type Target =
+        hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
diff --git a/src/io_util/mod.rs b/src/io_util/mod.rs
index d515172a4aa..752c7be5560 100644
--- a/src/io_util/mod.rs
+++ b/src/io_util/mod.rs
@@ -40,6 +40,9 @@ mod http_body;
 pub(crate) use http_body::new_http_channel;
 pub(crate) use http_body::HttpBodyWriter;
 
+mod http_client;
+pub(crate) use http_client::HttpClient;
+
 mod seekable_reader;
 pub use seekable_reader::seekable_read;
 pub use seekable_reader::SeekableReader;
diff --git a/src/operator.rs b/src/operator.rs
index 5e7a2dc43e5..0d4d7b91189 100644
--- a/src/operator.rs
+++ b/src/operator.rs
@@ -194,22 +194,11 @@ impl Operator {
 #[derive(Clone, Debug)]
 pub struct BatchOperator {
     src: Operator,
-
-    concurrency: usize,
 }
 
 impl BatchOperator {
     pub(crate) fn new(op: Operator) -> Self {
-        BatchOperator {
-            src: op,
-            concurrency: 4,
-        }
-    }
-
-    /// Specify the concurrency of batch operators.
-    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
-        self.concurrency = concurrency;
-        self
+        BatchOperator { src: op }
     }
 
     /// Walk a dir in top down way: list current dir first and than list nested dir.
@@ -244,7 +233,7 @@ impl BatchOperator {
         }
 
         let obs = self.walk_bottom_up(path)?;
-        obs.try_for_each_concurrent(self.concurrency, |v| async move {
+        obs.try_for_each(|v| async move {
             debug!("deleting {}", v.path());
             v.into_object().delete().await
         })
diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs
index 742f2882194..fa7d4722c54 100644
--- a/src/services/azblob/backend.rs
+++ b/src/services/azblob/backend.rs
@@ -49,6 +49,7 @@ use crate::error::BackendError;
 use crate::error::ObjectError;
 use crate::io_util::new_http_channel;
 use crate::io_util::HttpBodyWriter;
+use crate::io_util::HttpClient;
 use crate::object::ObjectMetadata;
 use crate::ops::BytesRange;
 use crate::ops::OpCreate;
@@ -202,7 +203,7 @@ impl Builder {
             ("endpoint".to_string(), endpoint.to_string()),
         ]);
 
-        let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
+        let client = HttpClient::new();
 
         let mut signer_builder = Signer::builder();
         if let (Some(name), Some(key)) = (&self.account_name, &self.account_key) {
@@ -228,7 +229,7 @@ impl Builder {
 #[derive(Debug, Clone)]
 pub struct Backend {
     container: String,
-    client: hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
+    client: HttpClient,
     root: String, // root will be "/" or /abc/
     endpoint: String,
     signer: Arc<Signer>,
diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs
index 90c9e67f057..b49292e4d77 100644
--- a/src/services/s3/backend.rs
+++ b/src/services/s3/backend.rs
@@ -50,6 +50,7 @@ use crate::error::BackendError;
 use crate::error::ObjectError;
 use crate::io_util::new_http_channel;
 use crate::io_util::HttpBodyWriter;
+use crate::io_util::HttpClient;
 use crate::ops::BytesRange;
 use crate::ops::OpCreate;
 use crate::ops::OpDelete;
@@ -415,10 +416,7 @@ impl Builder {
     // Read RFC-0057: Auto Region for detailed behavior.
     async fn detect_region(
         &self,
-        client: &hyper::Client<
-            hyper_tls::HttpsConnector<hyper::client::HttpConnector>,
-            hyper::Body,
-        >,
+        client: &HttpClient,
         bucket: &str,
         context: &HashMap<String, String>,
     ) -> Result<(String, String)> {
@@ -628,7 +626,7 @@ impl Builder {
             })?),
         };
 
-        let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
+        let client = HttpClient::new();
 
         let (endpoint, region) = self.detect_region(&client, bucket, &context).await?;
         context.insert("endpoint".to_string(), endpoint.clone());
@@ -680,7 +678,7 @@ pub struct Backend {
     bucket: String,
     endpoint: String,
     signer: Arc<Signer>,
-    client: hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
+    client: HttpClient,
     // root will be "/" or "/abc/"
     root: String,
 
@@ -1295,7 +1293,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_detect_region() {
-        let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
+        let client = HttpClient::new();
 
         let endpoint_cases = vec![
             Some("s3.amazonaws.com"),