Skip to content

Commit

Permalink
Completed HTTP auth with JWT, added identity info response, tests (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz committed Sep 19, 2023
1 parent 6829550 commit 23b3309
Show file tree
Hide file tree
Showing 40 changed files with 249 additions and 171 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The name is an abbreviation for the Italian Greyhound - small yet extremely fast
- **Highly performant**, persistent append-only log for the message streaming
- **Very high throughput** for both writes and reads
- **Low latency and predictable resource usage** thanks to the Rust compiled language (no GC)
- **Users authentication and authorization** with granular permissions
- Support for multiple streams, topics and partitions
- Support for **multiple transport protocols** (QUIC, TCP, HTTP)
- Fully operational RESTful API which can be optionally enabled
Expand Down Expand Up @@ -53,7 +54,6 @@ The name is an abbreviation for the Italian Greyhound - small yet extremely fast

- Streaming server caching and I/O improvements
- Low level optimizations (zero-copy etc.)
- Users & permissions management
- Clustering & data replication
- Rich console CLI
- Advanced Web UI
Expand All @@ -73,6 +73,12 @@ For the detailed information about current progress, please refer to the [projec
- [Python](https://github.com/iggy-rs/iggy-python-client)
---

### CLI

The brand new, rich, interactive CLI is being implemented under the `cmd` project, to provide the best developer experience. This will be a great addition to the Web UI, especially for all the developers who prefer using the console tools.

![CLI](assets/cli.png)

### Web UI

There's an ongoing effort to build the administrative web UI for the server, which will allow to manage the streams, topics, partitions, messages and so on. Check the [Web UI repository](https://github.com/iggy-rs/iggy-web-ui)
Expand Down
Binary file added assets/cli.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.0.81"
version = "0.0.82"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
9 changes: 9 additions & 0 deletions iggy/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::Error;
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::messages::{Message, MessageState, PolledMessages};
use crate::models::partition::Partition;
use crate::models::permissions::Permissions;
Expand Down Expand Up @@ -139,6 +140,14 @@ pub fn map_users(payload: &[u8]) -> Result<Vec<UserInfo>, Error> {
Ok(users)
}

pub fn map_identity_info(payload: &[u8]) -> Result<IdentityInfo, Error> {
let user_id = u32::from_le_bytes(payload[..4].try_into()?);
Ok(IdentityInfo {
user_id,
token: None,
})
}

pub fn map_client(payload: &[u8]) -> Result<ClientInfoDetails, Error> {
let (client, mut position) = map_to_client_info(payload, 0)?;
let mut consumer_groups = Vec::new();
Expand Down
10 changes: 7 additions & 3 deletions iggy/src/binary/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::command::{
LOGIN_USER_CODE, LOGOUT_USER_CODE, UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE,
};
use crate::error::Error;
use crate::models::identity_info::IdentityInfo;
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::users::change_password::ChangePassword;
use crate::users::create_user::CreateUser;
Expand Down Expand Up @@ -78,11 +79,14 @@ pub async fn change_password(
Ok(())
}

pub async fn login_user(client: &dyn BinaryClient, command: &LoginUser) -> Result<(), Error> {
client
pub async fn login_user(
client: &dyn BinaryClient,
command: &LoginUser,
) -> Result<IdentityInfo, Error> {
let response = client
.send_with_response(LOGIN_USER_CODE, &command.as_bytes())
.await?;
Ok(())
mapper::map_identity_info(&response)
}

pub async fn logout_user(client: &dyn BinaryClient, command: &LogoutUser) -> Result<(), Error> {
Expand Down
3 changes: 2 additions & 1 deletion iggy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::messages::send_messages::SendMessages;
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::messages::PolledMessages;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
Expand Down Expand Up @@ -82,7 +83,7 @@ pub trait UserClient {
async fn update_user(&self, command: &UpdateUser) -> Result<(), Error>;
async fn update_permissions(&self, command: &UpdatePermissions) -> Result<(), Error>;
async fn change_password(&self, command: &ChangePassword) -> Result<(), Error>;
async fn login_user(&self, command: &LoginUser) -> Result<(), Error>;
async fn login_user(&self, command: &LoginUser) -> Result<IdentityInfo, Error>;
async fn logout_user(&self, command: &LogoutUser) -> Result<(), Error>;
}

Expand Down
3 changes: 2 additions & 1 deletion iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::messages::send_messages::{Partitioning, PartitioningKind, SendMessage
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::messages::{Message, PolledMessages};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
Expand Down Expand Up @@ -482,7 +483,7 @@ impl UserClient for IggyClient {
self.client.read().await.change_password(command).await
}

async fn login_user(&self, command: &LoginUser) -> Result<(), Error> {
async fn login_user(&self, command: &LoginUser) -> Result<IdentityInfo, Error> {
self.client.read().await.login_user(command).await
}

Expand Down
8 changes: 5 additions & 3 deletions iggy/src/http/users.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::client::UserClient;
use crate::error::Error;
use crate::http::client::HttpClient;
use crate::models::identity_info::IdentityInfo;
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::users::change_password::ChangePassword;
use crate::users::create_user::CreateUser;
Expand Down Expand Up @@ -57,9 +58,10 @@ impl UserClient for HttpClient {
Ok(())
}

async fn login_user(&self, command: &LoginUser) -> Result<(), Error> {
self.post(&format!("{PATH}/login"), &command).await?;
Ok(())
async fn login_user(&self, command: &LoginUser) -> Result<IdentityInfo, Error> {
let response = self.post(&format!("{PATH}/login"), &command).await?;
let identity_info = response.json().await?;
Ok(identity_info)
}

async fn logout_user(&self, command: &LogoutUser) -> Result<(), Error> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use serde::{Deserialize, Serialize};

// TODO: Refactor UserClient trait to use this struct for login
#[derive(Debug, Serialize, Deserialize)]
pub struct IdentityToken {
pub struct IdentityInfo {
pub user_id: u32,
pub token: Option<String>,
}
2 changes: 1 addition & 1 deletion iggy/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod client_info;
pub mod consumer_group;
pub mod consumer_offset_info;
pub mod header;
pub mod identity_token;
pub mod identity_info;
pub mod messages;
pub mod partition;
pub mod permissions;
Expand Down
3 changes: 2 additions & 1 deletion iggy/src/quic/users.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::binary;
use crate::client::UserClient;
use crate::error::Error;
use crate::models::identity_info::IdentityInfo;
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::quic::client::QuicClient;
use crate::users::change_password::ChangePassword;
Expand Down Expand Up @@ -44,7 +45,7 @@ impl UserClient for QuicClient {
binary::users::change_password(self, command).await
}

async fn login_user(&self, command: &LoginUser) -> Result<(), Error> {
async fn login_user(&self, command: &LoginUser) -> Result<IdentityInfo, Error> {
binary::users::login_user(self, command).await
}

Expand Down
3 changes: 2 additions & 1 deletion iggy/src/tcp/users.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::binary;
use crate::client::UserClient;
use crate::error::Error;
use crate::models::identity_info::IdentityInfo;
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::tcp::client::TcpClient;
use crate::users::change_password::ChangePassword;
Expand Down Expand Up @@ -44,7 +45,7 @@ impl UserClient for TcpClient {
binary::users::change_password(self, command).await
}

async fn login_user(&self, command: &LoginUser) -> Result<(), Error> {
async fn login_user(&self, command: &LoginUser) -> Result<IdentityInfo, Error> {
binary::users::login_user(self, command).await
}

Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.0.10"
version = "0.0.11"
edition = "2021"
build = "src/build.rs"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ pub async fn handle(
return Err(Error::Unauthenticated);
}

let system = system.read().await;
let stream = system.get_stream(&command.stream_id)?;
let topic = stream.get_topic(&command.topic_id)?;
system.permissioner.get_consumer_offset(
user_context.user_id,
stream.stream_id,
topic.topic_id,
)?;

let consumer = PollingConsumer::from_consumer(
&command.consumer,
user_context.client_id,
command.partition_id,
);
let system = system.read().await;
let offset = system
.get_stream(&command.stream_id)?
.get_topic(&command.topic_id)?
.get_consumer_offset(consumer)
.await?;
let offset = topic.get_consumer_offset(consumer).await?;
let offset = mapper::map_consumer_offset(&offset);
sender.send_ok_response(&offset).await?;
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@ pub async fn handle(
return Err(Error::Unauthenticated);
}

let system = system.read().await;
let stream = system.get_stream(&command.stream_id)?;
let topic = stream.get_topic(&command.topic_id)?;
system.permissioner.get_consumer_offset(
user_context.user_id,
stream.stream_id,
topic.topic_id,
)?;

let consumer = PollingConsumer::from_consumer(
&command.consumer,
user_context.client_id,
command.partition_id,
);
let system = system.read().await;
system
.get_stream(&command.stream_id)?
.get_topic(&command.topic_id)?

topic
.store_consumer_offset(consumer, command.offset)
.await?;

sender.send_empty_ok_response().await?;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub async fn handle(
let mut system = system.write().await;
system.permissioner.create_stream(user_context.user_id)?;
system
.create_stream(user_context.user_id, command.stream_id, &command.name)
.create_stream(command.stream_id, &command.name)
.await?;
sender.send_empty_ok_response().await?;
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions server/src/binary/handlers/system/get_me_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub async fn handle(
system: Arc<RwLock<System>>,
) -> Result<(), Error> {
trace!("{command}");
if !user_context.is_authenticated() {
return Err(Error::Unauthenticated);
}

let bytes;
{
let system = system.read().await;
Expand Down
4 changes: 3 additions & 1 deletion server/src/binary/handlers/users/login_user_handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::binary::mapper;
use crate::binary::sender::Sender;
use crate::streaming::systems::system::System;
use crate::streaming::users::user_context::UserContext;
Expand Down Expand Up @@ -25,6 +26,7 @@ pub async fn handle(
.await?;
user_context.set_user_id(user.id);
info!("Set user ID: {}", user.id);
sender.send_empty_ok_response().await?;
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(identity_info.as_slice()).await?;
Ok(())
}
6 changes: 6 additions & 0 deletions server/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ pub fn map_users(users: &[User]) -> Vec<u8> {
bytes
}

pub fn map_identity_info(user_id: u32) -> Vec<u8> {
let mut bytes = Vec::with_capacity(4);
bytes.put_u32_le(user_id);
bytes
}

pub fn map_polled_messages(polled_messages: &PolledMessages) -> Vec<u8> {
let messages_count = polled_messages.messages.len() as u32;
let messages_size = polled_messages
Expand Down
4 changes: 0 additions & 4 deletions server/src/http/auth.rs

This file was deleted.

9 changes: 0 additions & 9 deletions server/src/http/claims.rs

This file was deleted.

0 comments on commit 23b3309

Please sign in to comment.