Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared subscriptions with configurable strategy #668

Merged
merged 27 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4297b75
feat: move cursor logic to datalog and scheduling logic to router
swanandx Jul 17, 2023
0539e49
feat: update next_client state per publish
swanandx Jul 17, 2023
821f4c9
feat: refactor
swanandx Jul 17, 2023
d776484
feat: Move clients by index & make sure filter matches wildcards
Nickztar Jul 17, 2023
0b7c9d6
fix: Removing a client should keep the round-robin
Nickztar Jul 17, 2023
12bf166
feat: Remove shared group if no remaining clients
Nickztar Jul 17, 2023
29565bb
fix: Make sure to clean up when client disconnect
Nickztar Jul 17, 2023
0c3991f
fix: perform cleanup on disconnect instead of publish
Nickztar Jul 18, 2023
ae85d19
fix: store correct subscription in map for shared subscriptions
Nickztar Jul 18, 2023
f7cdf52
feat: move share logic in forward_device_data and some cleanup
swanandx Jul 21, 2023
feeee03
feat: cursor in shared group
swanandx Jul 21, 2023
4ef5ee4
remove unused import
swanandx Jul 21, 2023
55a30d2
return after reading
swanandx Jul 24, 2023
862b2e9
feat: SkipRequest variant in ConsumeStatus
swanandx Jul 24, 2023
61261c4
feat: add skipped req even if we run out of max iter
swanandx Jul 24, 2023
87ec436
feat: specify strategy in rumqttd.toml
swanandx Jul 26, 2023
f1835bf
feat: random strategy
swanandx Jul 27, 2023
1400fc8
Merge branch 'main' into new-shared-subs
swanandx Jul 27, 2023
f6e249a
update Cargo.lock as well
swanandx Jul 27, 2023
9152f25
fix: tests
swanandx Jul 27, 2023
45c612b
improved comments
swanandx Jul 28, 2023
8ea0e4e
feat: skipped_requests in Router
swanandx Jul 28, 2023
9e0343b
feat: skipped_requests back in consume for better redability
swanandx Jul 28, 2023
83fc3fc
Merge branch 'bytebeamio:main' into new-shared-subs
Nickztar Jul 30, 2023
9edba1d
Merge branch 'main' into new-shared-subs
swanandx Aug 14, 2023
8d77c6e
feat: refactored prepare_filter
swanandx Aug 14, 2023
648a9d2
entry in CHANGELOG
swanandx Aug 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
clap = { version = "4.2", features = ["derive"] }
axum = "0.6.4"
rand = "0.8.5"

[features]
default = ["use-rustls"]
Expand Down
1 change: 1 addition & 0 deletions rumqttd/rumqttd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ max_connections = 10010
max_outgoing_packet_count = 200
max_segment_size = 104857600
max_segment_count = 10
# shared_subscriptions_strategy = "random" # "sticky" | "roundrobin" ( default ) | "random"
# Any filters that match to configured filter will have custom segment size.
# [router.custom_segment.'/office/+/devices/status']
# max_segment_size = 102400
Expand Down
5 changes: 5 additions & 0 deletions rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub use link::meters;
pub use router::{Alert, IncomingMeter, Meter, Notification, OutgoingMeter};
pub use server::Broker;

use self::router::shared_subs::Strategy;

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct Config {
pub id: usize,
Expand Down Expand Up @@ -144,6 +146,9 @@ pub struct RouterConfig {
pub max_segment_count: usize,
pub custom_segment: Option<HashMap<String, SegmentConfig>>,
pub initialized_filters: Option<Vec<Filter>>,
// defaults to Round Robin
#[serde(default)]
pub shared_subscriptions_strategy: Strategy,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
Expand Down
3 changes: 3 additions & 0 deletions rumqttd/src/router/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ impl AckLog {
#[cfg(test)]
mod test {
use super::DataLog;
use crate::router::shared_subs::Strategy;
use crate::RouterConfig;

#[test]
Expand All @@ -426,6 +427,7 @@ mod test {
max_outgoing_packet_count: 1024,
custom_segment: None,
initialized_filters: None,
shared_subscriptions_strategy: Strategy::RoundRobin,
};
let mut data = DataLog::new(config).unwrap();
data.next_native_offset("topic/a");
Expand All @@ -445,6 +447,7 @@ mod test {
max_outgoing_packet_count: 1024,
custom_segment: None,
initialized_filters: None,
shared_subscriptions_strategy: Strategy::RoundRobin,
};
let mut data = DataLog::new(config).unwrap();
data.next_native_offset("+/+");
Expand Down
2 changes: 2 additions & 0 deletions rumqttd/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod iobufs;
mod logs;
mod routing;
mod scheduler;
pub(crate) mod shared_subs;
mod waiters;

pub use alertlog::Alert;
Expand Down Expand Up @@ -191,6 +192,7 @@ pub struct DataRequest {
pub read_count: usize,
/// Maximum count of payload buffer per replica
max_count: usize,
pub(crate) group: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down