Skip to content

Commit

Permalink
feat: Update JWTs to support multiple topics (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
ikornaselur committed Apr 2, 2024
1 parent 19add21 commit b120327
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
4 changes: 2 additions & 2 deletions scripts/sign.py
Expand Up @@ -13,14 +13,14 @@ def main(
private_key_path: str = typer.Option("private_key.pem"),
verbose: bool = typer.Option(False),
scope: Scopes = typer.Argument(),
topic: str = typer.Argument(),
topics: list[str] = typer.Argument(),
):
private_key = open(private_key_path, "r").read()

payload = {
"sub": "notiflux",
"exp": dt.datetime.now(dt.timezone.utc) + dt.timedelta(days=365*100),
"topic": topic,
"topics": topics,
"scope": scope.value,
}
if verbose:
Expand Down
22 changes: 11 additions & 11 deletions src/auth.rs
Expand Up @@ -5,14 +5,14 @@ use serde::{Deserialize, Serialize};
pub struct Claims {
sub: String,
exp: u64,
topic: String,
topics: Vec<String>,
scope: String,
}

#[derive(Debug, PartialEq)]
pub enum Action {
Subscribe(String),
Broadcast(String),
Subscribe(Vec<String>),
Broadcast(Vec<String>),
}

pub fn get_action(token: &str, public_key: &[u8]) -> Result<Action, NotifluxError> {
Expand All @@ -30,14 +30,14 @@ pub fn get_action(token: &str, public_key: &[u8]) -> Result<Action, NotifluxErro
let Claims {
sub: _,
exp: _,
topic,
topics,
scope,
} = token_data.claims;

if scope == "subscribe" {
Ok(Action::Subscribe(topic))
Ok(Action::Subscribe(topics))
} else if scope == "broadcast" {
Ok(Action::Broadcast(topic))
Ok(Action::Broadcast(topics))
} else {
Err(NotifluxError {
message: Some(format!("Invalid scope: {}", scope)),
Expand All @@ -54,29 +54,29 @@ mod tests {
fn test_get_action_subscribe() {
let public_key = include_bytes!("../scripts/public_key.pem");

let token = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJub3RpZmx1eCIsImV4cCI6NDg2NDk4ODU3NiwidG9waWMiOiJmb28iLCJzY29wZSI6InN1YnNjcmliZSJ9.1sZDe6V5ccJEALFeuHQe4R0D_t35t9c1s3QP3odFxIPxxdGXJOq2G8BgrMpqO3bu4n_q0GmnbFyY7LXVgLJbPw";
let token = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJub3RpZmx1eCIsImV4cCI6NDg2NTY3ODI1NSwidG9waWNzIjpbImZvbyJdLCJzY29wZSI6InN1YnNjcmliZSJ9.qUIcgWAUOjG9QUvifJoAuxjFY8kwoHI-h3XrX3a_sDm3NXin4WYZIHmUN_c5XkfpHCWjOefMWQ8IplIZaj0PeA";

let action = get_action(token, public_key).unwrap();

assert_eq!(action, Action::Subscribe("foo".to_owned()));
assert_eq!(action, Action::Subscribe(vec!["foo".to_owned()]));
}

#[test]
fn test_get_action_broadcast() {
let public_key = include_bytes!("../scripts/public_key.pem");

let token = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJub3RpZmx1eCIsImV4cCI6NDg2NDk4ODU4OCwidG9waWMiOiJiYXIiLCJzY29wZSI6ImJyb2FkY2FzdCJ9.KEk-9_i6Z17P1cB2m4_pt_LJrvhg2X4OrYWoqBVgvA0AtmcKyCOZcwUQiuoZ8rFwjvj9_KiFWK5hE-bRRnfQsA";
let token = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJub3RpZmx1eCIsImV4cCI6NDg2NTY3ODMzMCwidG9waWNzIjpbImJhciJdLCJzY29wZSI6ImJyb2FkY2FzdCJ9.izDZtXaKXtUSaRPrCozPiy2nuHmdOH0djGAavhVxUszcUNAeD8_d2ndMDHNYZEs4w49cnZQTqCLsF13ksW2gzA";

let action = get_action(token, public_key).unwrap();

assert_eq!(action, Action::Broadcast("bar".to_owned()));
assert_eq!(action, Action::Broadcast(vec!["bar".to_owned()]));
}

#[test]
fn test_get_action_invalid_scope() {
let public_key = include_bytes!("../scripts/public_key.pem");

let token = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJub3RpZmx1eCIsImV4cCI6NDg2NDk4ODcxMywidG9waWMiOiJiYXIiLCJzY29wZSI6ImludmFsaWQifQ.rxmGLj6ykIRRUVaZMj4tzQ2Gf12yQdEdRBy_kVdesYTPCFVCVSP7G-o-JoRwcX1dAAwryt-b3nuwXTVGy_ge4w";
let token = "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJub3RpZmx1eCIsImV4cCI6NDg2NTY3ODM0OCwidG9waWNzIjpbImJhciJdLCJzY29wZSI6ImludmFsaWQifQ.X2Rjy1X0xojvIVIAimqqq3wPTy_Kv33BChzwx5wMQluhuvltzXTcfNug0bAGxmzRjogqcLKxJpqMauI_1oUt6Q";

let action = get_action(token, public_key);

Expand Down
8 changes: 4 additions & 4 deletions src/server.rs
Expand Up @@ -44,8 +44,8 @@ impl Handler<message::Broadcast> for Server {
log::debug!("handling Broadcast: {:?}", msg);

match get_action(&msg.token, &self.jwt_public_key) {
Ok(Action::Broadcast(topic)) => {
if topic == msg.topic {
Ok(Action::Broadcast(topics)) => {
if topics.contains(&msg.topic) {
log::debug!("Broadcasting message to topic: {}", msg.topic);
self.broadcast(&msg.topic, &msg.msg);
} else {
Expand Down Expand Up @@ -82,8 +82,8 @@ impl Handler<message::SubscribeToTopic> for Server {
log::debug!("{:?} subscribing topic {}", msg.id, msg.topic);

match get_action(&msg.token, &self.jwt_public_key) {
Ok(Action::Subscribe(topic)) => {
if topic == msg.topic {
Ok(Action::Subscribe(topics)) => {
if topics.contains(&msg.topic) {
log::debug!("{:?} is allowed to subscribe topic {}", msg.id, msg.topic);
self.topics
.entry(msg.topic.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/session.rs
Expand Up @@ -104,7 +104,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
let args: Vec<&str> = m.splitn(2, ' ').collect();
match args[..] {
["/subscribe", sub_args] => {
let sub_args: Vec<&str> = sub_args.splitn(2, ' ').collect();
let sub_args: Vec<&str> = sub_args.split(' ').collect();
if let [topic, token] = sub_args[..] {
self.addr.do_send(message::SubscribeToTopic {
id: self.id,
Expand Down

0 comments on commit b120327

Please sign in to comment.