Skip to content

Commit

Permalink
feat(services/onedrive): Add read and write support for OneDrive (#2129)
Browse files Browse the repository at this point in the history
* basic onedrive

* fix data model

* implement basic

* set hints

* fix fix fix

* fix all

* fix

* Squashed commit

* update res

* improve builder

* update

* finish builder

* fix read

* fix expose

* read impl

* update

* finish write

* Squashed commit of the following:

* clean up

* fix

* update

* update

* onedrive read and write

* update license header

* update

* fix formatting

* revert

* 201

* clean up

* use parse_location

* trigger ci

* update

* fix debug fmt

* remove access token

* update
  • Loading branch information
imWildCat committed Apr 28, 2023
1 parent e871a6f commit 69dd454
Show file tree
Hide file tree
Showing 9 changed files with 507 additions and 1 deletion.
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ default = [
"services-s3",
"services-webdav",
"services-webhdfs",
"services-onedrive",
]

# Build docs or not.
Expand Down Expand Up @@ -121,6 +122,7 @@ services-obs = [
"reqsign?/services-huaweicloud",
"reqsign?/reqwest_request",
]
services-onedrive = []
services-oss = [
"dep:reqsign",
"reqsign?/services-aliyun",
Expand Down
6 changes: 6 additions & 0 deletions core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,11 @@ pub use webdav::Webdav;

#[cfg(feature = "services-webhdfs")]
mod webhdfs;

#[cfg(feature = "services-onedrive")]
mod onedrive;
#[cfg(feature = "services-onedrive")]
pub use onedrive::Onedrive;

#[cfg(feature = "services-webhdfs")]
pub use webhdfs::Webhdfs;
200 changes: 200 additions & 0 deletions core/src/services/onedrive/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use http::{header, Request, Response, StatusCode};
use std::fmt::Debug;

use crate::{
ops::{OpRead, OpWrite},
raw::{
build_rooted_abs_path, new_request_build_error, parse_into_metadata, parse_location,
percent_encode_path, Accessor, AccessorInfo, AsyncBody, HttpClient, IncomingAsyncBody,
RpRead, RpWrite,
},
types::Result,
Capability, Error, ErrorKind,
};

use super::{error::parse_error, writer::OneDriveWriter};

#[derive(Clone)]
pub struct OnedriveBackend {
root: String,
access_token: String,
client: HttpClient,
}

impl OnedriveBackend {
pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self {
Self {
root,
access_token,
client: http_client,
}
}
}

impl Debug for OnedriveBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut de = f.debug_struct("OneDriveBackend");
de.field("root", &self.root);
de.field("access_token", &self.access_token);
de.finish()
}
}

#[async_trait]
impl Accessor for OnedriveBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Writer = OneDriveWriter;
type BlockingWriter = ();
type Pager = ();
type BlockingPager = ();

fn info(&self) -> AccessorInfo {
let mut ma = AccessorInfo::default();
ma.set_scheme(crate::Scheme::Onedrive)
.set_root(&self.root)
.set_capability(Capability {
read: true,
read_can_next: true,
write: true,
list: true,
copy: true,
rename: true,
..Default::default()
});

ma
}

async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.onedrive_get(path).await?;

let status = resp.status();

if status.is_redirection() {
let headers = resp.headers();
let location = parse_location(headers)?;

match location {
None => {
return Err(Error::new(
ErrorKind::ContentIncomplete,
"redirect location not found in response",
));
}
Some(location) => {
let resp = self.onedrive_get_redirection(location).await?;
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}
}
} else {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}

_ => Err(parse_error(resp).await?),
}
}
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
"write without content length is not supported",
));
}

let path = build_rooted_abs_path(&self.root, path);

Ok((
RpWrite::default(),
OneDriveWriter::new(self.clone(), args, path),
))
}
}

impl OnedriveBackend {
async fn onedrive_get(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let path = build_rooted_abs_path(&self.root, path);
let url: String = format!(
"https://graph.microsoft.com/v1.0/me/drive/root:{}:/content",
percent_encode_path(&path),
);

let mut req = Request::get(&url);

let auth_header_content = format!("Bearer {}", self.access_token);
req = req.header(header::AUTHORIZATION, auth_header_content);

let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.client.send(req).await
}

async fn onedrive_get_redirection(&self, url: &str) -> Result<Response<IncomingAsyncBody>> {
let mut req = Request::get(url);

let auth_header_content = format!("Bearer {}", self.access_token);
req = req.header(header::AUTHORIZATION, auth_header_content);

let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.client.send(req).await
}

pub async fn onedrive_put(
&self,
path: &str,
size: Option<usize>,
content_type: Option<&str>,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let url = format!(
"https://graph.microsoft.com/v1.0/me/drive/root:{}:/content",
percent_encode_path(path)
);

let mut req = Request::put(&url);

let auth_header_content = format!("Bearer {}", self.access_token);
req = req.header(header::AUTHORIZATION, auth_header_content);

if let Some(size) = size {
req = req.header(header::CONTENT_LENGTH, size)
}

if let Some(mime) = content_type {
req = req.header(header::CONTENT_TYPE, mime)
}

let req = req.body(body).map_err(new_request_build_error)?;

self.client.send(req).await
}
}
149 changes: 149 additions & 0 deletions core/src/services/onedrive/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};

use log::debug;

use super::backend::OnedriveBackend;
use crate::raw::{normalize_root, HttpClient};
use crate::Scheme;
use crate::*;

/// [OneDrive](https://onedrive.com) backend support.
///
/// # Capabilities
///
/// This service can be used to:
///
/// - [x] read
/// - [x] write
/// - [ ] copy
/// - [ ] rename
/// - [ ] list
/// - [ ] ~~scan~~
/// - [ ] ~~presign~~
/// - [ ] blocking
///
/// # Notes
///
/// Currently, only OneDrive Personal is supported.
/// For uploading, only files under 4MB are supported via the Simple Upload API (<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online>).
///
/// # Configuration
///
/// - `access_token`: set the access_token for Graph API
/// - `root`: Set the work directory for backend
///
/// You can refer to [`OneDriveBuilder`]'s docs for more information
///
/// # Example
///
/// ## Via Builder
///
/// ```no_run
/// use anyhow::Result;
/// use opendal::services::Onedrive;
/// use opendal::Operator;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // create backend builder
/// let mut builder = Onedrive::default();
///
/// builder
/// .access_token("xxx")
/// .root("/path/to/root");
///
/// let op: Operator = Operator::new(builder)?.finish();
/// Ok(())
/// }
/// ```
#[derive(Default)]
pub struct OnedriveBuilder {
access_token: Option<String>,
root: Option<String>,
http_client: Option<HttpClient>,
}

impl Debug for OnedriveBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Backend").field("root", &self.root).finish()
}
}

impl OnedriveBuilder {
/// set the bearer access token for OneDrive
///
/// default: no access token, which leads to failure
pub fn access_token(&mut self, access_token: &str) -> &mut Self {
self.access_token = Some(access_token.to_string());
self
}

/// Set root path of OneDrive folder.
pub fn root(&mut self, root: &str) -> &mut Self {
self.root = Some(root.to_string());
self
}

/// Specify the http client that used by this service.
///
/// # Notes
///
/// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
/// during minor updates.
pub fn http_client(&mut self, http_client: HttpClient) -> &mut Self {
self.http_client = Some(http_client);
self
}
}

impl Builder for OnedriveBuilder {
const SCHEME: Scheme = Scheme::Onedrive;

type Accessor = OnedriveBackend;

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = Self::default();

map.get("root").map(|v| builder.root(v));
map.get("access_token").map(|v| builder.access_token(v));

builder
}

fn build(&mut self) -> Result<Self::Accessor> {
let root = normalize_root(&self.root.take().unwrap_or_default());
debug!("backend use root {}", root);

let client = if let Some(client) = self.http_client.take() {
client
} else {
HttpClient::new().map_err(|err| {
err.with_operation("Builder::build")
.with_context("service", Scheme::Onedrive)
})?
};

match self.access_token.clone() {
Some(access_token) => Ok(OnedriveBackend::new(root, access_token, client)),
None => Err(Error::new(ErrorKind::ConfigInvalid, "access_token not set")),
}
}
}
Loading

0 comments on commit 69dd454

Please sign in to comment.