forked from shotover/shotover-proxy
/
token_map.rs
106 lines (95 loc) · 2.99 KB
/
token_map.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use crate::transforms::cassandra::sink_cluster::CassandraNode;
use cassandra_protocol::token::Murmur3Token;
use std::collections::BTreeMap;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct TokenMap {
token_ring: BTreeMap<Murmur3Token, Uuid>,
}
impl TokenMap {
pub fn new(nodes: &[CassandraNode]) -> Self {
TokenMap {
token_ring: nodes
.iter()
.flat_map(|node| node.tokens.iter().map(|token| (*token, node.host_id)))
.collect(),
}
}
/// Returns nodes starting at given token and going in the direction of replicas.
pub fn iter_replica_nodes(
&self,
token: Murmur3Token,
replica_count: usize,
) -> impl Iterator<Item = Uuid> + '_ {
self.token_ring
.range(token..)
.chain(self.token_ring.iter())
.take(replica_count)
.map(|(_, node)| *node)
}
}
#[cfg(test)]
mod test_token_map {
use super::*;
use hex_literal::hex;
use itertools::Itertools;
use uuid::Uuid;
static NODE_1: Uuid = Uuid::from_bytes(hex!("2DD022D62937475489D602D2933A8F71"));
static NODE_2: Uuid = Uuid::from_bytes(hex!("2DD022D62937475489D602D2933A8F72"));
static NODE_3: Uuid = Uuid::from_bytes(hex!("2DD022D62937475489D602D2933A8F73"));
fn prepare_nodes() -> Vec<CassandraNode> {
vec![
CassandraNode::new(
"127.0.0.1:9042".parse().unwrap(),
"rack1".into(),
vec![
Murmur3Token::new(-2),
Murmur3Token::new(-1),
Murmur3Token::new(0),
],
NODE_1,
),
CassandraNode::new(
"127.0.0.1:9043".parse().unwrap(),
"rack1".into(),
vec![Murmur3Token::new(20)],
NODE_2,
),
CassandraNode::new(
"127.0.0.1:9044".parse().unwrap(),
"rack1".into(),
vec![
Murmur3Token::new(2),
Murmur3Token::new(1),
Murmur3Token::new(10),
],
NODE_3,
),
]
}
#[test]
fn should_return_replicas_in_order() {
verify_tokens(
&[NODE_1, NODE_3, NODE_3, NODE_3, NODE_2],
Murmur3Token::new(0),
);
}
#[test]
fn should_return_replicas_in_order_for_non_primary_token() {
verify_tokens(&[NODE_3, NODE_2], Murmur3Token::new(3));
}
#[test]
fn should_return_replicas_in_a_ring() {
verify_tokens(
&[NODE_2, NODE_1, NODE_1, NODE_1, NODE_3],
Murmur3Token::new(20),
);
}
fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) {
let token_map = TokenMap::new(prepare_nodes().as_slice());
let nodes = token_map
.iter_replica_nodes(token, node_host_ids.len())
.collect_vec();
assert_eq!(nodes, node_host_ids);
}
}