Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 97 additions & 21 deletions src/ops/sources/google_drive.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;
use std::{
collections::HashMap,
sync::{Arc, LazyLock},
};

use futures::future::try_join;
use google_drive3::{
api::Scope,
yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
Expand All @@ -10,11 +12,62 @@ use http_body_util::BodyExt;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use indexmap::IndexSet;
use log::debug;
use log::warn;

use crate::ops::sdk::*;

struct ExportMimeType {
text: &'static str,
binary: &'static str,
}

const FOLDER_MIME_TYPE: &'static str = "application/vnd.google-apps.folder";
const FILE_MIME_TYPE: &'static str = "application/vnd.google-apps.file";
static EXPORT_MIME_TYPES: LazyLock<HashMap<&'static str, ExportMimeType>> = LazyLock::new(|| {
HashMap::from([
(
"application/vnd.google-apps.document",
ExportMimeType {
text: "text/markdown",
binary: "application/pdf",
},
),
(
"application/vnd.google-apps.spreadsheet",
ExportMimeType {
text: "text/csv",
binary: "application/pdf",
},
),
(
"application/vnd.google-apps.presentation",
ExportMimeType {
text: "text/plain",
binary: "application/pdf",
},
),
(
"application/vnd.google-apps.drawing",
ExportMimeType {
text: "image/svg+xml",
binary: "image/png",
},
),
(
"application/vnd.google-apps.script",
ExportMimeType {
text: "application/vnd.google-apps.script+json",
binary: "application/vnd.google-apps.script+json",
},
),
])
});

fn is_supported_file_type(mime_type: &str) -> bool {
!mime_type.starts_with("application/vnd.google-apps.")
|| EXPORT_MIME_TYPES.contains_key(mime_type)
|| mime_type == FILE_MIME_TYPE
}

#[derive(Debug, Deserialize)]
pub struct Spec {
Expand Down Expand Up @@ -91,11 +144,21 @@ impl Executor {
let (_, files) = list_call.doit().await?;
if let Some(files) = files.files {
for file in files {
if let Some(id) = file.id {
if file.mime_type.as_ref() == Some(&FOLDER_MIME_TYPE.to_string()) {
Box::pin(self.traverse_folder(&id, visited_folder_ids, result)).await?;
} else {
result.insert(KeyValue::Str(Arc::from(id)));
match (file.id, file.mime_type) {
(Some(id), Some(mime_type)) => {
if mime_type == FOLDER_MIME_TYPE {
Box::pin(self.traverse_folder(&id, visited_folder_ids, result))
.await?;
} else if is_supported_file_type(&mime_type) {
result.insert(KeyValue::Str(Arc::from(id)));
} else {
warn!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
}
}
(id, mime_type) => {
warn!(
"Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",
);
}
}
}
Expand All @@ -121,17 +184,32 @@ impl SourceExecutor for Executor {
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
let file_id = key.str_value()?;

let filename = async {
let (_, file) = self
.drive_hub
let (_, file) = self
.drive_hub
.files()
.get(file_id)
.add_scope(Scope::Readonly)
.doit()
.await?;

let resp_body = if let Some(export_mime_type) = file
.mime_type
.as_ref()
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
{
let target_mime_type = if self.binary {
export_mime_type.binary
} else {
export_mime_type.text
};
self.drive_hub
.files()
.get(file_id)
.export(&file_id, target_mime_type)
.add_scope(Scope::Readonly)
.doit()
.await?;
anyhow::Ok(file.name.unwrap_or_default())
};
let body = async {
.await?
.into_body()
} else {
let (resp, _) = self
.drive_hub
.files()
Expand All @@ -140,13 +218,11 @@ impl SourceExecutor for Executor {
.param("alt", "media")
.doit()
.await?;
let content = resp.into_body().collect().await?;
anyhow::Ok(content)
resp.into_body()
};
let (filename, content) = try_join(filename, body).await?;

let content = resp_body.collect().await?;
let mut fields = Vec::with_capacity(2);
fields.push(filename.into());
fields.push(file.name.unwrap_or_default().into());
if self.binary {
fields.push(content.to_bytes().to_vec().into());
} else {
Expand Down