7 changes: 1 addition & 6 deletions Cargo.toml
@@ -31,12 +31,7 @@ sqlx = { version = "0.8.3", features = [
"runtime-tokio",
"uuid",
] }
tokio = { version = "1.44.1", features = [
"macros",
"rt-multi-thread",
"full",
"tracing",
] }
tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread", "full", "tracing", "fs"] }
tower = "0.5.2"
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
indexmap = { version = "2.8.0", features = ["serde"] }
37 changes: 22 additions & 15 deletions src/execution/dumper.rs
@@ -1,5 +1,6 @@
use anyhow::Result;
use futures::future::try_join_all;
use futures::StreamExt;
use indexmap::IndexMap;
use itertools::Itertools;
use serde::ser::SerializeSeq;
@@ -14,6 +15,7 @@ use super::indexer;
use super::memoization::EvaluationMemoryOptions;
use crate::base::{schema, value};
use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
use crate::ops::interface::SourceExecutorListOptions;
use crate::utils::yaml_ser::YamlSerializer;

#[derive(Debug, Clone, Deserialize)]
@@ -163,22 +165,27 @@ impl<'a> Dumper<'a> {
}

async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
let all_keys = source_op.executor.list_keys().await?;

let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
for key in all_keys {
let mut s = key
.to_strs()
.into_iter()
.map(|s| urlencoding::encode(&s).into_owned())
.join(":");
s.truncate(
(0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
.rev()
.find(|i| s.is_char_boundary(*i))
.unwrap_or(0),
);
keys_by_filename_prefix.entry(s).or_default().push(key);

let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
include_ordinal: false,
});
while let Some(rows) = rows_stream.next().await {
for row in rows?.into_iter() {
let mut s = row
.key
.to_strs()
.into_iter()
.map(|s| urlencoding::encode(&s).into_owned())
.join(":");
s.truncate(
(0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
.rev()
.find(|i| s.is_char_boundary(*i))
.unwrap_or(0),
);
keys_by_filename_prefix.entry(s).or_default().push(row.key);
}
}
let output_dir = Path::new(&self.options.output_dir);
let evaluate_futs =
25 changes: 16 additions & 9 deletions src/execution/indexer.rs
@@ -1,6 +1,6 @@
use crate::prelude::*;

use futures::future::{join, join_all, try_join, try_join_all};
use futures::future::{join, join_all, try_join_all};
use itertools::Itertools;
use log::error;
use serde::Serialize;
@@ -14,7 +14,9 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
use crate::base::schema;
use crate::base::value::{self, FieldValues, KeyValue};
use crate::builder::plan::*;
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
use crate::ops::interface::{
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorListOptions,
};
use crate::utils::db::WriteAction;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};

@@ -638,16 +640,21 @@ async fn update_source(
schema: &schema::DataSchema,
pool: &PgPool,
) -> Result<SourceUpdateInfo> {
let (keys, existing_keys_json) = try_join(
source_op.executor.list_keys(),
db_tracking::list_source_tracking_keys(
source_op.source_id,
&plan.tracking_table_setup,
pool,
),
let existing_keys_json = db_tracking::list_source_tracking_keys(
source_op.source_id,
&plan.tracking_table_setup,
pool,
)
.await?;

let mut keys = Vec::new();
let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
include_ordinal: false,
});
while let Some(rows) = rows_stream.next().await {
keys.extend(rows?.into_iter().map(|row| row.key));
}

let stats = UpdateStats::default();
let upsert_futs = join_all(keys.iter().map(|key| {
update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
16 changes: 15 additions & 1 deletion src/ops/interface.rs
@@ -47,6 +47,12 @@ pub struct SourceData<'a> {
pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
}

pub struct SourceRowMetadata {
pub key: KeyValue,
/// None means the ordinal is unavailable.
pub ordinal: Option<Ordinal>,
}

pub struct SourceChange<'a> {
/// Last update/deletion ordinal. None means unavailable.
pub ordinal: Option<Ordinal>,
@@ -55,10 +61,18 @@ pub struct SourceChange<'a> {
pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
}

#[derive(Debug, Default)]
pub struct SourceExecutorListOptions {
pub include_ordinal: bool,
}

#[async_trait]
pub trait SourceExecutor: Send + Sync {
/// List rows from the source, streamed in batches.
async fn list_keys(&self) -> Result<Vec<KeyValue>>;
fn list<'a>(
&'a self,
options: SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;

// Get the value for the given key.
async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>>;
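Note on the new contract: `list` replaces `list_keys` and streams rows in batches, where each stream item is one `Vec<SourceRowMetadata>` and `ordinal: None` marks the ordinal as unavailable. The following is a minimal sketch only, not part of this PR: `in_memory_rows` and its single-batch behavior are hypothetical, assuming `anyhow::Result` and `futures::StreamExt` (for `.boxed()`) are in scope as in the files above.

// Hypothetical sketch: a source that already holds all keys in memory
// could satisfy the streaming shape by emitting one batch.
fn in_memory_rows(
    keys: Vec<KeyValue>,
    _options: SourceExecutorListOptions,
) -> BoxStream<'static, Result<Vec<SourceRowMetadata>>> {
    // Build a single batch; ordinal is unavailable for this toy source.
    let rows: Result<Vec<SourceRowMetadata>> = Ok(keys
        .into_iter()
        .map(|key| SourceRowMetadata { key, ordinal: None })
        .collect());
    // One-item stream, boxed to match the trait's return type.
    futures::stream::once(async move { rows }).boxed()
}

Callers then drain the stream with `StreamExt::next`, as the updated dumper.rs and indexer.rs above do.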
149 changes: 92 additions & 57 deletions src/ops/sources/google_drive.rs
@@ -1,18 +1,18 @@
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::{Arc, LazyLock},
};

use async_stream::try_stream;
use google_drive3::{
api::Scope,
api::{File, Scope},
yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
DriveHub,
};
use http_body_util::BodyExt;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use indexmap::IndexSet;
use log::warn;
use log::{trace, warn};

use crate::base::field_attrs;
use crate::ops::sdk::*;
@@ -80,7 +80,7 @@ pub struct Spec {
struct Executor {
drive_hub: DriveHub<HttpsConnector<HttpConnector>>,
binary: bool,
root_folder_ids: Vec<String>,
root_folder_ids: Vec<Arc<str>>,
}

impl Executor {
@@ -105,7 +105,7 @@ impl Executor {
Ok(Self {
drive_hub,
binary: spec.binary,
root_folder_ids: spec.root_folder_ids,
root_folder_ids: spec.root_folder_ids.into_iter().map(Arc::from).collect(),
})
}
}
@@ -123,55 +123,60 @@ fn escape_string(s: &str) -> String {
}

impl Executor {
async fn traverse_folder(
fn visit_file(
&self,
folder_id: &str,
visited_folder_ids: &mut IndexSet<String>,
result: &mut IndexSet<KeyValue>,
) -> Result<()> {
if !visited_folder_ids.insert(folder_id.to_string()) {
return Ok(());
file: File,
new_folder_ids: &mut Vec<Arc<str>>,
seen_ids: &mut HashSet<Arc<str>>,
) -> Result<Option<SourceRowMetadata>> {
if file.trashed == Some(true) {
return Ok(None);
}
let query = format!("'{}' in parents", escape_string(folder_id));
let mut next_page_token: Option<String> = None;
loop {
let mut list_call = self
.drive_hub
.files()
.list()
.add_scope(Scope::Readonly)
.q(&query);
if let Some(next_page_token) = &next_page_token {
list_call = list_call.page_token(next_page_token);
}
let (_, files) = list_call.doit().await?;
if let Some(files) = files.files {
for file in files {
match (file.id, file.mime_type) {
(Some(id), Some(mime_type)) => {
if mime_type == FOLDER_MIME_TYPE {
Box::pin(self.traverse_folder(&id, visited_folder_ids, result))
.await?;
} else if is_supported_file_type(&mime_type) {
result.insert(KeyValue::Str(Arc::from(id)));
} else {
warn!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
}
}
(id, mime_type) => {
warn!(
"Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",
);
}
}
}
}
next_page_token = files.next_page_token;
if next_page_token.is_none() {
break;
let (id, mime_type) = match (file.id, file.mime_type) {
(Some(id), Some(mime_type)) => (Arc::<str>::from(id), mime_type),
(id, mime_type) => {
warn!("Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",);
return Ok(None);
}
};
if !seen_ids.insert(id.clone()) {
return Ok(None);
}
let result = if mime_type == FOLDER_MIME_TYPE {
new_folder_ids.push(id);
None
} else if is_supported_file_type(&mime_type) {
Some(SourceRowMetadata {
key: KeyValue::Str(Arc::from(id)),
ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
})
} else {
trace!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
None
};
Ok(result)
}

async fn list_files(
&self,
folder_id: &str,
fields: &str,
next_page_token: &mut Option<String>,
) -> Result<impl Iterator<Item = File>> {
let query = format!("'{}' in parents", escape_string(folder_id));
let mut list_call = self
.drive_hub
.files()
.list()
.add_scope(Scope::Readonly)
.q(&query)
.param("fields", fields);
if let Some(next_page_token) = &next_page_token {
list_call = list_call.page_token(next_page_token);
}
Ok(())
let (_, files) = list_call.doit().await?;
let file_iter = files.files.into_iter().flat_map(|file| file.into_iter());
Ok(file_iter)
}
}

@@ -202,13 +207,43 @@ impl<T> ResultExt<T> for google_drive3::Result<T> {

#[async_trait]
impl SourceExecutor for Executor {
async fn list_keys(&self) -> Result<Vec<KeyValue>> {
let mut result = IndexSet::new();
for root_folder_id in &self.root_folder_ids {
self.traverse_folder(root_folder_id, &mut IndexSet::new(), &mut result)
.await?;
fn list<'a>(
&'a self,
options: SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
let mut seen_ids = HashSet::new();
let mut folder_ids = self.root_folder_ids.clone();
let fields = format!(
"files(id,name,mimeType,trashed{})",
if options.include_ordinal {
",modifiedTime"
} else {
""
}
);
let mut new_folder_ids = Vec::new();
try_stream! {
while let Some(folder_id) = folder_ids.pop() {
let mut next_page_token = None;
loop {
let mut curr_rows = Vec::new();
let files = self
.list_files(&folder_id, &fields, &mut next_page_token)
.await?;
for file in files {
curr_rows.extend(self.visit_file(file, &mut new_folder_ids, &mut seen_ids)?);
}
if !curr_rows.is_empty() {
yield curr_rows;
}
if next_page_token.is_none() {
break;
}
}
folder_ids.extend(new_folder_ids.drain(..).rev());
}
}
Ok(result.into_iter().collect())
.boxed()
}

async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {