Skip to content

Commit

Permalink
fix(cubestore): use spawn_blocking on potentially expensive operations (#2219)
Browse files Browse the repository at this point in the history

Concretely:
    - `NamedTempFile` usages call system APIs,
    - `batch_to_dataframe` calls are potentially compute-intensive.
      Meta plans should generally be cheap, but put them into
      `spawn_blocking` too for consistency.
  • Loading branch information
ilya-biryukov committed Feb 27, 2021
1 parent d1d4d54 commit a0f92e3
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
3 changes: 2 additions & 1 deletion rust/cubestore/src/queryplanner/mod.rs
Expand Up @@ -99,7 +99,8 @@ impl QueryPlanner for QueryPlannerImpl {
"Meta query data processing time: {:?}",
execution_time.elapsed()?
);
let data_frame = batch_to_dataframe(&results)?;
let data_frame =
tokio::task::spawn_blocking(move || batch_to_dataframe(&results)).await??;
Ok(data_frame)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/src/queryplanner/query_executor.rs
Expand Up @@ -124,7 +124,7 @@ impl QueryExecutor for QueryExecutorImpl {
&split_plan
);
}
let data_frame = batch_to_dataframe(&results?)?;
let data_frame = tokio::task::spawn_blocking(|| batch_to_dataframe(&results?)).await??;
Ok(data_frame)
}

Expand Down
16 changes: 12 additions & 4 deletions rust/cubestore/src/remotefs/gcs.rs
Expand Up @@ -9,7 +9,7 @@ use regex::{NoExpand, Regex};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
use tempfile::NamedTempFile;
use tempfile::{NamedTempFile, PathPersistError};
use tokio::fs;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
Expand Down Expand Up @@ -71,15 +71,18 @@ impl RemoteFs for GCSRemoteFs {
}

async fn download_file(&self, remote_path: &str) -> Result<String, CubeError> {
let local_file = self.dir.as_path().join(remote_path);
let mut local_file = self.dir.as_path().join(remote_path);
let local_dir = local_file.parent().unwrap();
let downloads_dirs = local_dir.join("downloads");

fs::create_dir_all(&downloads_dirs).await?;
if !local_file.exists() {
let time = SystemTime::now();
debug!("Downloading {}", remote_path);
let (temp_file, temp_path) = NamedTempFile::new_in(&downloads_dirs)?.into_parts();
let (temp_file, temp_path) =
tokio::task::spawn_blocking(move || NamedTempFile::new_in(downloads_dirs))
.await??
.into_parts();
let mut writer = BufWriter::new(tokio::fs::File::from_std(temp_file));
let mut stream = Object::download_streamed(
self.bucket.as_str(),
Expand All @@ -95,7 +98,12 @@ impl RemoteFs for GCSRemoteFs {
}
writer.flush().await?;

temp_path.persist(&local_file)?;
local_file =
tokio::task::spawn_blocking(move || -> Result<PathBuf, PathPersistError> {
temp_path.persist(&local_file)?;
Ok(local_file)
})
.await??;

info!(
"Downloaded {} ({:?}) ({} bytes)",
Expand Down
16 changes: 12 additions & 4 deletions rust/cubestore/src/remotefs/mod.rs
Expand Up @@ -13,7 +13,7 @@ use log::debug;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::NamedTempFile;
use tempfile::{NamedTempFile, PathPersistError};
use tokio::fs;
use tokio::sync::{Mutex, RwLock};

Expand Down Expand Up @@ -105,14 +105,17 @@ impl RemoteFs for LocalDirRemoteFs {
}

async fn download_file(&self, remote_path: &str) -> Result<String, CubeError> {
let local_file = self.dir.as_path().join(remote_path);
let mut local_file = self.dir.as_path().join(remote_path);
let local_dir = local_file.parent().unwrap();
let downloads_dir = local_dir.join("downloads");
fs::create_dir_all(&downloads_dir).await?;
if !local_file.exists() {
debug!("Downloading {}", remote_path);
let remote_dir = self.remote_dir.read().await;
let temp_path = NamedTempFile::new_in(&downloads_dir)?.into_temp_path();
let temp_path =
tokio::task::spawn_blocking(move || NamedTempFile::new_in(downloads_dir))
.await??
.into_temp_path();
fs::copy(remote_dir.as_path().join(remote_path), &temp_path)
.await
.map_err(|e| {
Expand All @@ -121,7 +124,12 @@ impl RemoteFs for LocalDirRemoteFs {
remote_path, e
))
})?;
temp_path.persist(&local_file)?;
local_file =
tokio::task::spawn_blocking(move || -> Result<PathBuf, PathPersistError> {
temp_path.persist(&local_file)?;
Ok(local_file)
})
.await??;
}
Ok(local_file.into_os_string().into_string().unwrap())
}
Expand Down

0 comments on commit a0f92e3

Please sign in to comment.