Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,10 @@ pub async fn evaluate_source_entry_with_memory(
None
};
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
let source_data = match source_op.executor.get_value(key).await? {
let source_value = match source_op.executor.get_value(key).await? {
Some(d) => d,
None => return Ok(None),
};
let source_value = match source_data.value.await? {
Some(value) => value,
None => return Ok(None),
};
let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?;
Ok(Some(output))
}
Expand Down
4 changes: 1 addition & 3 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,13 @@ impl SourceContext {
let source_op = &plan.source_ops[self.source_idx];
let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted {
None
} else if let Some(d) = source_op.executor.get_value(&key).await? {
d.value.await?
} else {
// Even if the source version kind is not Deleted, the source value might be gone when polling.
// In this case, we still use the current source version even if it's already stale - actually this version skew
// also happens for update cases and there's no way to keep them always in sync for many sources.
//
// We only need source version <= actual version for value.
None
source_op.executor.get_value(&key).await?
};
let schema = &self.flow.data_schema;
let result = row_indexer::update_source_row(
Expand Down
9 changes: 1 addition & 8 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@ impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
}
}

pub struct SourceData<'a> {
/// Value that must increase monotonically after change. E.g. can be from the update time.
pub ordinal: Option<Ordinal>,
/// None means the item is gone when polling.
pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
}

pub struct SourceRowMetadata {
pub key: KeyValue,
/// None means the ordinal is unavailable.
Expand Down Expand Up @@ -75,7 +68,7 @@ pub trait SourceExecutor: Send + Sync {
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;

// Get the value for the given key.
async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>>;
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;

fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
None
Expand Down
106 changes: 48 additions & 58 deletions src/ops/sources/google_drive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,80 +246,70 @@ impl SourceExecutor for Executor {
.boxed()
}

async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
let file_id = key.str_value()?;

let resp = self
.drive_hub
.files()
.get(file_id)
.add_scope(Scope::Readonly)
.param("fields", "id,name,mimeType,trashed,modifiedTime")
.param("fields", "id,name,mimeType,trashed")
.doit()
.await
.or_not_found()?;
let file = match resp {
Some((_, file)) if file.trashed != Some(true) => file,
_ => return Ok(None),
};

let modified_time = file.modified_time;
let value = async move {
let type_n_body = if let Some(export_mime_type) = file
.mime_type
.as_ref()
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
{
let target_mime_type = if self.binary {
export_mime_type.binary
} else {
export_mime_type.text
};
self.drive_hub
.files()
.export(file_id, target_mime_type)
.add_scope(Scope::Readonly)
.doit()
.await
.or_not_found()?
.map(|content| (Some(target_mime_type.to_string()), content.into_body()))
let type_n_body = if let Some(export_mime_type) = file
.mime_type
.as_ref()
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
{
let target_mime_type = if self.binary {
export_mime_type.binary
} else {
self.drive_hub
.files()
.get(file_id)
.add_scope(Scope::Readonly)
.param("alt", "media")
.doit()
.await
.or_not_found()?
.map(|(resp, _)| (file.mime_type, resp.into_body()))
export_mime_type.text
};
let value = match type_n_body {
Some((mime_type, resp_body)) => {
let content = resp_body.collect().await?;
self.drive_hub
.files()
.export(file_id, target_mime_type)
.add_scope(Scope::Readonly)
.doit()
.await
.or_not_found()?
.map(|content| (Some(target_mime_type.to_string()), content.into_body()))
} else {
self.drive_hub
.files()
.get(file_id)
.add_scope(Scope::Readonly)
.param("alt", "media")
.doit()
.await
.or_not_found()?
.map(|(resp, _)| (file.mime_type, resp.into_body()))
};
let value = match type_n_body {
Some((mime_type, resp_body)) => {
let content = resp_body.collect().await?;

let fields = vec![
file.name.unwrap_or_default().into(),
mime_type.into(),
if self.binary {
content.to_bytes().to_vec().into()
} else {
String::from_utf8_lossy(&content.to_bytes())
.to_string()
.into()
},
];
Some(FieldValues { fields })
}
None => None,
};
Ok(value)
}
.boxed();
Ok(Some(SourceData {
ordinal: modified_time.map(|t| t.try_into()).transpose()?,
value,
}))
let fields = vec![
file.name.unwrap_or_default().into(),
mime_type.into(),
if self.binary {
content.to_bytes().to_vec().into()
} else {
String::from_utf8_lossy(&content.to_bytes())
.to_string()
.into()
},
];
Some(FieldValues { fields })
}
None => None,
};
Ok(value)
}
}

Expand Down
38 changes: 12 additions & 26 deletions src/ops/sources/local_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,38 +84,24 @@ impl SourceExecutor for Executor {
.boxed()
}

async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
if !self.is_file_included(key.str_value()?.as_ref()) {
return Ok(None);
}
let path = self.root_path.join(key.str_value()?.as_ref());
let modified_time = match std::fs::metadata(&path) {
Ok(metadata) => metadata.modified()?,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Ok(None);
let value = match std::fs::read(path) {
Ok(content) => {
let content = if self.binary {
fields_value!(content)
} else {
fields_value!(String::from_utf8_lossy(&content).to_string())
};
Some(content)
}
Err(e) => return Err(e.into()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => Err(e)?,
};
let value = async move {
let value = match std::fs::read(path) {
Ok(content) => {
let content = if self.binary {
fields_value!(content)
} else {
fields_value!(String::from_utf8_lossy(&content).to_string())
};
Some(content)
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => Err(e)?,
};
Ok(value)
}
.boxed();
Ok(Some(SourceData {
ordinal: Some(modified_time.try_into()?),
value,
}))
Ok(value)
}
}

Expand Down