Skip to content

Commit

Permalink
feat(rumqttd): Shared subscriptions with configurable strategy (#668)
Browse files Browse the repository at this point in the history
Co-authored-by: swanandx <73115739+swanandx@users.noreply.github.com>
  • Loading branch information
Nickztar and swanandx committed Aug 15, 2023
1 parent 13e350e commit 1963b3e
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Subscription IDs in v5 publish packets (#632)
- Shared Subscriptions with configurable strategies (#668)

### Changed

Expand Down
1 change: 1 addition & 0 deletions rumqttd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
clap = { version = "4.2", features = ["derive"] }
axum = "0.6.4"
rand = "0.8.5"

[features]
default = ["use-rustls"]
Expand Down
1 change: 1 addition & 0 deletions rumqttd/rumqttd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ max_connections = 10010
max_outgoing_packet_count = 200
max_segment_size = 104857600
max_segment_count = 10
# shared_subscriptions_strategy = "random" # "sticky" | "roundrobin" ( default ) | "random"
# Any filters that match to configured filter will have custom segment size.
# [router.custom_segment.'/office/+/devices/status']
# max_segment_size = 102400
Expand Down
5 changes: 5 additions & 0 deletions rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub use link::meters;
pub use router::{Alert, IncomingMeter, Meter, Notification, OutgoingMeter};
pub use server::Broker;

use self::router::shared_subs::Strategy;

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct Config {
pub id: usize,
Expand Down Expand Up @@ -144,6 +146,9 @@ pub struct RouterConfig {
pub max_segment_count: usize,
pub custom_segment: Option<HashMap<String, SegmentConfig>>,
pub initialized_filters: Option<Vec<Filter>>,
// defaults to Round Robin
#[serde(default)]
pub shared_subscriptions_strategy: Strategy,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
Expand Down
3 changes: 3 additions & 0 deletions rumqttd/src/router/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ impl AckLog {
#[cfg(test)]
mod test {
use super::DataLog;
use crate::router::shared_subs::Strategy;
use crate::RouterConfig;

#[test]
Expand All @@ -426,6 +427,7 @@ mod test {
max_outgoing_packet_count: 1024,
custom_segment: None,
initialized_filters: None,
shared_subscriptions_strategy: Strategy::RoundRobin,
};
let mut data = DataLog::new(config).unwrap();
data.next_native_offset("topic/a");
Expand All @@ -445,6 +447,7 @@ mod test {
max_outgoing_packet_count: 1024,
custom_segment: None,
initialized_filters: None,
shared_subscriptions_strategy: Strategy::RoundRobin,
};
let mut data = DataLog::new(config).unwrap();
data.next_native_offset("+/+");
Expand Down
2 changes: 2 additions & 0 deletions rumqttd/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod iobufs;
mod logs;
mod routing;
mod scheduler;
pub(crate) mod shared_subs;
mod waiters;

pub use alertlog::Alert;
Expand Down Expand Up @@ -191,6 +192,7 @@ pub struct DataRequest {
pub read_count: usize,
/// Maximum count of payload buffer per replica
max_count: usize,
pub(crate) group: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down

0 comments on commit 1963b3e

Please sign in to comment.