Skip to content

Commit

Permalink
Improve partition printing
Browse files Browse the repository at this point in the history
  • Loading branch information
danburkert committed Oct 8, 2016
1 parent 74b83ac commit fcd22a5
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 96 deletions.
89 changes: 89 additions & 0 deletions src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,95 @@ fn increment_cell(row: &mut Row, idx: usize) -> bool {
true
}

/// Reports whether column `idx` holds the same value in both rows.
///
/// Yields `false` whenever the cell is unset or null in either row, so two absent cells never
/// compare as equal. The values fetched with `get` are compared as returned, without being
/// unwrapped first.
///
/// # Panics
///
/// Panics if the two rows do not share a schema, or if an `is_set`/`is_null` lookup for `idx`
/// cannot be unwrapped.
fn is_cell_equal(a: &Row, b: &Row, idx: usize) -> bool {
    assert_eq!(a.schema(), b.schema());

    // A cell only takes part in the comparison when it is both set and non-null. `a` is checked
    // completely before `b` is looked at.
    let a_has_value = a.is_set(idx).unwrap() && !a.is_null(idx).unwrap();
    if !a_has_value {
        return false;
    }
    let b_has_value = b.is_set(idx).unwrap() && !b.is_null(idx).unwrap();
    if !b_has_value {
        return false;
    }

    // Both rows share a schema, so the column type can be taken from either one.
    let column_type = a.schema().columns()[idx].data_type();
    match column_type {
        DataType::Bool => a.get::<bool>(idx) == b.get::<bool>(idx),
        DataType::Int8 => a.get::<i8>(idx) == b.get::<i8>(idx),
        DataType::Int16 => a.get::<i16>(idx) == b.get::<i16>(idx),
        DataType::Int32 => a.get::<i32>(idx) == b.get::<i32>(idx),
        DataType::Int64 | DataType::Timestamp => a.get::<i64>(idx) == b.get::<i64>(idx),
        DataType::Binary | DataType::String => a.get::<&[u8]>(idx) == b.get::<&[u8]>(idx),
        DataType::Float => a.get::<f32>(idx) == b.get::<f32>(idx),
        DataType::Double => a.get::<f64>(idx) == b.get::<f64>(idx),
    }
}

/// Checks whether the cell at `idx` in `upper` is the immediate successor of the same cell in
/// `lower`.
///
/// What counts as the immediate successor depends on the column type:
///
/// * `Bool`: `lower` is `false` and `upper` is `true`;
/// * `Int8`/`Int16`/`Int32`/`Int64`/`Timestamp`: `lower + 1 == upper`; never true when `lower`
///   is already the maximum value of its type;
/// * `Binary`/`String`: `upper` is `lower` with a single `0` byte appended;
/// * `Float`/`Double`: `lower.next() == upper`; never true when `lower` is positive infinity.
///
/// Returns `false` if the cell is unset or null in either row.
///
/// # Panics
///
/// Panics if the two rows do not share a schema, or if any of the `is_set`, `is_null` or `get`
/// lookups for `idx` cannot be unwrapped.
fn is_cell_incremented(lower: &Row, upper: &Row, idx: usize) -> bool {
    assert_eq!(lower.schema(), upper.schema());

    if !lower.is_set(idx).unwrap() || lower.is_null(idx).unwrap() ||
       !upper.is_set(idx).unwrap() || upper.is_null(idx).unwrap() {
        return false;
    }

    match lower.schema().columns()[idx].data_type() {
        DataType::Bool => {
            !lower.get::<bool>(idx).unwrap() && upper.get::<bool>(idx).unwrap()
        },
        // `checked_add` yields `None` at the type's maximum, so a saturated lower bound can
        // never be reported as incremented and the addition cannot overflow.
        DataType::Int8 => {
            let lower = lower.get::<i8>(idx).unwrap();
            let upper = upper.get::<i8>(idx).unwrap();
            lower.checked_add(1) == Some(upper)
        },
        DataType::Int16 => {
            let lower = lower.get::<i16>(idx).unwrap();
            let upper = upper.get::<i16>(idx).unwrap();
            lower.checked_add(1) == Some(upper)
        },
        DataType::Int32 => {
            let lower = lower.get::<i32>(idx).unwrap();
            let upper = upper.get::<i32>(idx).unwrap();
            lower.checked_add(1) == Some(upper)
        },
        // 64-bit columns must be read as `i64`; reading them with a narrower type does not match
        // the column's declared type.
        DataType::Int64 | DataType::Timestamp => {
            let lower = lower.get::<i64>(idx).unwrap();
            let upper = upper.get::<i64>(idx).unwrap();
            lower.checked_add(1) == Some(upper)
        },
        DataType::Binary | DataType::String => {
            let lower = lower.get::<&[u8]>(idx).unwrap();
            let upper = upper.get::<&[u8]>(idx).unwrap();
            // The successor of a byte string is that string followed by one zero byte.
            match upper.split_last() {
                Some((&0, prefix)) => prefix == lower,
                _ => false,
            }
        },
        DataType::Float => {
            let lower = lower.get::<f32>(idx).unwrap();
            let upper = upper.get::<f32>(idx).unwrap();
            lower != f32::INFINITY && lower.next() == upper
        },
        DataType::Double => {
            let lower = lower.get::<f64>(idx).unwrap();
            let upper = upper.get::<f64>(idx).unwrap();
            lower != f64::INFINITY && lower.next() == upper
        },
    }
}

/// Determines whether `upper` is `lower` with its last populated column (among `idxs`)
/// incremented.
///
/// The columns in `idxs` are examined from last to first. Columns that are unset in both rows are
/// skipped. The first column found set in at least one row must satisfy `is_cell_incremented`,
/// and every column before it in `idxs` must satisfy `is_cell_equal`. If every column is unset in
/// both rows the result is `true`.
///
/// # Panics
///
/// Panics if an `is_set` lookup cannot be unwrapped, and under the same conditions as
/// `is_cell_incremented` and `is_cell_equal`.
pub fn is_row_incremented(lower: &Row, upper: &Row, idxs: &[usize]) -> bool {
    // Walk backwards, dropping the trailing columns that neither row has set.
    let mut remaining = idxs.iter().rev().skip_while(|&&idx| {
        !lower.is_set(idx).unwrap() && !upper.is_set(idx).unwrap()
    });

    match remaining.next() {
        // Nothing is set in either row.
        None => true,
        // The last populated column carries the increment; everything in front of it has to
        // match exactly.
        Some(&last_idx) => {
            is_cell_incremented(lower, upper, last_idx) &&
                remaining.all(|&prefix_idx| is_cell_equal(lower, upper, prefix_idx))
        },
    }
}

#[cfg(test)]
mod test {

Expand Down
16 changes: 8 additions & 8 deletions src/meta_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ impl Entry {

fn partition_lower_bound(&self) -> &[u8] {
match *self {
Entry::Tablet(ref tablet) => tablet.partition().lower_bound_encoded(),
Entry::Tablet(ref tablet) => tablet.partition().lower_bound_key(),
Entry::NonCoveredRange { ref partition_lower_bound, .. } => partition_lower_bound,
}
}

fn partition_upper_bound(&self) -> &[u8] {
match *self {
Entry::Tablet(ref tablet) => tablet.partition().upper_bound_encoded(),
Entry::Tablet(ref tablet) => tablet.partition().upper_bound_key(),
Entry::NonCoveredRange { ref partition_upper_bound, .. } => partition_upper_bound,
}
}
Expand Down Expand Up @@ -103,8 +103,8 @@ impl Entry {
// Sanity check that if the tablet IDs match, the ranges also match. If this fails,
// something is very wrong (possibly in the server).
debug_assert!(a.id() != b.id() ||
(a.partition().lower_bound_encoded() == b.partition().lower_bound_encoded() &&
a.partition().upper_bound_encoded() == b.partition().upper_bound_encoded()));
(a.partition().lower_bound_key() == b.partition().lower_bound_key() &&
a.partition().upper_bound_key() == b.partition().upper_bound_key()));
a.id() == b.id()
},
(&Entry::NonCoveredRange { partition_lower_bound: ref a_lower,
Expand Down Expand Up @@ -359,13 +359,13 @@ impl MetaCache {

for tablet in tablets {
let tablet = try!(Tablet::from_pb(&self.inner.primary_key_schema,
&self.inner.partition_schema,
self.inner.partition_schema.clone(),
tablet));
if tablet.partition().lower_bound_encoded() > &last_upper_bound {
if tablet.partition().lower_bound_key() > &last_upper_bound {
entries.push_back(Entry::non_covered_range(last_upper_bound,
tablet.partition().lower_bound_encoded().to_owned()));
tablet.partition().lower_bound_key().to_owned()));
}
last_upper_bound = tablet.partition().upper_bound_encoded().to_owned();
last_upper_bound = tablet.partition().upper_bound_key().to_owned();
entries.push_back(Entry::Tablet(tablet));
}

Expand Down
192 changes: 118 additions & 74 deletions src/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use std::fmt;
use std::cmp;

Expand All @@ -12,11 +11,11 @@ use kudu_pb::common::{
SchemaPB,
};

use DataType;
use Error;
use Result;
use Row;
use Schema;
use key::decode_range_partition_key;
use key;
use util;

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -126,50 +125,7 @@ impl PartitionSchema {
}
}

/// A decoded partition key: the hash bucket values followed by the range partition key row,
/// together with the partition schema the key was decoded against.
#[derive(Clone, PartialEq, Eq)]
pub struct PartitionKey {
/// Hash bucket values read from the front of the encoded key, in hash partition schema order.
/// May hold fewer entries than there are hash partition schemas when the encoded key ends early.
hash_buckets: Vec<u32>,
/// Row decoded from the range partition portion of the encoded key.
row: Row,
/// Copy of the partition schema used to decode this key.
partition_schema: PartitionSchema,
}

impl PartitionKey {

pub fn hash_buckets(&self) -> &[u32] {
&self.hash_buckets
}

pub fn range_row(&self) -> &Row {
&self.row
}

pub fn partition_schema(&self) -> &PartitionSchema {
&self.partition_schema
}

fn from_encoded(schema: &Schema,
partition_schema: &PartitionSchema,
mut encoded_key: &[u8]) -> Result<PartitionKey> {

let mut hash_buckets = Vec::with_capacity(partition_schema.hash_partition_schemas().len());
for _ in partition_schema.hash_partition_schemas() {
if encoded_key.is_empty() { break; }
hash_buckets.push(BigEndian::read_u32(encoded_key));
encoded_key = &encoded_key[4..];
}

let range_row = try!(decode_range_partition_key(schema,
partition_schema.range_partition_schema(),
encoded_key));

Ok(PartitionKey {
hash_buckets: hash_buckets,
row: range_row,
partition_schema: partition_schema.clone(),
})
}
}

/*
impl fmt::Debug for PartitionKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut is_first = true;
Expand Down Expand Up @@ -207,66 +163,154 @@ impl fmt::Debug for PartitionKey {
Ok(())
}
}
*/

#[derive(Clone)]
pub struct Partition {
lower_bound_encoded: Vec<u8>,
upper_bound_encoded: Vec<u8>,
lower_bound: PartitionKey,
upper_bound: PartitionKey,
partition_schema: PartitionSchema,
lower_bound_key: Vec<u8>,
upper_bound_key: Vec<u8>,
hash_partitions: Vec<u32>,
range_lower_bound: Row,
range_upper_bound: Row,
}

impl fmt::Debug for Partition {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[({:?}), ({:?}))", &self.lower_bound, &self.upper_bound)
write!(f, "[({:?}), ({:?}))", &self.lower_bound_key, &self.upper_bound_key)
}
}

impl cmp::PartialEq for Partition {
fn eq(&self, other: &Partition) -> bool {
self.lower_bound_encoded == other.lower_bound_encoded &&
self.upper_bound_encoded == other.upper_bound_encoded &&
self.lower_bound.partition_schema == other.lower_bound.partition_schema
&self.lower_bound_key == &other.lower_bound_key &&
&self.upper_bound_key == &other.upper_bound_key
}
}

impl cmp::Eq for Partition { }

impl Partition {

pub fn lower_bound(&self) -> &PartitionKey {
&self.lower_bound
pub fn lower_bound_key(&self) -> &[u8] {
&self.lower_bound_key
}

pub fn upper_bound_key(&self) -> &[u8] {
&self.upper_bound_key
}

pub fn upper_bound(&self) -> &PartitionKey {
&self.upper_bound
pub fn range_lower_bound(&self) -> &Row {
&self.range_lower_bound
}

pub fn lower_bound_encoded(&self) -> &[u8] {
&self.lower_bound_encoded
pub fn range_upper_bound(&self) -> &Row {
&self.range_upper_bound
}

pub fn upper_bound_encoded(&self) -> &[u8] {
&self.upper_bound_encoded
pub fn hash_partitions(&self) -> &[u32] {
&self.hash_partitions
}

/// Writes a human-readable description of this partition's range bounds.
///
/// The output takes one of these shapes:
///
/// ```text
/// UNBOUNDED
/// VALUES < 999
/// VALUES >= 123
/// VALUES = 123
/// VALUES = (123, 456)
/// 123 <= VALUES < 999
/// (123, 456) <= VALUES < (789, 102)
/// ```
///
/// A bound counts as absent when its first range column is not set. When only one bound is
/// present, just its leading run of set columns is printed. When both are present, all range
/// columns are printed, and the `VALUES = ` form is used if `key::is_row_incremented` reports
/// that the upper bound directly follows the lower bound.
pub fn fmt_range_partition(&self, f: &mut fmt::Formatter) -> fmt::Result {
    let columns = &self.partition_schema.range_partition_schema().columns;
    let lower = &self.range_lower_bound;
    let upper = &self.range_upper_bound;

    // Length of the leading run of range columns that are set in each bound.
    let lower_len = columns.iter().take_while(|&&idx| lower.is_set(idx).unwrap()).count();
    let upper_len = columns.iter().take_while(|&&idx| upper.is_set(idx).unwrap()).count();

    if lower_len == 0 && upper_len == 0 {
        return write!(f, "UNBOUNDED");
    }
    if lower_len == 0 {
        try!(write!(f, "VALUES < "));
        return fmt_row(f, upper, &columns[..upper_len]);
    }
    if upper_len == 0 {
        try!(write!(f, "VALUES >= "));
        return fmt_row(f, lower, &columns[..lower_len]);
    }

    // Both bounds are present from here on.
    if key::is_row_incremented(lower, upper, columns) {
        try!(write!(f, "VALUES = "));
        return fmt_row(f, lower, columns);
    }

    try!(fmt_row(f, lower, columns));
    try!(write!(f, " <= VALUES < "));
    fmt_row(f, upper, columns)
}

#[doc(hidden)]
pub fn from_pb(primary_key_schema: &Schema,
partition_schema: &PartitionSchema,
partition_schema: PartitionSchema,
mut partition: PartitionPB)
-> Result<Partition> {
let lower_bound = try!(PartitionKey::from_encoded(primary_key_schema,
partition_schema,
partition.get_partition_key_start()));
let upper_bound = try!(PartitionKey::from_encoded(primary_key_schema,
partition_schema,
partition.get_partition_key_end()));
let lower_bound_key = partition.take_partition_key_start();
let upper_bound_key = partition.take_partition_key_end();

let hash_partition_levels = partition_schema.hash_partition_schemas().len();
let mut hash_partitions = Vec::with_capacity(hash_partition_levels);
{
let mut key = &lower_bound_key[..];
for _ in partition_schema.hash_partition_schemas() {
if key.is_empty() {
hash_partitions.push(0)
} else if key.len() < 4 {
return Err(Error::Serialization(format!("invalid lower bound partition key: {:?}",
lower_bound_key)));
} else {
hash_partitions.push(BigEndian::read_u32(key));
key = &key[4..];
}
}
}

let range_lower_bound = if lower_bound_key.len() > hash_partition_levels * 4 {
try!(key::decode_range_partition_key(primary_key_schema,
partition_schema.range_partition_schema(),
&lower_bound_key[hash_partition_levels * 4..]))
} else {
primary_key_schema.new_row()
};
let range_upper_bound = if upper_bound_key.len() > hash_partition_levels * 4 {
try!(key::decode_range_partition_key(primary_key_schema,
partition_schema.range_partition_schema(),
&upper_bound_key[hash_partition_levels * 4..]))
} else {
primary_key_schema.new_row()
};

Ok(Partition {
lower_bound_encoded: partition.take_partition_key_start(),
upper_bound_encoded: partition.take_partition_key_end(),
lower_bound: lower_bound,
upper_bound: upper_bound,
partition_schema: partition_schema,
lower_bound_key: lower_bound_key,
upper_bound_key: upper_bound_key,
hash_partitions: hash_partitions,
range_lower_bound: range_lower_bound,
range_upper_bound: range_upper_bound,
})
}
}

/// Writes the cells of `row` selected by `idxs`.
///
/// A single column is written bare through `util::fmt_cell`. Any other number of columns is
/// written as a parenthesized list separated by `", "`.
///
/// `idxs` is expected to be non-empty; this is only checked in debug builds.
fn fmt_row(f: &mut fmt::Formatter, row: &Row, idxs: &[usize]) -> fmt::Result {
    debug_assert!(!idxs.is_empty());

    // Single-column rows are printed without surrounding parentheses.
    if idxs.len() == 1 {
        return util::fmt_cell(f, row, idxs[0]);
    }

    try!(write!(f, "("));
    for (position, &idx) in idxs.iter().enumerate() {
        // Separator goes in front of every cell but the first.
        if position > 0 {
            try!(write!(f, ", "));
        }
        try!(util::fmt_cell(f, row, idx));
    }
    write!(f, ")")
}
Loading

0 comments on commit fcd22a5

Please sign in to comment.