Skip to content

Commit

Permalink
Refactor command trait and server command handler
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz committed Jun 29, 2024
1 parent c18c145 commit a685ca2
Show file tree
Hide file tree
Showing 64 changed files with 1,103 additions and 1,087 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 30 additions & 59 deletions sdk/src/binary/consumer_groups.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::bytes_serializable::BytesSerializable;
use crate::client::ConsumerGroupClient;
use crate::command::{
CREATE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUPS_CODE,
GET_CONSUMER_GROUP_CODE, JOIN_CONSUMER_GROUP_CODE, LEAVE_CONSUMER_GROUP_CODE,
};
use crate::consumer_groups::create_consumer_group::CreateConsumerGroup;
use crate::consumer_groups::delete_consumer_group::DeleteConsumerGroup;
use crate::consumer_groups::get_consumer_group::GetConsumerGroup;
Expand All @@ -26,15 +21,11 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
) -> Result<ConsumerGroupDetails, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
GET_CONSUMER_GROUP_CODE,
GetConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
}
.to_bytes(),
)
.send_with_response(GetConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
})
.await?;
mapper::map_consumer_group(response)
}
Expand All @@ -46,14 +37,10 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
) -> Result<Vec<ConsumerGroup>, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
GET_CONSUMER_GROUPS_CODE,
GetConsumerGroups {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
}
.to_bytes(),
)
.send_with_response(GetConsumerGroups {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
})
.await?;
mapper::map_consumer_groups(response)
}
Expand All @@ -67,16 +54,12 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
) -> Result<ConsumerGroupDetails, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
CREATE_CONSUMER_GROUP_CODE,
CreateConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
group_id,
}
.to_bytes(),
)
.send_with_response(CreateConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
group_id,
})
.await?;
mapper::map_consumer_group(response)
}
Expand All @@ -88,15 +71,11 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
group_id: &Identifier,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
DELETE_CONSUMER_GROUP_CODE,
DeleteConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
}
.to_bytes(),
)
self.send_with_response(DeleteConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
})
.await?;
Ok(())
}
Expand All @@ -108,15 +87,11 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
group_id: &Identifier,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
JOIN_CONSUMER_GROUP_CODE,
JoinConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
}
.to_bytes(),
)
self.send_with_response(JoinConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
})
.await?;
Ok(())
}
Expand All @@ -128,15 +103,11 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
group_id: &Identifier,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
LEAVE_CONSUMER_GROUP_CODE,
LeaveConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
}
.to_bytes(),
)
self.send_with_response(LeaveConsumerGroup {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
group_id: group_id.clone(),
})
.await?;
Ok(())
}
Expand Down
36 changes: 13 additions & 23 deletions sdk/src/binary/consumer_offsets.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::bytes_serializable::BytesSerializable;
use crate::client::ConsumerOffsetClient;
use crate::command::{GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE};
use crate::consumer::Consumer;
use crate::consumer_offsets::get_consumer_offset::GetConsumerOffset;
use crate::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
Expand All @@ -21,17 +19,13 @@ impl<B: BinaryClient> ConsumerOffsetClient for B {
offset: u64,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
STORE_CONSUMER_OFFSET_CODE,
StoreConsumerOffset {
consumer: consumer.clone(),
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
offset,
}
.to_bytes(),
)
self.send_with_response(StoreConsumerOffset {
consumer: consumer.clone(),
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
offset,
})
.await?;
Ok(())
}
Expand All @@ -45,16 +39,12 @@ impl<B: BinaryClient> ConsumerOffsetClient for B {
) -> Result<ConsumerOffsetInfo, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
GET_CONSUMER_OFFSET_CODE,
GetConsumerOffset {
consumer: consumer.clone(),
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
}
.to_bytes(),
)
.send_with_response(GetConsumerOffset {
consumer: consumer.clone(),
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
})
.await?;
mapper::map_consumer_offset(response)
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/binary/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl<B: BinaryClient> MessageClient for B {
) -> Result<PolledMessages, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
.send_raw_with_response(
POLL_MESSAGES_CODE,
poll_messages::as_bytes(
stream_id,
Expand All @@ -48,7 +48,7 @@ impl<B: BinaryClient> MessageClient for B {
messages: &mut [Message],
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
self.send_raw_with_response(
SEND_MESSAGES_CODE,
send_messages::as_bytes(stream_id, topic_id, partitioning, messages),
)
Expand Down
4 changes: 3 additions & 1 deletion sdk/src/binary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::command::Command;
use crate::error::IggyError;
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -42,7 +43,8 @@ pub trait BinaryTransport {
/// Sets the state of the client.
async fn set_state(&self, state: ClientState);
/// Sends a command and returns the response.
async fn send_with_response(&self, command: u32, payload: Bytes) -> Result<Bytes, IggyError>;
async fn send_with_response<T: Command>(&self, command: T) -> Result<Bytes, IggyError>;
async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError>;
}

async fn fail_if_not_authenticated<T: BinaryTransport>(transport: &T) -> Result<(), IggyError> {
Expand Down
30 changes: 10 additions & 20 deletions sdk/src/binary/partitions.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#[allow(deprecated)]
use crate::binary::binary_client::BinaryClient;
use crate::binary::fail_if_not_authenticated;
use crate::bytes_serializable::BytesSerializable;
use crate::client::PartitionClient;
use crate::command::{CREATE_PARTITIONS_CODE, DELETE_PARTITIONS_CODE};
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::partitions::create_partitions::CreatePartitions;
Expand All @@ -18,15 +16,11 @@ impl<B: BinaryClient> PartitionClient for B {
partitions_count: u32,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
CREATE_PARTITIONS_CODE,
CreatePartitions {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partitions_count,
}
.to_bytes(),
)
self.send_with_response(CreatePartitions {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partitions_count,
})
.await?;
Ok(())
}
Expand All @@ -38,15 +32,11 @@ impl<B: BinaryClient> PartitionClient for B {
partitions_count: u32,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
DELETE_PARTITIONS_CODE,
DeletePartitions {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partitions_count,
}
.to_bytes(),
)
self.send_with_response(DeletePartitions {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partitions_count,
})
.await?;
Ok(())
}
Expand Down
41 changes: 11 additions & 30 deletions sdk/src/binary/personal_access_tokens.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper, ClientState};
use crate::bytes_serializable::BytesSerializable;
use crate::client::PersonalAccessTokenClient;
use crate::command::*;
use crate::error::IggyError;
use crate::models::identity_info::IdentityInfo;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
Expand All @@ -16,12 +14,7 @@ use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
impl<B: BinaryClient> PersonalAccessTokenClient for B {
async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
GET_PERSONAL_ACCESS_TOKENS_CODE,
GetPersonalAccessTokens {}.to_bytes(),
)
.await?;
let response = self.send_with_response(GetPersonalAccessTokens {}).await?;
mapper::map_personal_access_tokens(response)
}

Expand All @@ -32,27 +25,19 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
) -> Result<RawPersonalAccessToken, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_with_response(
CREATE_PERSONAL_ACCESS_TOKEN_CODE,
CreatePersonalAccessToken {
name: name.to_string(),
expiry,
}
.to_bytes(),
)
.send_with_response(CreatePersonalAccessToken {
name: name.to_string(),
expiry,
})
.await?;
mapper::map_raw_pat(response)
}

async fn delete_personal_access_token(&self, name: &str) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(
DELETE_PERSONAL_ACCESS_TOKEN_CODE,
DeletePersonalAccessToken {
name: name.to_string(),
}
.to_bytes(),
)
self.send_with_response(DeletePersonalAccessToken {
name: name.to_string(),
})
.await?;
Ok(())
}
Expand All @@ -62,13 +47,9 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
token: &str,
) -> Result<IdentityInfo, IggyError> {
let response = self
.send_with_response(
LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
LoginWithPersonalAccessToken {
token: token.to_string(),
}
.to_bytes(),
)
.send_with_response(LoginWithPersonalAccessToken {
token: token.to_string(),
})
.await?;
self.set_state(ClientState::Authenticated).await;
mapper::map_identity_info(response)
Expand Down
Loading

0 comments on commit a685ca2

Please sign in to comment.