feat(cubestore): import separate CSV files in parallel
Speeds up imports with multiple workers.
ilya-biryukov committed May 12, 2021
1 parent 90350a3 commit ca896b3
Showing 6 changed files with 188 additions and 126 deletions.
27 changes: 24 additions & 3 deletions rust/cubestore/src/cluster/mod.rs
@@ -99,7 +99,11 @@ pub trait Cluster: DIService + Send + Sync {

fn node_name_by_partitions(&self, partition_ids: &[u64]) -> String;

async fn node_name_for_import(&self, table_id: u64) -> Result<String, CubeError>;
async fn node_name_for_import(
&self,
table_id: u64,
location: &str,
) -> Result<String, CubeError>;

async fn process_message_on_worker(&self, m: NetworkMessage) -> NetworkMessage;

@@ -276,12 +280,19 @@ impl Cluster for ClusterImpl {
workers[(hasher.finish() % workers.len() as u64) as usize].clone()
}

async fn node_name_for_import(&self, table_id: u64) -> Result<String, CubeError> {
async fn node_name_for_import(
&self,
table_id: u64,
location: &str,
) -> Result<String, CubeError> {
let workers = self.config_obj.select_workers();
if workers.is_empty() {
return Ok(self.server_name.to_string());
}
Ok(workers[(table_id % workers.len() as u64) as usize].to_string())
let mut hasher = DefaultHasher::new();
table_id.hash(&mut hasher);
location.hash(&mut hasher);
Ok(workers[(hasher.finish() % workers.len() as u64) as usize].to_string())
}

async fn warmup_partition(
@@ -542,6 +553,16 @@ impl JobRunner {
Self::fail_job_row_key(job);
}
}
JobType::TableImportCSV(location) => {
if let RowKey::Table(TableId::Tables, table_id) = job.row_reference() {
self.import_service
.clone()
.import_table_part(*table_id, location)
.await?
} else {
Self::fail_job_row_key(job);
}
}
}
Ok(())
}
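A note on the worker assignment above: hashing table_id together with location means each CSV file of a multi-location table can land on a different select worker, which is what lets separate files import in parallel. A minimal standalone sketch of the scheme (worker names and the table ID are illustrative):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Mirrors node_name_for_import: hash (table_id, location) onto the worker list.
fn worker_for(workers: &[String], table_id: u64, location: &str) -> String {
    let mut hasher = DefaultHasher::new();
    table_id.hash(&mut hasher);
    location.hash(&mut hasher);
    workers[(hasher.finish() % workers.len() as u64) as usize].clone()
}

fn main() {
    let workers = vec!["worker-1".to_string(), "worker-2".to_string()];
    for location in ["s3://bucket/a.csv", "s3://bucket/b.csv"] {
        println!("{} -> {}", location, worker_for(&workers, 42, location));
    }
}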
201 changes: 104 additions & 97 deletions rust/cubestore/src/import/mod.rs
@@ -1,4 +1,22 @@
pub mod limits;
use core::mem;
use core::slice::memchr;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;

use async_compression::tokio::bufread::GzipDecoder;
use async_std::io::SeekFrom;
use async_std::task::{Context, Poll};
use async_trait::async_trait;
use bigdecimal::{BigDecimal, Num};
use futures::future::join_all;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use mockall::automock;
use pin_project_lite::pin_project;
use tokio::fs::File;
use tokio::io::{AsyncBufRead, AsyncSeekExt, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;

use crate::config::injection::DIService;
use crate::config::ConfigObj;
@@ -14,25 +32,9 @@ use crate::table::{Row, TableValue};
use crate::util::maybe_owned::MaybeOwnedStr;
use crate::util::ordfloat::OrdF64;
use crate::CubeError;
use async_compression::tokio::bufread::GzipDecoder;
use async_std::io::SeekFrom;
use async_std::task::{Context, Poll};
use async_trait::async_trait;
use bigdecimal::{BigDecimal, Num};
use core::mem;
use core::slice::memchr;
use futures::future::join_all;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use mockall::automock;
use pin_project_lite::pin_project;
use std::fs;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufRead, AsyncSeekExt, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;
use tempfile::TempPath;

pub mod limits;

impl ImportFormat {
async fn row_stream(
@@ -313,6 +315,7 @@ impl<R: AsyncBufRead> Stream for CsvLineStream<R> {
#[async_trait]
pub trait ImportService: DIService + Send + Sync {
async fn import_table(&self, table_id: u64) -> Result<(), CubeError>;
async fn import_table_part(&self, table_id: u64, location: &str) -> Result<(), CubeError>;
}

crate::di_service!(MockImportService, [ImportService]);
@@ -346,65 +349,77 @@ impl ImportServiceImpl {

pub async fn resolve_location(
&self,
location: String,
location: &str,
table_id: u64,
temp_files: &mut TempFiles,
) -> Result<File, CubeError> {
let file = if location.starts_with("http") {
let tmp_file = temp_files.new_file(table_id);
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(tmp_file)
.await?;
let mut stream = reqwest::get(&location).await?.bytes_stream();
temp_dir: &Path,
) -> Result<(File, Option<TempPath>), CubeError> {
if location.starts_with("http") {
let (file, path) = tempfile::Builder::new()
.prefix(&table_id.to_string())
.tempfile_in(temp_dir)?
.into_parts();
let mut file = File::from_std(file);
let mut stream = reqwest::get(location).await?.bytes_stream();
while let Some(bytes) = stream.next().await {
file.write_all(bytes?.as_ref()).await?;
}
file.seek(SeekFrom::Start(0)).await?;
file
Ok((file, Some(path)))
} else if location.starts_with("temp://") {
self.download_temp_file(location.clone()).await?
Ok((self.download_temp_file(location).await?, None))
} else {
File::open(location.clone()).await?
};

Ok(file)
Ok((File::open(location.clone()).await?, None))
}
}
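
The tempfile crate replaces the hand-rolled TempFiles tracker here: NamedTempFile::into_parts() yields the open std File plus a TempPath guard, and the guard deletes the file from disk when dropped, so no manual Drop impl is needed. A minimal sketch of that mechanism (the prefix and contents are illustrative):

use std::io::Write;

fn main() -> std::io::Result<()> {
    // into_parts() splits a NamedTempFile into the open File and a TempPath guard.
    let (mut file, path) = tempfile::Builder::new()
        .prefix("import-42")
        .tempfile_in(std::env::temp_dir())?
        .into_parts();
    writeln!(file, "some,csv,data")?;
    println!("temp file at {:?}", &*path);
    drop(path); // removes the file on disk, even while `file` is still open
    Ok(())
}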

async fn download_temp_file(&self, location: String) -> Result<File, CubeError> {
async fn download_temp_file(&self, location: &str) -> Result<File, CubeError> {
let to_download = location.replace("temp://", "temp-uploads/");
let local_file = self.remote_fs.download_file(&to_download).await?;
Ok(File::open(local_file).await?)
}
}

pub struct TempFiles {
temp_path: PathBuf,
files: Vec<PathBuf>,
}
async fn do_import(
&self,
table: &IdRow<Table>,
format: ImportFormat,
location: &str,
) -> Result<(), CubeError> {
let temp_dir = self.config_obj.data_dir().join("tmp");
tokio::fs::create_dir_all(temp_dir.clone()).await?;

impl TempFiles {
pub fn new(temp_path: PathBuf) -> Self {
Self {
temp_path,
files: Vec::new(),
let (file, tmp_path) = self
.resolve_location(location.clone(), table.get_id(), &temp_dir)
.await?;
let mut row_stream = format
.row_stream(
file,
location.to_string(),
table.get_row().get_columns().clone(),
)
.await?;

let mut ingestion = Ingestion::new(
self.meta_store.clone(),
self.chunk_store.clone(),
self.limits.clone(),
table.clone(),
);
let mut rows = MutRows::new(table.get_row().get_columns().len());
while let Some(row) = row_stream.next().await {
if let Some(row) = row? {
rows.add_row_heap_allocated(&row);
if rows.num_rows() >= self.config_obj.wal_split_threshold() as usize {
let mut to_add = MutRows::new(table.get_row().get_columns().len());
mem::swap(&mut rows, &mut to_add);
ingestion.queue_data_frame(to_add.freeze()).await?;
}
}
}
}

pub fn new_file(&mut self, table_id: u64) -> PathBuf {
let path = self.temp_path.join(format!("{}.csv", table_id));
self.files.push(path.clone());
path
}
}
mem::drop(tmp_path);

impl Drop for TempFiles {
fn drop(&mut self) {
for f in self.files.iter() {
let _ = fs::remove_file(f);
}
ingestion.queue_data_frame(rows.freeze()).await?;
ingestion.wait_completion().await
}
}
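
do_import batches rows until wal_split_threshold is reached, then uses mem::swap to hand the full batch to the ingestion queue without copying it. The same pattern in miniature (Vec<u64> stands in for MutRows and the threshold is arbitrary):

use std::mem;

fn queue(batch: Vec<u64>) {
    println!("queued {} rows", batch.len());
}

fn main() {
    let threshold = 3;
    let mut rows = Vec::new();
    for row in 0..10u64 {
        rows.push(row);
        if rows.len() >= threshold {
            // Swap the full batch out for a fresh one, then hand it off.
            let mut to_queue = Vec::new();
            mem::swap(&mut rows, &mut to_queue);
            queue(to_queue);
        }
    }
    queue(rows); // final, possibly partial batch
}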

@@ -427,47 +442,39 @@ impl ImportService for ImportServiceImpl {
"Trying to import table without location: {:?}",
table
)))?;
let temp_dir = self.config_obj.data_dir().join("tmp");
tokio::fs::create_dir_all(temp_dir.clone()).await?;
for location in locations.into_iter() {
let mut temp_files = TempFiles::new(temp_dir.clone());
let file = self
.resolve_location(location.clone(), table_id, &mut temp_files)
.await?;
let mut row_stream = format
.row_stream(
file,
location.to_string(),
table.get_row().get_columns().clone(),
)
.await?;

let mut ingestion = Ingestion::new(
self.meta_store.clone(),
self.chunk_store.clone(),
self.limits.clone(),
table.clone(),
);
let mut rows = MutRows::new(table.get_row().get_columns().len());
while let Some(row) = row_stream.next().await {
if let Some(row) = row? {
rows.add_row_heap_allocated(&row);
if rows.num_rows() >= self.config_obj.wal_split_threshold() as usize {
let mut to_add = MutRows::new(table.get_row().get_columns().len());
mem::swap(&mut rows, &mut to_add);
ingestion.queue_data_frame(to_add.freeze()).await?;
}
}
}

mem::drop(temp_files);

ingestion.queue_data_frame(rows.freeze()).await?;
ingestion.wait_completion().await?;
self.do_import(&table, *format, &location).await?;
}

Ok(())
}

async fn import_table_part(&self, table_id: u64, location: &str) -> Result<(), CubeError> {
let table = self.meta_store.get_table_by_id(table_id).await?;
let format = table
.get_row()
.import_format()
.as_ref()
.ok_or(CubeError::internal(format!(
"Trying to import table without import format: {:?}",
table
)))?;
let locations = table
.get_row()
.locations()
.ok_or(CubeError::internal(format!(
"Trying to import table without location: {:?}",
table
)))?;

if locations.iter().find(|l| **l == location).is_none() {
return Err(CubeError::internal(format!(
"Location not found in table spec: table = {:?}, location = {}",
table, location
)));
}
self.do_import(&table, *format, location).await
}
}
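
The scheduler side is not part of this diff; presumably it now enqueues one JobType::TableImportCSV(location) job per entry in the table's locations list, and each job reaches import_table_part on the worker chosen by node_name_for_import. A hypothetical sketch of that fan-out (Job and submit_job are stand-ins, not CubeStore APIs):

#[derive(Debug)]
struct Job {
    table_id: u64,
    location: String, // one CSV location per job
}

fn submit_job(job: Job) {
    // Stand-in: the real code would persist the job through the metastore.
    println!("submitting {:?}", job);
}

fn main() {
    for location in ["s3://bucket/a.csv", "s3://bucket/b.csv"] {
        submit_job(Job {
            table_id: 42,
            location: location.to_string(),
        });
    }
}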

/// Handles row-based data ingestion, e.g. on CSV import and SQL insert.
23 changes: 21 additions & 2 deletions rust/cubestore/src/metastore/job.rs
@@ -10,10 +10,21 @@ use std::io::{Cursor, Write};

#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)]
pub enum JobType {
WalPartitioning = 1,
WalPartitioning,
PartitionCompaction,
TableImport,
Repartition,
TableImportCSV(/*location*/ String),
}

fn get_job_type_index(j: &JobType) -> u32 {
match j {
JobType::WalPartitioning => 1,
JobType::PartitionCompaction => 2,
JobType::TableImport => 3,
JobType::Repartition => 4,
JobType::TableImportCSV(_) => 5,
}
}
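
get_job_type_index exists because the old `job_type.clone() as u32` cast stops compiling once a variant carries data: Rust permits `as` casts only on fieldless enums. A minimal illustration:

enum Fieldless {
    A = 1,
    B,
}

enum WithData {
    A,
    B(String),
}

// A data-carrying enum needs an explicit mapping; `as u32` would not compile on it.
fn index(v: &WithData) -> u32 {
    match v {
        WithData::A => 1,
        WithData::B(_) => 2,
    }
}

fn main() {
    assert_eq!(Fieldless::B as u32, 2); // fieldless enums cast directly
    assert_eq!(index(&WithData::B("loc".into())), 2);
}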

#[derive(Clone, Serialize, Deserialize, Debug, Hash, Eq, PartialEq)]
@@ -122,7 +133,15 @@ impl RocksSecondaryIndex<Job, JobIndexKey> for JobRocksIndex {
JobIndexKey::RowReference(row_key, job_type) => {
let mut buf = Cursor::new(Vec::new());
buf.write_all(row_key.to_bytes().as_slice()).unwrap();
buf.write_u32::<BigEndian>(job_type.clone() as u32).unwrap();
buf.write_u32::<BigEndian>(get_job_type_index(job_type))
.unwrap();
match job_type {
JobType::TableImportCSV(l) => {
buf.write_u64::<BigEndian>(l.len() as u64).unwrap();
buf.write(l.as_bytes()).unwrap();
}
_ => {}
}
buf.into_inner()
}
JobIndexKey::ScheduledByShard(shard) => {
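A note on the index key above: appending the location (with a u64 length prefix) is what lets multiple TableImportCSV jobs for the same table row coexist as distinct secondary-index entries. A small self-contained version of the encoding, using the byteorder crate as the diff does:

use byteorder::{BigEndian, WriteBytesExt};
use std::io::{Cursor, Write};

fn encode_key(job_type_index: u32, location: &str) -> Vec<u8> {
    let mut buf = Cursor::new(Vec::new());
    buf.write_u32::<BigEndian>(job_type_index).unwrap();
    // Length-prefix the variable-sized string so different keys cannot collide.
    buf.write_u64::<BigEndian>(location.len() as u64).unwrap();
    buf.write_all(location.as_bytes()).unwrap();
    buf.into_inner()
}

fn main() {
    assert_ne!(encode_key(5, "a.csv"), encode_key(5, "b.csv"));
}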
2 changes: 1 addition & 1 deletion rust/cubestore/src/metastore/mod.rs
@@ -447,7 +447,7 @@ impl fmt::Display for Column {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub enum ImportFormat {
CSV,
}
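Deriving Copy here is what allows the bare `*format` dereference in do_import and import_table_part; it is valid because ImportFormat's variants carry no data. In miniature:

#[derive(Clone, Copy, Debug)]
enum ImportFormat {
    CSV,
}

fn use_format(format: ImportFormat) {
    println!("{:?}", format);
}

fn main() {
    let format_ref = &ImportFormat::CSV;
    use_format(*format_ref); // dereferencing copies; no clone() needed
}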
