Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
750 changes: 382 additions & 368 deletions Cargo.lock

Large diffs are not rendered by default.

183 changes: 127 additions & 56 deletions apps/freenet-ping/Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions crates/core/src/config/secret.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::Path;

use aes_gcm::KeyInit;
use blake3::traits::digest::generic_array::GenericArray;
#[allow(deprecated)]
use aes_gcm::{aead::generic_array::GenericArray, KeyInit};
use chacha20poly1305::{XChaCha20Poly1305, XNonce};
use freenet_stdlib::client_api::DelegateRequest;
use rsa::pkcs8::DecodePrivateKey;
Expand Down Expand Up @@ -148,6 +148,7 @@ impl Default for Secrets {
}
}

#[allow(deprecated)]
impl Secrets {
#[inline]
pub fn nonce(&self) -> XNonce {
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use blake3::traits::digest::generic_array::GenericArray;
use either::Either;
use freenet_stdlib::client_api::{
ClientError as WsClientError, ClientRequest, ContractError as StdContractError,
Expand Down
84 changes: 46 additions & 38 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,25 +371,29 @@ impl ContractExecutor for Executor<Runtime> {
cipher,
nonce,
} => {
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
let key = delegate.key().clone();
let arr = GenericArray::from_slice(&cipher);
let cipher = XChaCha20Poly1305::new(arr);
let nonce = GenericArray::from_slice(&nonce).to_owned();
if let Some(contract) = attested_contract {
self.delegate_attested_ids
.entry(key.clone())
.or_default()
.push(*contract);
}
match self.runtime.register_delegate(delegate, cipher, nonce) {
Ok(_) => Ok(DelegateResponse {
key,
values: Vec::new(),
}),
Err(err) => {
tracing::warn!("failed registering delegate `{key}`: {err}");
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
#[allow(deprecated)]
{
use aes_gcm::aead::generic_array::GenericArray;
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
let key = delegate.key().clone();
let arr = GenericArray::from_slice(&cipher);
let cipher = XChaCha20Poly1305::new(arr);
let nonce = GenericArray::from_slice(&nonce).to_owned();
if let Some(contract) = attested_contract {
self.delegate_attested_ids
.entry(key.clone())
.or_default()
.push(*contract);
}
match self.runtime.register_delegate(delegate, cipher, nonce) {
Ok(_) => Ok(DelegateResponse {
key,
values: Vec::new(),
}),
Err(err) => {
tracing::warn!("failed registering delegate `{key}`: {err}");
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
}
}
}
}
Expand Down Expand Up @@ -657,25 +661,29 @@ impl Executor<Runtime> {
cipher,
nonce,
} => {
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
let key = delegate.key().clone();
let arr = GenericArray::from_slice(&cipher);
let cipher = XChaCha20Poly1305::new(arr);
let nonce = GenericArray::from_slice(&nonce).to_owned();
if let Some(contract) = attested_contract {
self.delegate_attested_ids
.entry(key.clone())
.or_default()
.push(*contract);
}
match self.runtime.register_delegate(delegate, cipher, nonce) {
Ok(_) => Ok(DelegateResponse {
key,
values: Vec::new(),
}),
Err(err) => {
tracing::warn!("failed registering delegate `{key}`: {err}");
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
#[allow(deprecated)]
{
use aes_gcm::aead::generic_array::GenericArray;
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
let key = delegate.key().clone();
let arr = GenericArray::from_slice(&cipher);
let cipher = XChaCha20Poly1305::new(arr);
let nonce = GenericArray::from_slice(&nonce).to_owned();
if let Some(contract) = attested_contract {
self.delegate_attested_ids
.entry(key.clone())
.or_default()
.push(*contract);
}
match self.runtime.register_delegate(delegate, cipher, nonce) {
Ok(_) => Ok(DelegateResponse {
key,
values: Vec::new(),
}),
Err(err) => {
tracing::warn!("failed registering delegate `{key}`: {err}");
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
}
}
}
}
Expand Down
47 changes: 38 additions & 9 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,39 @@ use ulid::Ulid;
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy)]
pub struct Transaction {
id: Ulid,
/// Parent transaction ID for child operations spawned by this transaction.
/// Enables atomicity tracking for composite operations (e.g., PUT with SUBSCRIBE).
parent: Option<Ulid>,
}

impl Transaction {
pub const NULL: &'static Transaction = &Transaction { id: Ulid(0) };
pub const NULL: &'static Transaction = &Transaction {
id: Ulid(0),
parent: None,
};

pub(crate) fn new<T: TxType>() -> Self {
let ty = <T as TxType>::tx_type_id();
let id = Ulid::new();
Self::update(ty.0, id)
// Self { id }
Self::update(ty.0, id, None)
}

/// Creates a child transaction with the specified type, linked to the parent
/// for atomicity tracking in composite operations.
pub(crate) fn new_child_of<T: TxType>(parent: &Transaction) -> Self {
let ty = <T as TxType>::tx_type_id();
let id = Ulid::new();
Self::update(ty.0, id, Some(parent.id))
}

/// Returns the parent transaction ID for child operations.
pub fn parent_id(&self) -> Option<&Ulid> {
self.parent.as_ref()
}

/// Returns true if this transaction is a child operation.
pub fn is_sub_operation(&self) -> bool {
self.parent.is_some()
}

pub(crate) fn transaction_type(&self) -> TransactionType {
Expand Down Expand Up @@ -100,18 +123,24 @@ impl Transaction {
// Clear the ts significant bits of the ULID and replace them with the new cutoff ts.
const TIMESTAMP_MASK: u128 = 0x00000000000000000000FFFFFFFFFFFFFFFF;
let new_ulid = (id.0 & TIMESTAMP_MASK) | ((ttl_epoch as u128) << 80);
Self { id: Ulid(new_ulid) }
Self {
id: Ulid(new_ulid),
parent: None,
}
}

fn update(ty: TransactionType, id: Ulid) -> Self {
fn update(ty: TransactionType, id: Ulid, parent: Option<Ulid>) -> Self {
const TYPE_MASK: u128 = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00u128;
// Clear the last byte
let cleared = id.0 & TYPE_MASK;
// Set the last byte with the transaction type
let updated = cleared | (ty as u8) as u128;

// 2 words size for 64-bits platforms
Self { id: Ulid(updated) }
Self {
id: Ulid(updated),
parent,
}
}
}

Expand All @@ -120,7 +149,7 @@ impl<'a> arbitrary::Arbitrary<'a> for Transaction {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let ty: TransactionTypeId = u.arbitrary()?;
let bytes: u128 = Ulid::new().0;
Ok(Self::update(ty.0, Ulid(bytes)))
Ok(Self::update(ty.0, Ulid(bytes), None))
}
}

Expand Down Expand Up @@ -509,9 +538,9 @@ mod tests {
fn pack_transaction_type() {
let ts_0 = Ulid::new();
std::thread::sleep(Duration::from_millis(1));
let tx = Transaction::update(TransactionType::Connect, Ulid::new());
let tx = Transaction::update(TransactionType::Connect, Ulid::new(), None);
assert_eq!(tx.transaction_type(), TransactionType::Connect);
let tx = Transaction::update(TransactionType::Subscribe, Ulid::new());
let tx = Transaction::update(TransactionType::Subscribe, Ulid::new(), None);
assert_eq!(tx.transaction_type(), TransactionType::Subscribe);
std::thread::sleep(Duration::from_millis(1));
let ts_1 = Ulid::new();
Expand Down
31 changes: 16 additions & 15 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
ring::PeerKeyLocation,
tracing::NetEventLog,
};
use freenet_stdlib::client_api::{ContractResponse, HostResponse};

type P2pBridgeEvent = Either<(PeerId, Box<NetMessage>), NodeEvent>;

Expand Down Expand Up @@ -797,22 +798,22 @@ impl P2pConnManager {
key,
subscribed,
} => {
tracing::info!("Received LocalSubscribeComplete event for transaction: {tx}, contract: {key}");

// Deliver SubscribeResponse directly to result router
tracing::info!("Sending SubscribeResponse to result router for transaction: {tx}");
use freenet_stdlib::client_api::{ContractResponse, HostResponse};
let response = Ok(HostResponse::ContractResponse(
ContractResponse::SubscribeResponse { key, subscribed },
));

match op_manager.result_router_tx.send((tx, response)).await {
Ok(()) => {
tracing::info!("Successfully sent SubscribeResponse to result router for transaction: {tx}");
// Clean up client subscription after successful delivery
state.tx_to_client.remove(&tx);
tracing::debug!(%tx, %key, "local subscribe complete");

if !op_manager.is_sub_operation(tx) {
let response = Ok(HostResponse::ContractResponse(
ContractResponse::SubscribeResponse { key, subscribed },
));

match op_manager.result_router_tx.send((tx, response)).await {
Ok(()) => {
tracing::debug!(%tx, "sent subscribe response to client");
state.tx_to_client.remove(&tx);
}
Err(e) => {
tracing::error!(%tx, error = %e, "failed to send subscribe response")
}
}
Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e),
}
}
NodeEvent::Disconnect { cause } => {
Expand Down
Loading
Loading