Skip to content

Commit

Permalink
feat(stream common): add internal cache for SortBuffer (risingwavel…
Browse files Browse the repository at this point in the history
…abs#8963)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc committed Apr 7, 2023
1 parent 09e6a63 commit 5169ce9
Show file tree
Hide file tree
Showing 21 changed files with 899 additions and 470 deletions.
67 changes: 67 additions & 0 deletions src/common/src/row/ascent_owned_row.rs
@@ -0,0 +1,67 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(deprecated)]

use std::ops::Deref;

use super::{OwnedRow, Row};

/// A simple wrapper for [`OwnedRow`], which assumes that all fields are defined as `ASC` order.
/// TODO(rc): This is used by `sort_v0` and `sort_buffer_v0`, now we can remove it.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
#[deprecated(note = "The ordering of this `AscentOwnedRow` is not correct. Don't use it anymore.")]
pub struct AscentOwnedRow(OwnedRow);

impl AscentOwnedRow {
pub fn into_inner(self) -> OwnedRow {
self.0
}
}

impl Deref for AscentOwnedRow {
type Target = OwnedRow;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Row for AscentOwnedRow {
type Iter<'a> = <OwnedRow as Row>::Iter<'a>;

deref_forward_row! {}

fn into_owned_row(self) -> OwnedRow {
self.into_inner()
}
}

impl PartialOrd for AscentOwnedRow {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.0.as_inner().partial_cmp(other.0.as_inner())
}
}

impl Ord for AscentOwnedRow {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.partial_cmp(other)
.unwrap_or_else(|| panic!("cannot compare rows with different types"))
}
}

impl From<OwnedRow> for AscentOwnedRow {
fn from(row: OwnedRow) -> Self {
Self(row)
}
}
9 changes: 6 additions & 3 deletions src/common/src/row/mod.rs
Expand Up @@ -182,11 +182,11 @@ impl<R: Row> RowExt for R {}
/// Forward the implementation of [`Row`] to the deref target.
macro_rules! deref_forward_row {
() => {
fn datum_at(&self, index: usize) -> DatumRef<'_> {
fn datum_at(&self, index: usize) -> crate::types::DatumRef<'_> {
(**self).datum_at(index)
}

unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
unsafe fn datum_at_unchecked(&self, index: usize) -> crate::types::DatumRef<'_> {
(**self).datum_at_unchecked(index)
}

Expand Down Expand Up @@ -375,17 +375,20 @@ impl<R: Row> Row for Option<R> {
}
}

mod ascent_owned_row;
mod chain;
mod compacted_row;
mod empty;
mod once;
mod owned_row;
mod project;
mod repeat_n;
#[allow(deprecated)]
pub use ascent_owned_row::AscentOwnedRow;
pub use chain::Chain;
pub use compacted_row::CompactedRow;
pub use empty::{empty, Empty};
pub use once::{once, Once};
pub use owned_row::{AscentOwnedRow, OwnedRow, RowDeserializer};
pub use owned_row::{OwnedRow, RowDeserializer};
pub use project::Project;
pub use repeat_n::{repeat_n, RepeatN};
51 changes: 1 addition & 50 deletions src/common/src/row/owned_row.rs
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{self, Deref};

use super::Row;
use crate::collection::estimate_size::EstimateSize;
use crate::types::{
Expand All @@ -28,7 +26,7 @@ use crate::util::value_encoding::deserialize_datum;
pub struct OwnedRow(Vec<Datum>);

/// Do not implement `IndexMut` to make it immutable.
impl ops::Index<usize> for OwnedRow {
impl std::ops::Index<usize> for OwnedRow {
type Output = Datum;

fn index(&self, index: usize) -> &Self::Output {
Expand Down Expand Up @@ -229,53 +227,6 @@ impl<D: AsRef<[DataType]>> RowDeserializer<D> {
}
}

/// A simple wrapper for [`OwnedRow`], which assumes that all fields are defined as `ASC` order.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AscentOwnedRow(OwnedRow);

impl AscentOwnedRow {
pub fn into_inner(self) -> OwnedRow {
self.0
}
}

impl Deref for AscentOwnedRow {
type Target = OwnedRow;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Row for AscentOwnedRow {
type Iter<'a> = <OwnedRow as Row>::Iter<'a>;

deref_forward_row! {}

fn into_owned_row(self) -> OwnedRow {
self.into_inner()
}
}

impl PartialOrd for AscentOwnedRow {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.0.as_inner().partial_cmp(other.0.as_inner())
}
}

impl Ord for AscentOwnedRow {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.partial_cmp(other)
.unwrap_or_else(|| panic!("cannot compare rows with different types"))
}
}

impl From<OwnedRow> for AscentOwnedRow {
fn from(row: OwnedRow) -> Self {
Self(row)
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
Expand Down
19 changes: 19 additions & 0 deletions src/stream/src/common/cache/mod.rs
@@ -0,0 +1,19 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod state_cache;
mod top_n_cache;

pub use state_cache::*;
pub use top_n_cache::*;
69 changes: 69 additions & 0 deletions src/stream/src/common/cache/state_cache/mod.rs
@@ -0,0 +1,69 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub use ordered::*;
use risingwave_common::array::Op;
pub use top_n::*;

mod ordered;
mod top_n;

/// A common interface for state table cache.
pub trait StateCache {
type Key: Ord;
type Value;

/// Type of state cache filler, for syncing the cache with the state table.
type Filler<'a>: StateCacheFiller<Key = Self::Key, Value = Self::Value> + 'a
where
Self: 'a;

/// Check if the cache is synced with the state table.
fn is_synced(&self) -> bool;

/// Begin syncing the cache with the state table.
fn begin_syncing(&mut self) -> Self::Filler<'_>;

/// Insert an entry into the cache. Should not break cache validity.
fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value>;

/// Delete an entry from the cache. Should not break cache validity.
fn delete(&mut self, key: &Self::Key) -> Option<Self::Value>;

/// Apply a batch of operations to the cache. Should not break cache validity.
fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>);

/// Clear the cache.
fn clear(&mut self);

/// Iterate over the values in the cache.
fn values(&self) -> impl Iterator<Item = &Self::Value>;

/// Get the reference of first key-value pair in the cache.
fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)>;
}

pub trait StateCacheFiller {
type Key: Ord;
type Value;

/// Get the capacity of the cache.
fn capacity(&self) -> Option<usize>;

/// Insert an entry into the cache without cache validity check.
fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value);

/// Finish syncing the cache with the state table. This should mark the cache as synced.
fn finish(self);
}
120 changes: 120 additions & 0 deletions src/stream/src/common/cache/state_cache/ordered.rs
@@ -0,0 +1,120 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use risingwave_common::array::Op;

use super::{StateCache, StateCacheFiller};

/// An implementation of [`StateCache`] that uses a [`BTreeMap`] as the underlying cache, with no
/// capacity limit.
pub struct OrderedStateCache<K: Ord, V> {
cache: BTreeMap<K, V>,
synced: bool,
}

impl<K: Ord, V> OrderedStateCache<K, V> {
pub fn new() -> Self {
Self {
cache: BTreeMap::new(),
synced: false,
}
}
}

impl<K: Ord, V> Default for OrderedStateCache<K, V> {
fn default() -> Self {
Self::new()
}
}

impl<K: Ord, V> StateCache for OrderedStateCache<K, V> {
type Filler<'a> = &'a mut Self where Self: 'a;
type Key = K;
type Value = V;

fn is_synced(&self) -> bool {
self.synced
}

fn begin_syncing(&mut self) -> Self::Filler<'_> {
self.synced = false;
self.cache.clear();
self
}

fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
if self.synced {
self.cache.insert(key, value)
} else {
None
}
}

fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
if self.synced {
self.cache.remove(key)
} else {
None
}
}

fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>) {
if self.synced {
for (op, key, value) in batch {
match op {
Op::Insert | Op::UpdateInsert => {
self.cache.insert(key, value);
}
Op::Delete | Op::UpdateDelete => {
self.cache.remove(&key);
}
}
}
}
}

fn clear(&mut self) {
self.cache.clear();
self.synced = false;
}

fn values(&self) -> impl Iterator<Item = &Self::Value> {
assert!(self.synced);
self.cache.values()
}

fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> {
assert!(self.synced);
self.cache.first_key_value()
}
}

impl<K: Ord, V> StateCacheFiller for &mut OrderedStateCache<K, V> {
type Key = K;
type Value = V;

fn capacity(&self) -> Option<usize> {
None
}

fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) {
self.cache.insert(key, value);
}

fn finish(self) {
self.synced = true;
}
}

0 comments on commit 5169ce9

Please sign in to comment.