Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::state::models::CreateConsumerGroupWithId;
use crate::streaming::session::Session;
Expand All @@ -40,12 +41,11 @@ impl ServerCommandHandler for CreateConsumerGroup {
async fn handle(
self,
sender: &mut SenderKind,
length: u32,
_length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");

let consumer_group_id = shard
.create_consumer_group(
session,
Expand All @@ -60,6 +60,13 @@ impl ServerCommandHandler for CreateConsumerGroup {
self.stream_id, self.topic_id, self.group_id
)
})?;
let event = ShardEvent::CreatedConsumerGroup {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
consumer_group_id: self.group_id,
name: self.name.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

let stream = shard.find_stream(session, &self.stream_id)
.with_error_context(|error| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind};
use crate::shard::transmission::event::ShardEvent;
use crate::shard::IggyShard;
use crate::state::command::EntryCommand;
use crate::state::models::CreatePersonalAccessTokenWithHash;
Expand Down Expand Up @@ -47,7 +48,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");

let token = shard
let (personal_access_token, token) = shard
.create_personal_access_token(session, &self.name, self.expiry)
.with_error_context(|error| {
format!(
Expand All @@ -56,7 +57,11 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
)
})?;
let bytes = mapper::map_raw_pat(&token);
let token_hash = PersonalAccessToken::hash_token(&token);
let hash = personal_access_token.token.to_string();
let event = ShardEvent::CreatedPersonalAccessToken {
personal_access_token: personal_access_token.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

shard
.state
Expand All @@ -67,7 +72,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
name: self.name.to_owned(),
expiry: self.expiry,
},
hash: token_hash,
hash,
}),
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
"{COMPONENT} (error: {error}) - failed to delete personal access token with name: {token_name}, session: {session}"
)})?;

// Broadcast the event to other shards
let event = crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken {
user_id: session.get_user_id(),
name: self.name.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

shard
.state
.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::utils::crypto;
Expand All @@ -39,7 +40,7 @@ impl ServerCommandHandler for ChangePassword {
async fn handle(
self,
sender: &mut SenderKind,
length: u32,
_length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
Expand All @@ -59,6 +60,13 @@ impl ServerCommandHandler for ChangePassword {
)
})?;

let event = ShardEvent::ChangedPassword {
user_id: self.user_id.clone(),
current_password: self.current_password.clone(),
new_password: self.new_password.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

// For the security of the system, we hash the password before storing it in metadata.
shard
.state
Expand Down
10 changes: 9 additions & 1 deletion core/server/src/binary/handlers/users/create_user_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use std::rc::Rc;

use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
Expand All @@ -31,6 +32,7 @@ use anyhow::Result;
use error_set::ErrContext;
use iggy_common::IggyError;
use iggy_common::create_user::CreateUser;
use tower_http::map_response_body;
use tracing::{debug, instrument};

impl ServerCommandHandler for CreateUser {
Expand All @@ -56,13 +58,19 @@ impl ServerCommandHandler for CreateUser {
self.status,
self.permissions.clone(),
)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to create user with name: {}, session: {session}",
self.username
)
})?;
let event = ShardEvent::CreatedUser {
username: self.username.to_owned(),
password: self.password.to_owned(),
status: self.status,
permissions: self.permissions.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
let user_id = user.id;
let response = mapper::map_user(&user);

Expand Down
8 changes: 7 additions & 1 deletion core/server/src/binary/handlers/users/delete_user_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
Expand All @@ -39,7 +40,7 @@ impl ServerCommandHandler for DeleteUser {
async fn handle(
self,
sender: &mut SenderKind,
length: u32,
_length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
Expand All @@ -54,6 +55,11 @@ impl ServerCommandHandler for DeleteUser {
)
})?;

let event = ShardEvent::DeletedUser {
user_id: self.user_id.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

let user_id = self.user_id.clone();
shard
.state
Expand Down
7 changes: 6 additions & 1 deletion core/server/src/binary/handlers/users/logout_user_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
Expand All @@ -38,14 +39,18 @@ impl ServerCommandHandler for LogoutUser {
async fn handle(
self,
sender: &mut SenderKind,
length: u32,
_length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
shard.logout_user(session).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to logout user, session: {session}")
})?;
let event = ShardEvent::LogoutUser {
client_id: session.client_id,
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
session.clear_user_id();
sender.send_empty_ok_response().await?;
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::transmission::event::ShardEvent;
use crate::shard::IggyShard;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
Expand Down Expand Up @@ -50,7 +51,12 @@ impl ServerCommandHandler for UpdatePermissions {
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to update permissions for user_id: {}, session: {session}",
self.user_id
))?;

let event = ShardEvent::UpdatedPermissions {
user_id: self.user_id.clone(),
permissions: self.permissions.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

shard
.state
.apply(
Expand Down
9 changes: 8 additions & 1 deletion core/server/src/binary/handlers/users/update_user_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
Expand Down Expand Up @@ -59,8 +60,14 @@ impl ServerCommandHandler for UpdateUser {
)
})?;

let user_id = self.user_id.clone();
let event = ShardEvent::UpdatedUser {
user_id: self.user_id.clone(),
username: self.username.clone(),
status: self.status,
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

let user_id = self.user_id.clone();
shard
.state
.apply(session.get_user_id(), &EntryCommand::UpdateUser(self))
Expand Down
5 changes: 2 additions & 3 deletions core/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ use server::state::StateKind;
use server::state::command::EntryCommand;
use server::state::file::FileState;
use server::state::models::CreateUserWithId;
use server::state::system::SystemState;
use server::streaming::utils::{MemoryPool, crypto};
use server::streaming::utils::MemoryPool;
use server::versioning::SemanticVersion;
use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str};
use tokio::time::Instant;
Expand Down Expand Up @@ -183,7 +182,7 @@ fn main() -> Result<(), ServerError> {

// We can't use std::sync::Once because it doesn't support async.
// Trait bound on the closure is FnOnce.
// Peak into the state to check if the root user exists.
// Peek into the state to check if the root user exists.
// If it does not exist, create it.
barrier.with_async::<Result<(), IggyError>>(async |barrier_state| {
// A thread already initialized state
Expand Down
Loading