Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): allow kafka portals to anchor trust on identities #8074

Merged
merged 3 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,20 @@ async fn main(ctx: Context) -> Result<()> {
DefaultAddress::ECHO_SERVICE,
&sc_listener_options.spawner_flow_control_id(),
);
let allow_production_incoming =
IncomingAbac::create_name_value(node.identities_attributes(), issuer.clone(), "cluster", "production");
let allow_production_outgoing =
OutgoingAbac::create_name_value(&ctx, node.identities_attributes(), issuer, "cluster", "production").await?;
let allow_production_incoming = IncomingAbac::create_name_value(
node.identities_attributes(),
Some(issuer.clone()),
"cluster",
"production",
);
let allow_production_outgoing = OutgoingAbac::create_name_value(
&ctx,
node.identities_attributes(),
Some(issuer),
"cluster",
"production",
)
.await?;
node.start_worker_with_access_control(
DefaultAddress::ECHO_SERVICE,
Echoer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// 3. create an access control policy checking the value of the "component" attribute of the caller
let incoming_access_control = IncomingAbac::create_name_value(
node.identities_attributes(),
project.authority_identifier(),
Some(project.authority_identifier()),
"component",
"edge",
);
let outgoing_access_control = OutgoingAbac::create_name_value(
node.context(),
node.identities_attributes(),
project.authority_identifier(),
Some(project.authority_identifier()),
"component",
"edge",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// 3. create an access control policy checking the value of the "component" attribute of the caller
let incoming_access_control = IncomingAbac::create_name_value(
identities().await?.identities_attributes(),
project.authority_identifier(),
Some(project.authority_identifier()),
"component",
"control",
);
let outgoing_access_control = OutgoingAbac::create_name_value(
node.context(),
identities().await?.identities_attributes(),
project.authority_identifier(),
Some(project.authority_identifier()),
"component",
"control",
)
Expand Down
105 changes: 54 additions & 51 deletions implementations/rust/ockam/ockam_abac/src/abac/abac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ pub const ABAC_IDENTIFIER_KEY: &str = "identifier";
#[derive(Clone)]
pub struct Abac {
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
environment: Env,
}

/// Debug implementation printing out the policy expression only
impl Debug for Abac {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "Authority: {}", self.authority)
write!(f, "Authority: {:?}", self.authority)
}
}

impl Abac {
/// Create a new AccessControl using a specific policy for checking attributes
pub fn new(
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
environment: Env,
) -> Self {
Self {
Expand Down Expand Up @@ -128,7 +128,7 @@ impl Abac {
Self::is_identity_authorized_static(
self.identities_attributes.clone(),
&self.environment,
&self.authority,
self.authority.as_ref(),
identifier,
expression,
)
Expand All @@ -139,7 +139,7 @@ impl Abac {
pub async fn is_identity_authorized_static(
identities_attributes: Arc<IdentitiesAttributes>,
environment: &Env,
authority: &Identifier,
authority: Option<&Identifier>,
identifier: &Identifier,
expression: &Expr,
) -> Result<bool> {
Expand All @@ -153,67 +153,69 @@ impl Abac {
);

// Get identity attributes and populate the environment:
match identities_attributes
.get_attributes(identifier, authority)
.await?
{
Some(attrs) => {
environment.put(
subject_has_credential_attribute().to_string(),
Expr::CONST_TRUE,
);
if let Some(authority) = authority {
match identities_attributes
.get_attributes(identifier, authority)
.await?
{
Some(attrs) => {
environment.put(
subject_has_credential_attribute().to_string(),
Expr::CONST_TRUE,
);

for (key, value) in attrs.attrs() {
let key = match from_utf8(key) {
Ok(key) => key,
Err(_) => {
for (key, value) in attrs.attrs() {
let key = match from_utf8(key) {
Ok(key) => key,
Err(_) => {
warn! {
policy = %expression,
id = %identifier,
"attribute key is not utf-8"
}
continue;
}
};
if key.find(|c: char| c.is_whitespace()).is_some() {
warn! {
policy = %expression,
id = %identifier,
"attribute key is not utf-8"
key = %key,
"attribute key with whitespace ignored"
}
continue;
}
};
if key.find(|c: char| c.is_whitespace()).is_some() {
warn! {
policy = %expression,
id = %identifier,
key = %key,
"attribute key with whitespace ignored"
}
}
match str::from_utf8(value) {
Ok(s) => {
if environment.contains(key) {
match str::from_utf8(value) {
Ok(s) => {
if environment.contains(key) {
warn! {
policy = %expression,
id = %identifier,
key = %key,
"attribute already present"
}
} else {
environment
.put(format!("{}.{key}", SUBJECT_KEY), str(s.to_string()));
}
}
Err(e) => {
warn! {
policy = %expression,
id = %identifier,
key = %key,
"attribute already present"
err = %e,
"failed to interpret attribute as string"
}
} else {
environment
.put(format!("{}.{key}", SUBJECT_KEY), str(s.to_string()));
}
}
Err(e) => {
warn! {
policy = %expression,
id = %identifier,
key = %key,
err = %e,
"failed to interpret attribute as string"
}
}
}
}
}
None => {
environment.put(
subject_has_credential_attribute().to_string(),
Expr::CONST_FALSE,
);
None => {
environment.put(
subject_has_credential_attribute().to_string(),
Expr::CONST_FALSE,
);
}
}
}

Expand Down Expand Up @@ -242,6 +244,7 @@ impl Abac {
policy = %expression,
id = %identifier,
err = %e,
env = %environment,
"policy evaluation failed"
}
Ok(false)
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam_abac/src/abac/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl IncomingAbac {
/// a message has an authenticated attribute that resolves the expression to `true`
pub fn create(
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
expression: Expr,
) -> Self {
let abac = Abac::new(identities_attributes, authority, Env::new());
Expand All @@ -38,7 +38,7 @@ impl IncomingAbac {
/// a message has an authenticated attribute with the correct name and value
pub fn create_name_value(
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
attribute_name: &str,
attribute_value: &str,
) -> Self {
Expand All @@ -56,7 +56,7 @@ impl IncomingAbac {
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
) -> Self {
Self::create(identities_attributes, authority, true.into())
Self::create(identities_attributes, Some(authority), true.into())
}

/// Returns true if the sender of the message is validated by the expression stored in AbacAccessControl
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam_abac/src/abac/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl OutgoingAbac {
pub async fn create(
ctx: &Context,
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
expression: Expr,
) -> Result<Self> {
let ctx = ctx
Expand All @@ -58,7 +58,7 @@ impl OutgoingAbac {
pub async fn create_name_value(
ctx: &Context,
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
attribute_name: &str,
attribute_value: &str,
) -> Result<Self> {
Expand All @@ -77,7 +77,7 @@ impl OutgoingAbac {
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
) -> Result<Self> {
Self::create(ctx, identities_attributes, authority, true.into()).await
Self::create(ctx, identities_attributes, Some(authority), true.into()).await
}

/// Returns true if the sender of the message is validated by the expression stored in AbacAccessControl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl PolicyAccessControl {
pub fn new(
policies: Policies,
identities_attributes: Arc<IdentitiesAttributes>,
authority: Identifier,
authority: Option<Identifier>,
env: Env,
resource: Resource,
action: Action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ impl Policies {
}
}

#[instrument(skip_all, fields(resource = %resource, action = %action, env = %env, authority = %authority))]
#[instrument(skip_all, fields(resource = %resource, action = %action, env = %env, authority = ?authority))]
pub fn make_policy_access_control(
&self,
identities_attributes: Arc<IdentitiesAttributes>,
resource: Resource,
action: Action,
env: Env,
authority: Identifier,
authority: Option<Identifier>,
) -> PolicyAccessControl {
debug!(
"set a policy access control for resource '{}' of type '{}' and action '{}'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod test {
let consumer_policy_access_control = handler
.node_manager
.policy_access_control(
project_authority.clone(),
Some(project_authority.clone()),
Resource::new(listener_address.address(), ResourceType::KafkaConsumer),
Action::HandleMessage,
None,
Expand All @@ -75,7 +75,7 @@ mod test {
let producer_policy_access_control = handler
.node_manager
.policy_access_control(
project_authority.clone(),
Some(project_authority.clone()),
Resource::new(listener_address.address(), ResourceType::KafkaProducer),
Action::HandleMessage,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::kafka::KAFKA_OUTLET_INTERCEPTOR_ADDRESS;
use ockam::{Any, Context, Result, Routed, Worker};
use ockam_abac::PolicyExpression;
use ockam_core::flow_control::{FlowControlId, FlowControls};
use ockam_core::{Address, IncomingAccessControl, OutgoingAccessControl};
use ockam_core::{Address, AllowAll, IncomingAccessControl, OutgoingAccessControl};
use ockam_node::WorkerBuilder;
use std::sync::Arc;

Expand All @@ -26,7 +26,7 @@ impl OutletManagerService {
default_secure_channel_listener_flow_control_id: FlowControlId,
policy_expression: Option<PolicyExpression>,
request_incoming_access_control: Arc<dyn IncomingAccessControl>,
response_outgoing_access_control: Arc<dyn OutgoingAccessControl>,
_response_outgoing_access_control: Arc<dyn OutgoingAccessControl>,
polvorin marked this conversation as resolved.
Show resolved Hide resolved
tls: bool,
) -> Result<()> {
let flow_controls = context.flow_controls();
Expand All @@ -44,7 +44,9 @@ impl OutletManagerService {
let worker = OutletManagerService {
outlet_controller: KafkaOutletController::new(policy_expression, tls),
request_incoming_access_control,
response_outgoing_access_control,
//TODO FIXME: make this work, if needed.
//response_outgoing_access_control,
response_outgoing_access_control: Arc::new(AllowAll),
spawner_flow_control_id: spawner_flow_control_id.clone(),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,15 +780,15 @@ mod test {
Resource::new("arbitrary-resource-name", ResourceType::KafkaConsumer),
Action::HandleMessage,
Env::new(),
authority_identifier.clone(),
Some(authority_identifier.clone()),
);

let producer_policy_access_control = policies.make_policy_access_control(
secure_channels.identities().identities_attributes(),
Resource::new("arbitrary-resource-name", ResourceType::KafkaProducer),
Action::HandleMessage,
Env::new(),
authority_identifier.clone(),
Some(authority_identifier.clone()),
);

let secure_channel_controller = KafkaSecureChannelControllerImpl::new(
Expand Down Expand Up @@ -858,7 +858,7 @@ mod test {
let consumer_policy_access_control = handle
.node_manager
.policy_access_control(
project_authority.clone(),
Some(project_authority.clone()),
Resource::new("arbitrary-resource-name", ResourceType::KafkaConsumer),
Action::HandleMessage,
None,
Expand All @@ -868,7 +868,7 @@ mod test {
let producer_policy_access_control = handle
.node_manager
.policy_access_control(
project_authority.clone(),
Some(project_authority.clone()),
Resource::new("arbitrary-resource-name", ResourceType::KafkaProducer),
Action::HandleMessage,
None,
Expand Down
Loading
Loading