diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index e6e1f2396f..51d60b0c04 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::state::models::CreateConsumerGroupWithId; use crate::streaming::session::Session; @@ -40,12 +41,11 @@ impl ServerCommandHandler for CreateConsumerGroup { async fn handle( self, sender: &mut SenderKind, - length: u32, + _length: u32, session: &Rc, shard: &Rc, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let consumer_group_id = shard .create_consumer_group( session, @@ -60,6 +60,13 @@ impl ServerCommandHandler for CreateConsumerGroup { self.stream_id, self.topic_id, self.group_id ) })?; + let event = ShardEvent::CreatedConsumerGroup { + stream_id: self.stream_id.clone(), + topic_id: self.topic_id.clone(), + consumer_group_id: self.group_id, + name: self.name.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; let stream = shard.find_stream(session, &self.stream_id) .with_error_context(|error| { diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index c28ac77b42..bfeda1641f 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; +use crate::shard::transmission::event::ShardEvent; use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; @@ -47,7 +48,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken { ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let token = shard + let (personal_access_token, token) = shard .create_personal_access_token(session, &self.name, self.expiry) .with_error_context(|error| { format!( @@ -56,7 +57,11 @@ impl ServerCommandHandler for CreatePersonalAccessToken { ) })?; let bytes = mapper::map_raw_pat(&token); - let token_hash = PersonalAccessToken::hash_token(&token); + let hash = personal_access_token.token.to_string(); + let event = ShardEvent::CreatedPersonalAccessToken { + personal_access_token: personal_access_token.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; shard .state @@ -67,7 +72,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken { name: self.name.to_owned(), expiry: self.expiry, }, - hash: token_hash, + hash, }), ) .await diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index bb8dd46324..13a06e55c0 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -51,6 +51,13 @@ impl ServerCommandHandler for DeletePersonalAccessToken { "{COMPONENT} (error: {error}) - failed to delete personal access token with name: {token_name}, session: {session}" )})?; + // Broadcast the event to other shards + let event = crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken { + user_id: session.get_user_id(), + name: self.name.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; + shard .state .apply( diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs index 32b376cda5..1e89aeb09d 100644 --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::utils::crypto; @@ -39,7 +40,7 @@ impl ServerCommandHandler for ChangePassword { async fn handle( self, sender: &mut SenderKind, - length: u32, + _length: u32, session: &Rc, shard: &Rc, ) -> Result<(), IggyError> { @@ -59,6 +60,13 @@ impl ServerCommandHandler for ChangePassword { ) })?; + let event = ShardEvent::ChangedPassword { + user_id: self.user_id.clone(), + current_password: self.current_password.clone(), + new_password: self.new_password.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; + // For the security of the system, we hash the password before storing it in metadata. shard .state diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index e51b147b8b..77ce6249e5 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -17,6 +17,7 @@ */ use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use std::rc::Rc; use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; @@ -31,6 +32,7 @@ use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::create_user::CreateUser; +use tower_http::map_response_body; use tracing::{debug, instrument}; impl ServerCommandHandler for CreateUser { @@ -56,13 +58,19 @@ impl ServerCommandHandler for CreateUser { self.status, self.permissions.clone(), ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to create user with name: {}, session: {session}", self.username ) })?; + let event = ShardEvent::CreatedUser { + username: self.username.to_owned(), + password: self.password.to_owned(), + status: self.status, + permissions: self.permissions.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; let user_id = user.id; let response = mapper::map_user(&user); diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs index 10de56a458..cd2f082c42 100644 --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -39,7 +40,7 @@ impl ServerCommandHandler for DeleteUser { async fn handle( self, sender: &mut SenderKind, - length: u32, + _length: u32, session: &Rc, shard: &Rc, ) -> Result<(), IggyError> { @@ -54,6 +55,11 @@ impl ServerCommandHandler for DeleteUser { ) })?; + let event = ShardEvent::DeletedUser { + user_id: self.user_id.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; + let user_id = self.user_id.clone(); shard .state diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs index d1ef95bf27..cfa37cd55e 100644 --- a/core/server/src/binary/handlers/users/logout_user_handler.rs +++ b/core/server/src/binary/handlers/users/logout_user_handler.rs @@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::session::Session; use anyhow::Result; use error_set::ErrContext; @@ -38,7 +39,7 @@ impl ServerCommandHandler for LogoutUser { async fn handle( self, sender: &mut SenderKind, - length: u32, + _length: u32, session: &Rc, shard: &Rc, ) -> Result<(), IggyError> { @@ -46,6 +47,10 @@ impl ServerCommandHandler for LogoutUser { shard.logout_user(session).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to logout user, session: {session}") })?; + let event = ShardEvent::LogoutUser { + client_id: session.client_id, + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; session.clear_user_id(); sender.send_empty_ok_response().await?; Ok(()) diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs index dea49a1ca3..d28d381258 100644 --- a/core/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs @@ -21,6 +21,7 @@ use std::rc::Rc; use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::transmission::event::ShardEvent; use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; @@ -50,7 +51,12 @@ impl ServerCommandHandler for UpdatePermissions { .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to update permissions for user_id: {}, session: {session}", self.user_id ))?; - + let event = ShardEvent::UpdatedPermissions { + user_id: self.user_id.clone(), + permissions: self.permissions.clone(), + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; + shard .state .apply( diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs index 982c4ae142..b90cb44279 100644 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -59,8 +60,14 @@ impl ServerCommandHandler for UpdateUser { ) })?; - let user_id = self.user_id.clone(); + let event = ShardEvent::UpdatedUser { + user_id: self.user_id.clone(), + username: self.username.clone(), + status: self.status, + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; + let user_id = self.user_id.clone(); shard .state .apply(session.get_user_id(), &EntryCommand::UpdateUser(self)) diff --git a/core/server/src/main.rs b/core/server/src/main.rs index e01f03ecd7..2971347e25 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -48,8 +48,7 @@ use server::state::StateKind; use server::state::command::EntryCommand; use server::state::file::FileState; use server::state::models::CreateUserWithId; -use server::state::system::SystemState; -use server::streaming::utils::{MemoryPool, crypto}; +use server::streaming::utils::MemoryPool; use server::versioning::SemanticVersion; use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str}; use tokio::time::Instant; @@ -183,7 +182,7 @@ fn main() -> Result<(), ServerError> { // We can't use std::sync::Once because it doesn't support async. // Trait bound on the closure is FnOnce. - // Peak into the state to check if the root user exists. + // Peek into the state to check if the root user exists. // If it does not exist, create it. barrier.with_async::>(async |barrier_state| { // A thread already initialized state diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index a8158bc15f..ecbe2be140 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -64,7 +64,6 @@ use crate::{ streaming::{ clients::client_manager::ClientManager, diagnostics::metrics::Metrics, - partitions::partition, personal_access_tokens::personal_access_token::PersonalAccessToken, session::Session, storage::SystemStorage, @@ -512,240 +511,271 @@ impl IggyShard { async fn handle_event(&self, event: Arc) -> Result<(), IggyError> { match &*event { ShardEvent::CreatedStream { stream_id, name } => { - self.create_stream_bypass_auth(*stream_id, name) - } + self.create_stream_bypass_auth(*stream_id, name) + } ShardEvent::CreatedTopic { - stream_id, - topic_id, - name, - partitions_count, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - } => { - let topic_id = topic_id.get_u32_value().ok(); - self.create_topic_bypass_auth( - stream_id, - topic_id, - name, - *partitions_count, - *message_expiry, - *compression_algorithm, - *max_topic_size, - *replication_factor, - ) - .await - } + stream_id, + topic_id, + name, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + } => { + let topic_id = topic_id.get_u32_value().ok(); + self.create_topic_bypass_auth( + stream_id, + topic_id, + name, + *partitions_count, + *message_expiry, + *compression_algorithm, + *max_topic_size, + *replication_factor, + ) + .await + } ShardEvent::LoginUser { - client_id, - username, - password, - } => self.login_user_event(*client_id, username, password), + client_id, + username, + password, + } => self.login_user_event(*client_id, username, password), ShardEvent::NewSession { address, transport } => { - let session = self.add_client(address, *transport); - self.add_active_session(session); - Ok(()) - } + let session = self.add_client(address, *transport); + self.add_active_session(session); + Ok(()) + } ShardEvent::CreatedShardTableRecords { - stream_id, - topic_id, - partition_ids, - } => { - let records = self - .create_shard_table_records(&partition_ids, *stream_id, *topic_id) - .collect::>(); - let stream = self.get_stream(&Identifier::numeric(*stream_id)?)?; - let topic = stream.get_topic(&Identifier::numeric(*topic_id)?)?; - // Open partition and segments for that particular shard. - for (ns, shard_info) in records.iter() { - if shard_info.id() == self.id { - let partition_id = ns.partition_id; - let partition = topic.get_partition(partition_id)?; - let mut partition = partition.write().await; - partition.open().await.with_error_context(|error| { + stream_id, + topic_id, + partition_ids, + } => { + let records = self + .create_shard_table_records(&partition_ids, *stream_id, *topic_id) + .collect::>(); + let stream = self.get_stream(&Identifier::numeric(*stream_id)?)?; + let topic = stream.get_topic(&Identifier::numeric(*topic_id)?)?; + // Open partition and segments for that particular shard. + for (ns, shard_info) in records.iter() { + if shard_info.id() == self.id { + let partition_id = ns.partition_id; + let partition = topic.get_partition(partition_id)?; + let mut partition = partition.write().await; + partition.open().await.with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to open partition with ID: {partition_id} in topic with ID: {topic_id} for stream with ID: {stream_id}" + ) + })?; + } + } + self.insert_shard_table_records(records); + Ok(()) + } + ShardEvent::CreatedPartitions { + stream_id, + topic_id, + partitions_count, + } => { + let mut stream = self.get_stream_mut(stream_id)?; + let topic = stream.get_topic_mut(topic_id)?; + topic.add_persisted_partitions(*partitions_count) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to create partitions for topic with ID: {topic_id} in stream with ID: {stream_id}" + ) + })?; + topic.reassign_consumer_groups(); + self.metrics.increment_partitions(*partitions_count); + self.metrics.increment_segments(*partitions_count); + Ok(()) + } + ShardEvent::DeletedPartitions { + stream_id, + topic_id, + partition_ids, + } => { + let mut stream = self.get_stream_mut(stream_id)?; + let topic = stream.get_topic_mut(topic_id)?; + let partitions = topic + . delete_persisted_partitions_by_ids(partition_ids) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to delete persisted partitions for topic: {topic}") + })?; + drop(stream); + + let mut segments_count = 0; + let mut messages_count = 0; + let partitions_count = partitions.len(); + for partition in &partitions { + let partition = partition.read().await; + let partition_messages_count = partition.get_messages_count(); + segments_count += partition.get_segments_count(); + messages_count += partition_messages_count; + } + + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}" + ) + })?; + let topic = stream.get_topic_mut(topic_id).with_error_context(|error| { format!( - "{COMPONENT} (error: {error}) - failed to open partition with ID: {partition_id} in topic with ID: {topic_id} for stream with ID: {stream_id}" + "{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id}" ) })?; + topic.reassign_consumer_groups(); + if partitions.len() > 0 { + self.metrics.decrement_partitions(partitions_count as u32); + self.metrics.decrement_segments(segments_count); + self.metrics.decrement_messages(messages_count); + } + Ok(()) } - } - self.insert_shard_table_records(records); - Ok(()) - } - ShardEvent::CreatedPartitions { - stream_id, - topic_id, - partitions_count, - } => { - let mut stream = self.get_stream_mut(stream_id)?; - let topic = stream.get_topic_mut(topic_id)?; - topic.add_persisted_partitions(*partitions_count) - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to create partitions for topic with ID: {topic_id} in stream with ID: {stream_id}" - ) - })?; - topic.reassign_consumer_groups(); - self.metrics.increment_partitions(*partitions_count); - self.metrics.increment_segments(*partitions_count); - Ok(()) - } - ShardEvent::DeletedPartitions { - stream_id, - topic_id, - partition_ids, - } => { - let mut stream = self.get_stream_mut(stream_id)?; - let topic = stream.get_topic_mut(topic_id)?; - let partitions = topic - . delete_persisted_partitions_by_ids(partition_ids) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to delete persisted partitions for topic: {topic}") - })?; - drop(stream); - - let mut segments_count = 0; - let mut messages_count = 0; - let partitions_count = partitions.len(); - for partition in &partitions { - let partition = partition.read().await; - let partition_messages_count = partition.get_messages_count(); - segments_count += partition.get_segments_count(); - messages_count += partition_messages_count; - } - - let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}" - ) - })?; - let topic = stream.get_topic_mut(topic_id).with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id}" - ) - })?; - topic.reassign_consumer_groups(); - if partitions.len() > 0 { - self.metrics.decrement_partitions(partitions_count as u32); - self.metrics.decrement_segments(segments_count); - self.metrics.decrement_messages(messages_count); - } - Ok(()) - } ShardEvent::DeletedShardTableRecords { namespaces } => { - let (stream_id, topic_id) = namespaces - .first() - .map(|ns| (ns.stream_id, ns.topic_id)) - .unwrap(); - let stream = self.get_stream(&Identifier::numeric(stream_id).unwrap())?; - let topic = stream.get_topic(&Identifier::numeric(topic_id).unwrap())?; - let records = self.remove_shard_table_records(&namespaces); - for (ns, shard_info) in records.iter() { - if shard_info.id() == self.id { - let partition = topic.get_partition(ns.partition_id)?; - let mut partition = partition.write().await; - partition.delete().await.with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to delete partition with ID: {} in topic with ID: {}", - ns.partition_id, - topic_id - ) - })?; + let (stream_id, topic_id) = namespaces + .first() + .map(|ns| (ns.stream_id, ns.topic_id)) + .unwrap(); + let stream = self.get_stream(&Identifier::numeric(stream_id).unwrap())?; + let topic = stream.get_topic(&Identifier::numeric(topic_id).unwrap())?; + let records = self.remove_shard_table_records(&namespaces); + for (ns, shard_info) in records.iter() { + if shard_info.id() == self.id { + let partition = topic.get_partition(ns.partition_id)?; + let mut partition = partition.write().await; + partition.delete().await.with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete partition with ID: {} in topic with ID: {}", + ns.partition_id, + topic_id + ) + })?; + } + } + Ok(()) } - } - Ok(()) - } ShardEvent::DeletedStream { stream_id } => { - let shard_id = self.id; - self.delete_stream_bypass_auth(stream_id).with_error_context(|err| { - format!( - "{COMPONENT} (error: {err}) - failed to delete, when handling event on shard: {shard_id} stream with ID: {stream_id}", - ) - })?; - Ok(()) - } + let shard_id = self.id; + self.delete_stream_bypass_auth(stream_id).with_error_context(|err| { + format!( + "{COMPONENT} (error: {err}) - failed to delete, when handling event on shard: {shard_id} stream with ID: {stream_id}", + ) + })?; + Ok(()) + } ShardEvent::UpdatedStream { stream_id, name } => { - self.update_stream_bypass_auth(stream_id, name)?; - Ok(()) - } + self.update_stream_bypass_auth(stream_id, name)?; + Ok(()) + } ShardEvent::UpdatedTopic { - stream_id, - topic_id, - name, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - } => { - self.update_topic_bypass_auth( - stream_id, - topic_id, - name, - *message_expiry, - *compression_algorithm, - *max_topic_size, - *replication_factor, - ) - .await?; - Ok(()) - } + stream_id, + topic_id, + name, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + } => { + self.update_topic_bypass_auth( + stream_id, + topic_id, + name, + *message_expiry, + *compression_algorithm, + *max_topic_size, + *replication_factor, + ) + .await?; + Ok(()) + } ShardEvent::PurgedStream { stream_id: _ } => todo!(), - ShardEvent::CreatedConsumerGroup { - stream_id: _, - topic_id: _, - consumer_group_id: _, - name: _, - } => todo!(), - ShardEvent::DeletedConsumerGroup { - stream_id: _, - topic_id: _, - consumer_group_id: _, - } => todo!(), ShardEvent::PurgedTopic { - stream_id: _, - topic_id: _, - } => todo!(), + stream_id: _, + topic_id: _, + } => todo!(), ShardEvent::DeletedTopic { - stream_id, - topic_id, - } => { - self.delete_topic_bypass_auth(stream_id, topic_id) - .await - .with_error_context(|err| { - format!( - "{COMPONENT} (error: {err}) - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}" - ) - })?; - Ok(()) - } + stream_id, + topic_id, + } => { + self.delete_topic_bypass_auth(stream_id, topic_id) + .await + .with_error_context(|err| { + format!( + "{COMPONENT} (error: {err}) - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}" + ) + })?; + Ok(()) + } + ShardEvent::CreatedConsumerGroup { + stream_id, + topic_id, + consumer_group_id, + name, + } => self.create_consumer_group_bypass_auth( + stream_id, + topic_id, + *consumer_group_id, + name, + ), + ShardEvent::DeletedConsumerGroup { + stream_id, + topic_id, + consumer_group_id, + } => { + self.delete_consumer_group_bypass_auth(stream_id, topic_id, consumer_group_id)?; + Ok(()) + } ShardEvent::CreatedUser { - username: _, - password: _, - status: _, - permissions: _, - } => todo!(), - ShardEvent::DeletedUser { user_id: _ } => todo!(), - ShardEvent::LogoutUser { client_id: _ } => todo!(), - ShardEvent::UpdatedUser { - user_id: _, - username: _, - status: _, - } => todo!(), + username, + password, + status, + permissions, + } => { + self.create_user_bypass_auth(username, password, *status, permissions.clone())?; + Ok(()) + } + ShardEvent::DeletedUser { user_id } => { + self.delete_user_bypass_auth(user_id)?; + Ok(()) + } + ShardEvent::LogoutUser { client_id } => { + let sessions = self.active_sessions.borrow(); + let session = sessions.iter().find(|s| s.client_id == *client_id).unwrap(); + self.logout_user(session)?; + self.remove_active_session(session.get_user_id()); + + Ok(()) + } ShardEvent::ChangedPassword { - user_id: _, - current_password: _, - new_password: _, - } => todo!(), - ShardEvent::CreatedPersonalAccessToken { name: _, expiry: _ } => todo!(), - ShardEvent::DeletedPersonalAccessToken { name: _ } => todo!(), + user_id, + current_password, + new_password, + } => { + self.change_password_bypass_auth(user_id, current_password, new_password)?; + Ok(()) + } + ShardEvent::CreatedPersonalAccessToken {personal_access_token} => { + self.create_personal_access_token_bypass_auth(personal_access_token.to_owned())?; + Ok(()) + }, + ShardEvent::DeletedPersonalAccessToken { user_id, name } => { + self.delete_personal_access_token_bypass_auth(*user_id, name)?; + Ok(()) + }, ShardEvent::LoginWithPersonalAccessToken { token: _ } => todo!(), - ShardEvent::StoredConsumerOffset { - stream_id: _, - topic_id: _, - consumer: _, - offset: _, - } => todo!(), + ShardEvent::UpdatedUser { + user_id, + username, + status, + } => { + self.update_user_bypass_auth(user_id, username.to_owned(), *status)?; + Ok(()) + }, + ShardEvent::UpdatedPermissions { user_id, permissions } => { + self.update_permissions_bypass_auth(user_id, permissions.to_owned())?; + Ok(()) + } } } @@ -779,9 +809,13 @@ impl IggyShard { } } - pub async fn broadcast_event_to_all_shards(&self, event: Arc) -> Vec { + pub async fn broadcast_event_to_all_shards( + &self, + event: Arc, + ) -> Vec { let mut responses = Vec::with_capacity(self.get_available_shards_count() as usize); - for maybe_receiver in self.shards + for maybe_receiver in self + .shards .iter() .filter_map(|shard| { if shard.id != self.id { @@ -801,18 +835,19 @@ impl IggyShard { conn.send(ShardFrame::new(event.clone().into(), None)); None } - }) { - match maybe_receiver { - Some(receiver) => { - let response = receiver.recv().await.unwrap(); - responses.push(response); - } - None => { - responses.push(ShardResponse::Event); - } + }) + { + match maybe_receiver { + Some(receiver) => { + let response = receiver.recv().await.unwrap(); + responses.push(response); + } + None => { + responses.push(ShardResponse::Event); } } - responses + } + responses } fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> { @@ -866,6 +901,20 @@ impl IggyShard { self.active_sessions.borrow_mut().push(session); } + pub fn remove_active_session(&self, user_id: u32) { + let mut active_sessions = self.active_sessions.borrow_mut(); + let pos = active_sessions + .iter() + .position(|s| s.get_user_id() == user_id); + if let Some(pos) = pos { + active_sessions.remove(pos); + } else { + error!( + "{COMPONENT} - failed to remove active session for user ID: {user_id}, session not found." + ); + } + } + pub fn ensure_authenticated(&self, session: &Session) -> Result { let active_sessions = self.active_sessions.borrow(); let user_id = active_sessions diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 7b98205f83..96109b8526 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -26,6 +26,7 @@ use crate::streaming::topics::consumer_group::ConsumerGroup; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; +use iggy_common::locking::IggySharedMutFn; impl IggyShard { pub fn get_consumer_group<'cg, 'stream>( @@ -85,6 +86,17 @@ impl IggyShard { Ok(topic.get_consumer_groups()) } + pub fn create_consumer_group_bypass_auth( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: Option, + name: &str, + ) -> Result<(), IggyError> { + self.create_consumer_group_base(stream_id, topic_id, group_id, name)?; + Ok(()) + } + pub fn create_consumer_group( &self, session: &Session, @@ -109,7 +121,16 @@ impl IggyShard { topic.topic_id, ).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to create consumer group for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?; } + self.create_consumer_group_base(stream_id, topic_id, group_id, name) + } + fn create_consumer_group_base( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: Option, + name: &str, + ) -> Result { let mut stream = self.get_stream_mut(stream_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}"))?; let topic = stream.get_topic_mut(topic_id) @@ -122,6 +143,15 @@ impl IggyShard { }) } + pub fn delete_consumer_group_bypass_auth( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + consumer_group_id: &Identifier, + ) -> Result { + self.delete_consumer_group_base(stream_id, topic_id, consumer_group_id) + } + pub async fn delete_consumer_group( &self, session: &Session, @@ -130,8 +160,6 @@ impl IggyShard { consumer_group_id: &Identifier, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let stream_id_value; - let topic_id_value; { let stream = self.get_stream(stream_id).with_error_context(|error| { format!( @@ -146,11 +174,32 @@ impl IggyShard { topic.stream_id, topic.topic_id, ).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to delete consumer group for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?; - - stream_id_value = topic.stream_id; - topic_id_value = topic.topic_id; } + let cg = self.delete_consumer_group_base(stream_id, topic_id, consumer_group_id)?; + let stream = self.get_stream(stream_id)?; + let topic = stream.get_topic(topic_id)?; + + for (_, partition) in topic.partitions.iter() { + let partition = partition.read().await; + if let Some((_, offset)) = partition.consumer_group_offsets.remove(&cg.group_id) { + self.storage + .partition + .delete_consumer_offset(&offset.path) + .await?; + } + } + + Ok(()) + } + fn delete_consumer_group_base( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + consumer_group_id: &Identifier, + ) -> Result { + let stream_id_value; + let topic_id_value; let consumer_group; { let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { @@ -161,8 +210,9 @@ impl IggyShard { let topic = stream.get_topic_mut(topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; consumer_group = topic.delete_consumer_group(consumer_group_id) - .await - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to delete consumer group with ID: {consumer_group_id}"))? + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to delete consumer group with ID: {consumer_group_id}"))?; + stream_id_value = topic.stream_id; + topic_id_value = topic.topic_id; } for member in consumer_group.get_members() { @@ -174,8 +224,7 @@ impl IggyShard { ) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to make client leave consumer group for client ID: {}, group ID: {}", member.id, consumer_group.group_id))?; } - - Ok(()) + Ok(consumer_group) } pub fn join_consumer_group( diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index 8cc89cdcd9..8448cc9d3b 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -58,7 +58,7 @@ impl IggyShard { session: &Session, name: &str, expiry: IggyExpiry, - ) -> Result { + ) -> Result<(PersonalAccessToken, String), IggyError> { self.ensure_authenticated(session)?; let user_id = session.get_user_id(); let identifier = user_id.try_into()?; @@ -78,6 +78,27 @@ impl IggyShard { } } + let (personal_access_token, token) = + PersonalAccessToken::new(user_id, name, IggyTimestamp::now(), expiry); + self.create_personal_access_token_base(personal_access_token.clone())?; + Ok((personal_access_token, token)) + } + + pub fn create_personal_access_token_bypass_auth( + &self, + personal_access_token: PersonalAccessToken, + ) -> Result<(), IggyError> { + self.create_personal_access_token_base(personal_access_token) + } + + fn create_personal_access_token_base( + &self, + personal_access_token: PersonalAccessToken, + ) -> Result<(), IggyError> { + let user_id = personal_access_token.user_id; + let name = personal_access_token.name.clone(); + let token_hash = personal_access_token.token.clone(); + let identifier = user_id.try_into()?; let user = self.get_user_mut(&identifier).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; @@ -85,22 +106,20 @@ impl IggyShard { if user .personal_access_tokens .iter() - .any(|pat| pat.name.as_str() == name) + .any(|pat| pat.name.as_str() == name.as_str()) { error!("Personal access token: {name} for user with ID: {user_id} already exists."); return Err(IggyError::PersonalAccessTokenAlreadyExists( - name.to_owned(), + name.to_string(), user_id, )); } info!("Creating personal access token: {name} for user with ID: {user_id}..."); - let (personal_access_token, token) = - PersonalAccessToken::new(user_id, name, IggyTimestamp::now(), expiry); user.personal_access_tokens - .insert(personal_access_token.token.clone(), personal_access_token); + .insert(token_hash, personal_access_token); info!("Created personal access token: {name} for user with ID: {user_id}."); - Ok(token) + Ok(()) } pub fn delete_personal_access_token( @@ -110,6 +129,22 @@ impl IggyShard { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; let user_id = session.get_user_id(); + self.delete_personal_access_token_base(user_id, name) + } + + pub fn delete_personal_access_token_bypass_auth( + &self, + user_id: u32, + name: &str, + ) -> Result<(), IggyError> { + self.delete_personal_access_token_base(user_id, name) + } + + fn delete_personal_access_token_base( + &self, + user_id: u32, + name: &str, + ) -> Result<(), IggyError> { let user = self .get_user_mut(&user_id.try_into()?) .with_error_context(|error| { diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs index f0e87fc0ef..0f92a9122f 100644 --- a/core/server/src/shard/system/users.rs +++ b/core/server/src/shard/system/users.rs @@ -143,7 +143,7 @@ impl IggyShard { Ok(self.users.borrow().values().cloned().collect()) } - pub async fn create_user( + pub fn create_user( &self, session: &Session, username: &str, @@ -162,6 +162,31 @@ impl IggyShard { ) })?; + let user_id = self.create_user_base(username, password, status, permissions)?; + self.get_user(&user_id.try_into()?) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") + }) + } + + pub fn create_user_bypass_auth( + &self, + username: &str, + password: &str, + status: UserStatus, + permissions: Option, + ) -> Result { + let user_id = self.create_user_base(username, password, status, permissions)?; + Ok(user_id) + } + + fn create_user_base( + &self, + username: &str, + password: &str, + status: UserStatus, + permissions: Option, + ) -> Result { if self .users .borrow() @@ -186,26 +211,32 @@ impl IggyShard { self.users.borrow_mut().insert(user.id, user); info!("Created user: {username} with ID: {user_id}."); self.metrics.increment_users(1); - self.get_user(&user_id.try_into()?) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") - }) + Ok(user_id) } pub fn delete_user(&self, session: &Session, user_id: &Identifier) -> Result { self.ensure_authenticated(session)?; + self.permissioner + .borrow() + .delete_user(session.get_user_id()) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - permission denied to delete user for user with id: {}", + session.get_user_id() + ) + })?; + + self.delete_user_base(user_id) + } + + pub fn delete_user_bypass_auth(&self, user_id: &Identifier) -> Result { + self.delete_user_base(user_id) + } + + fn delete_user_base(&self, user_id: &Identifier) -> Result { let existing_user_id; let existing_username; { - self.permissioner - .borrow() - .delete_user(session.get_user_id()) - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - permission denied to delete user for user with id: {}", - session.get_user_id() - ) - })?; let user = self.get_user(user_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") })?; @@ -257,6 +288,24 @@ impl IggyShard { ) })?; + self.update_user_base(user_id, username, status) + } + + pub fn update_user_bypass_auth( + &self, + user_id: &Identifier, + username: Option, + status: Option, + ) -> Result { + self.update_user_base(user_id, username, status) + } + + fn update_user_base( + &self, + user_id: &Identifier, + username: Option, + status: Option, + ) -> Result { if let Some(username) = username.to_owned() { let user = self.get_user(user_id)?; let existing_user = self.get_user(&username.to_owned().try_into()?); @@ -310,7 +359,29 @@ impl IggyShard { error!("Cannot change the root user permissions."); return Err(IggyError::CannotChangePermissions(user.id)); } + } + self.update_permissions_base(user_id, permissions) + } + + pub fn update_permissions_bypass_auth( + &self, + user_id: &Identifier, + permissions: Option, + ) -> Result<(), IggyError> { + self.update_permissions_base(user_id, permissions) + } + + fn update_permissions_base( + &self, + user_id: &Identifier, + permissions: Option, + ) -> Result<(), IggyError> { + { + let user: User = self.get_user(user_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") + })?; + self.permissioner .borrow_mut() .update_permissions_for_user(user.id, permissions.clone()); @@ -353,6 +424,24 @@ impl IggyShard { } } + self.change_password_base(user_id, current_password, new_password) + } + + pub fn change_password_bypass_auth( + &self, + user_id: &Identifier, + current_password: &str, + new_password: &str, + ) -> Result<(), IggyError> { + self.change_password_base(user_id, current_password, new_password) + } + + fn change_password_base( + &self, + user_id: &Identifier, + current_password: &str, + new_password: &str, + ) -> Result<(), IggyError> { let mut user = self.get_user_mut(user_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; @@ -456,25 +545,29 @@ impl IggyShard { pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> { self.ensure_authenticated(session)?; + let client_id = session.client_id; + let user_id = session.get_user_id(); + self.logout_user_base(user_id, client_id)?; + Ok(()) + } + + fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(), IggyError> { let user = self - .get_user(&Identifier::numeric(session.get_user_id())?) + .get_user(&Identifier::numeric(user_id)?) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get user with id: {}", - session.get_user_id() + user_id, ) })?; info!( "Logging out user: {} with ID: {}...", user.username, user.id ); - if session.client_id > 0 { + if client_id > 0 { let mut client_manager = self.client_manager.borrow_mut(); - client_manager.clear_user_id(session.client_id)?; - info!( - "Cleared user ID: {} for client: {}.", - user.id, session.client_id - ); + client_manager.clear_user_id(client_id)?; + info!("Cleared user ID: {} for client: {}.", user.id, client_id); } info!("Logged out user: {} with ID: {}.", user.username, user.id); Ok(()) diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index 02daba5e35..3b62d3d261 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -6,7 +6,7 @@ use iggy_common::{ use crate::{ shard::namespace::IggyNamespace, - streaming::{clients::client_manager::Transport, polling_consumer::PollingConsumer}, + streaming::{clients::client_manager::Transport, polling_consumer::PollingConsumer, personal_access_tokens::personal_access_token::PersonalAccessToken}, }; #[derive(Debug)] @@ -87,6 +87,10 @@ pub enum ShardEvent { status: UserStatus, permissions: Option, }, + UpdatedPermissions { + user_id: Identifier, + permissions: Option, + }, DeletedUser { user_id: Identifier, }, @@ -109,21 +113,15 @@ pub enum ShardEvent { new_password: String, }, CreatedPersonalAccessToken { - name: String, - expiry: IggyExpiry, + personal_access_token: PersonalAccessToken, }, DeletedPersonalAccessToken { + user_id: u32, name: String, }, LoginWithPersonalAccessToken { token: String, }, - StoredConsumerOffset { - stream_id: Identifier, - topic_id: Identifier, - consumer: PollingConsumer, - offset: u64, - }, NewSession { address: SocketAddr, transport: Transport, diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index 79c7f9f399..ccb8b69f32 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -216,10 +216,7 @@ impl Topic { Ok(Identifier::numeric(id)?) } - pub async fn delete_consumer_group( - &mut self, - id: &Identifier, - ) -> Result { + pub fn delete_consumer_group(&mut self, id: &Identifier) -> Result { let group_id; { let consumer_group = self.get_consumer_group(id).with_error_context(|error| { @@ -244,16 +241,6 @@ impl Topic { .store(group_id, Ordering::SeqCst); } - for (_, partition) in self.partitions.iter() { - let partition = partition.read().await; - if let Some((_, offset)) = partition.consumer_group_offsets.remove(&group_id) { - self.storage - .partition - .delete_consumer_offset(&offset.path) - .await?; - } - } - info!( "Deleted consumer group with ID: {} from topic with ID: {} and stream with ID: {}.", id, self.topic_id, self.stream_id @@ -380,9 +367,7 @@ mod tests { let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); assert_eq!(topic.consumer_groups.borrow().len(), 1); - let result = topic - .delete_consumer_group(&Identifier::numeric(group_id).unwrap()) - .await; + let result = topic.delete_consumer_group(&Identifier::numeric(group_id).unwrap()); assert!(result.is_ok()); assert!(topic.consumer_groups.borrow().is_empty()); } @@ -396,9 +381,7 @@ mod tests { assert!(result.is_ok()); assert_eq!(topic.consumer_groups.borrow().len(), 1); let group_id = group_id + 1; - let result = topic - .delete_consumer_group(&Identifier::numeric(group_id).unwrap()) - .await; + let result = topic.delete_consumer_group(&Identifier::numeric(group_id).unwrap()); assert!(result.is_err()); assert_eq!(topic.consumer_groups.borrow().len(), 1); }