feat(cubestore): partitioned indexes for faster joins
The index ensures that data from multiple tables is partitioned by
the same keys and colocated on the same machines. This lets us
distribute the work of JOIN queries evenly and avoid duplicating the
same keys on different machines.

However, more work is needed to keep partitions in sync during
ingestion and compaction.

```
CREATE PARTITIONED INDEX schema.pindex(id int, url text);
CREATE TABLE schema.foo(some_id int, some_url text, data text)
    ADD TO PARTITIONED INDEX pindex(some_id, some_url);
CREATE TABLE schema.bar(other_id int, other_url text, other_data text)
    ADD TO PARTITIONED INDEX pindex(other_id, other_url);

SELECT some_id, data, other_data
FROM schema.foo f
JOIN schema.bar b ON f.some_id = b.other_id AND f.some_url = b.other_url
```
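
For intuition, worker placement in this commit comes down to hashing the
shared multi-partition id and taking it modulo the number of select workers
(see `pick_worker_by_ids` in `cluster/mod.rs` below), so matching partitions
of `foo` and `bar` resolve to the same node. Below is a minimal standalone
sketch of that idea; the worker names and the `pick_worker` function are
illustrative, not the actual cluster API:

```
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Partitions of different tables that cover the same key range share one
// multi-partition id; hashing that id picks one worker for all of them.
fn pick_worker<'a>(workers: &'a [String], multi_partition_id: u64) -> &'a str {
    let mut hasher = DefaultHasher::new();
    multi_partition_id.hash(&mut hasher);
    &workers[(hasher.finish() % workers.len() as u64) as usize]
}

fn main() {
    let workers: Vec<String> =
        vec!["worker-1".into(), "worker-2".into(), "worker-3".into()];
    // foo's and bar's partitions for multi-partition 42 land on the same node,
    // so the join runs locally without shuffling rows between workers.
    assert_eq!(pick_worker(&workers, 42), pick_worker(&workers, 42));
    println!("multi-partition 42 -> {}", pick_worker(&workers, 42));
}
```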

This is an early version. There are known problems and limitations:
  - `DROP TABLE` might cancel multi-index partitioning,
  - There is no way to remove the index yet, i.e. no `DROP PARTITIONED INDEX`,
  - Performance with a large number of tables is yet to be evaluated.

This has mostly been tested on random data and simple queries; we
expect to find and fix more issues before we stabilize this and bring
it to CubeJS.
ilya-biryukov committed Nov 2, 2021
1 parent 108a4e3 commit 8ca605f
Showing 24 changed files with 2,725 additions and 504 deletions.
4 changes: 2 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default.

72 changes: 72 additions & 0 deletions rust/cubestore-sql-tests/src/tests.rs
@@ -86,6 +86,11 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
t("planning_hints", planning_hints),
t("planning_inplace_aggregate2", planning_inplace_aggregate2),
t("topk_large_inputs", topk_large_inputs),
t("partitioned_index", partitioned_index),
t(
"partitioned_index_if_not_exists",
partitioned_index_if_not_exists,
),
t("planning_simple", planning_simple),
t("planning_joins", planning_joins),
t("planning_3_table_joins", planning_3_table_joins),
@@ -2100,6 +2105,73 @@ async fn planning_inplace_aggregate2(service: Box<dyn SqlClient>) {
);
}

async fn partitioned_index(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE PARTITIONED INDEX s.ind(id int, url text)")
.await
.unwrap();
service
.exec_query(
"CREATE TABLE s.Data1(id int, url text, hits int) \
ADD TO PARTITIONED INDEX s.ind(id, url)",
)
.await
.unwrap();
service
.exec_query(
"CREATE TABLE s.Data2(id2 int, url2 text, location text) \
ADD TO PARTITIONED INDEX s.ind(id2, url2)",
)
.await
.unwrap();

service
.exec_query(
"INSERT INTO s.Data1(id, url, hits) VALUES (0, 'a', 10), (1, 'a', 20), (2, 'c', 30)",
)
.await
.unwrap();
service
.exec_query("INSERT INTO s.Data2(id2, url2, location) VALUES (0, 'a', 'Mars'), (1, 'c', 'Earth'), (2, 'c', 'Moon')")
.await
.unwrap();

let r = service
.exec_query(
"SELECT id, url, hits, location \
FROM s.Data1 `l` JOIN s.Data2 `r` ON l.id = r.id2 AND l.url = r.url2 \
ORDER BY 1, 2",
)
.await
.unwrap();
assert_eq!(
to_rows(&r),
rows(&[(0, "a", 10, "Mars"), (2, "c", 30, "Moon")])
);
}

async fn partitioned_index_if_not_exists(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE PARTITIONED INDEX s.ind(id int, url text)")
.await
.unwrap();
service
.exec_query("CREATE PARTITIONED INDEX s.ind(id int, url text)")
.await
.unwrap_err();
service
.exec_query("CREATE PARTITIONED INDEX IF NOT EXISTS s.ind(id int, url text)")
.await
.unwrap();

service
.exec_query("CREATE PARTITIONED INDEX IF NOT EXISTS s.other_ind(id int, url text)")
.await
.unwrap();
}

async fn topk_large_inputs(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
2 changes: 1 addition & 1 deletion rust/cubestore/Cargo.toml
@@ -18,7 +18,7 @@ base64 = "0.13.0"
bumpalo = "3.6.1"
tokio = { version = "1.0", features = ["full", "rt"] }
warp = { git = 'https://github.com/seanmonstar/warp', version = "0.3.0" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "c3c77dd2aa408a7cb0c9f27e5e9fc1b101351dcd" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "e6e90af10616c1699ecd21f27d1e67a7f646d042" }
serde_derive = "1.0.115"
serde = "1.0.115"
serde_bytes = "0.11.5"
118 changes: 83 additions & 35 deletions rust/cubestore/src/cluster/mod.rs
@@ -17,7 +17,6 @@ use crate::config::{Config, ConfigObj};
use crate::import::ImportService;
use crate::metastore::chunks::chunk_file_name;
use crate::metastore::job::{Job, JobStatus, JobType};
use crate::metastore::partition::partition_file_name;
use crate::metastore::table::Table;
use crate::metastore::{Chunk, IdRow, MetaStore, MetaStoreEvent, Partition, RowKey, TableId};
use crate::metastore::{
@@ -73,6 +72,8 @@ use tracing::{instrument, Instrument};
pub trait Cluster: DIService + Send + Sync {
async fn notify_job_runner(&self, node_name: String) -> Result<(), CubeError>;

fn config(&self) -> Arc<dyn ConfigObj>;

/// Send full select to a worker, which will act as the main node for the query.
async fn route_select(
&self,
@@ -118,9 +119,7 @@ pub trait Cluster: DIService + Send + Sync {

fn job_result_listener(&self) -> JobResultListener;

async fn node_name_by_partitions(&self, partition_ids: &[u64]) -> Result<String, CubeError>;

fn node_name_by_partition_rows(&self, partitions: &Vec<IdRow<Partition>>) -> String;
fn node_name_by_partition(&self, p: &IdRow<Partition>) -> String;

async fn node_name_for_import(
&self,
@@ -262,6 +261,10 @@ impl Cluster for ClusterImpl {
Ok(())
}

fn config(&self) -> Arc<dyn ConfigObj> {
self.config_obj.clone()
}

async fn route_select(
&self,
node_name: &str,
@@ -362,27 +365,12 @@ }
}
}

async fn node_name_by_partitions(&self, partition_ids: &[u64]) -> Result<String, CubeError> {
let mut partitions = Vec::new();
for partition_id in partition_ids.iter() {
partitions.push(self.meta_store.get_partition(*partition_id).await?);
}
Ok(self.node_name_by_partition_rows(&mut partitions))
}

fn node_name_by_partition_rows(&self, partitions: &Vec<IdRow<Partition>>) -> String {
let workers = self.config_obj.select_workers();
if workers.is_empty() {
return self.server_name.to_string();
}

let mut hasher = DefaultHasher::new();
for partition in partitions.iter() {
partition.get_row().get_min_val().hash(&mut hasher);
partition.get_row().get_max_val().hash(&mut hasher);
partition.get_row().get_index_id().hash(&mut hasher);
}
workers[(hasher.finish() % workers.len() as u64) as usize].clone()
}

fn node_name_by_partition(&self, p: &IdRow<Partition>) -> String {
if let Some(id) = p.get_row().multi_partition_id() {
pick_worker_by_ids(self.config_obj.as_ref(), [id]).to_string()
} else {
pick_worker_by_partitions(self.config_obj.as_ref(), [p]).to_string()
}
}

async fn node_name_for_import(
@@ -405,7 +393,7 @@ impl Cluster for ClusterImpl {
partition: IdRow<Partition>,
chunks: Vec<IdRow<Chunk>>,
) -> Result<(), CubeError> {
let node_name = self.node_name_by_partitions(&[partition.get_id()]).await?;
let node_name = self.node_name_by_partition(&partition);
let mut futures = Vec::new();
if let Some(name) = partition.get_row().get_full_name(partition.get_id()) {
futures.push(self.warmup_download(&node_name, name));
@@ -743,6 +731,35 @@ impl JobRunner {
Self::fail_job_row_key(job)
}
}
JobType::MultiPartitionSplit => {
if let RowKey::Table(TableId::MultiPartitions, id) = job.row_reference() {
let compaction_service = self.compaction_service.clone();
let id = *id;
Ok(cube_ext::spawn(async move {
compaction_service.split_multi_partition(id).await
}))
} else {
Self::fail_job_row_key(job)
}
}
JobType::FinishMultiSplit => {
if let RowKey::Table(TableId::MultiPartitions, multi_part_id) = job.row_reference()
{
let meta_store = self.meta_store.clone();
let compaction_service = self.compaction_service.clone();
let multi_part_id = *multi_part_id;
Ok(cube_ext::spawn(async move {
for p in meta_store.find_unsplit_partitions(multi_part_id).await? {
compaction_service
.finish_multi_split(multi_part_id, p)
.await?
}
Ok(())
}))
} else {
Self::fail_job_row_key(job)
}
}
JobType::TableImport => {
if let RowKey::Table(TableId::Tables, table_id) = job.row_reference() {
let import_service = self.import_service.clone();
@@ -1418,18 +1435,10 @@ impl ClusterImpl {
log::debug!("Got {} partitions, running the warmup", partitions.len());

for (p, chunks) in partitions {
let node_name = match self.node_name_by_partitions(&[p.partition_id]).await {
Ok(p) => p,
Err(e) => {
log::error!("Failed to get node by partition: {}", e);
return;
}
};
if node_name != self.server_name {
if self.node_name_by_partition(&p) != self.server_name {
continue;
}
if p.has_main_table {
let file = partition_file_name(p.partition_id);
if let Some(file) = p.get_row().get_full_name(p.get_id()) {
if self.stop_token.is_cancelled() {
log::debug!("Startup warmup cancelled");
return;
@@ -1532,3 +1541,42 @@ impl MessageStream for QueryStream {
fn is_self_reference(name: &str) -> bool {
name.starts_with("@loop:")
}

/// Picks a worker by an opaque id for distributing any work in the cluster.
/// Ids usually come from multi-partitions of the metastore.
pub fn pick_worker_by_ids<'a>(
config: &'a dyn ConfigObj,
ids: impl IntoIterator<Item = u64>,
) -> &'a str {
let workers = config.select_workers();
if workers.is_empty() {
return config.server_name().as_str();
}

let mut hasher = DefaultHasher::new();
for p in ids {
p.hash(&mut hasher);
}
workers[(hasher.finish() % workers.len() as u64) as usize].as_str()
}

/// Same as [pick_worker_by_ids], but uses ranges of partitions. This is a hack
/// to keep the same node for partitions produced by compaction that merged
/// chunks into the main table of a single partition.
pub fn pick_worker_by_partitions<'a>(
config: &'a dyn ConfigObj,
partitions: impl IntoIterator<Item = &'a IdRow<Partition>>,
) -> &'a str {
let workers = config.select_workers();
if workers.is_empty() {
return config.server_name().as_str();
}

let mut hasher = DefaultHasher::new();
for partition in partitions {
partition.get_row().get_min_val().hash(&mut hasher);
partition.get_row().get_max_val().hash(&mut hasher);
partition.get_row().get_index_id().hash(&mut hasher);
}
workers[(hasher.finish() % workers.len() as u64) as usize].as_str()
}
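
To make the intent of these two helpers concrete: `node_name_by_partition`
above routes a partition by its multi-partition id when it belongs to a
partitioned index, and falls back to hashing the partition's own
min/max/index id otherwise. Below is a standalone sketch of that dispatch,
using a simplified `PartitionInfo` stand-in rather than the metastore types:

```
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Simplified stand-in for the metastore partition row; illustrative only.
struct PartitionInfo {
    multi_partition_id: Option<u64>,
    index_id: u64,
    min_val: Option<i64>,
    max_val: Option<i64>,
}

fn pick<'a>(workers: &'a [String], hash: u64) -> &'a str {
    &workers[(hash % workers.len() as u64) as usize]
}

// Multi-partitions route by the shared id, so every table's piece of the same
// key range lands on one worker; plain partitions keep the old min/max/index
// hashing, so compaction does not move them to a different node.
fn node_for<'a>(workers: &'a [String], p: &PartitionInfo) -> &'a str {
    let mut h = DefaultHasher::new();
    match p.multi_partition_id {
        Some(id) => id.hash(&mut h),
        None => {
            p.min_val.hash(&mut h);
            p.max_val.hash(&mut h);
            p.index_id.hash(&mut h);
        }
    }
    pick(workers, h.finish())
}

fn main() {
    let workers: Vec<String> =
        vec!["worker-1".into(), "worker-2".into(), "worker-3".into()];
    let p = PartitionInfo {
        multi_partition_id: Some(7),
        index_id: 1,
        min_val: None,
        max_val: None,
    };
    println!("multi-partition 7 -> {}", node_for(&workers, &p));
}
```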
1 change: 1 addition & 0 deletions rust/cubestore/src/lib.rs
@@ -11,6 +11,7 @@
#![feature(hash_set_entry)]
#![feature(map_first_last)]
#![feature(arc_new_cyclic)]
#![feature(is_sorted)]
// #![feature(trace_macros)]

// trace_macros!(true);
6 changes: 6 additions & 0 deletions rust/cubestore/src/metastore/index.rs
@@ -15,6 +15,7 @@ impl Index {
columns: Vec<Column>,
sort_key_size: u64,
partition_split_key_size: Option<u64>,
multi_index_id: Option<u64>,
) -> Result<Index, CubeError> {
if sort_key_size == 0 {
return Err(CubeError::user(format!(
@@ -28,6 +29,7 @@
columns,
sort_key_size,
partition_split_key_size,
multi_index_id,
})
}

@@ -55,6 +57,10 @@
pub fn partition_split_key_size(&self) -> &Option<u64> {
&self.partition_split_key_size
}

pub fn multi_index_id(&self) -> Option<u64> {
self.multi_index_id
}
}

#[derive(Clone, Copy, Debug)]
4 changes: 4 additions & 0 deletions rust/cubestore/src/metastore/job.rs
@@ -15,6 +15,8 @@ pub enum JobType {
TableImport,
Repartition,
TableImportCSV(/*location*/ String),
MultiPartitionSplit,
FinishMultiSplit,
}

fn get_job_type_index(j: &JobType) -> u32 {
@@ -24,6 +26,8 @@ fn get_job_type_index(j: &JobType) -> u32 {
JobType::TableImport => 3,
JobType::Repartition => 4,
JobType::TableImportCSV(_) => 5,
JobType::MultiPartitionSplit => 6,
JobType::FinishMultiSplit => 7,
}
}

