Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/sprout-auth/src/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ impl FromStr for Scope {
/// `SubscriptionsRead`, `SubscriptionsWrite`) are intentionally excluded — they require
/// `sprout-admin mint-token`.
///
/// `UsersWrite` is included because it only guards self-profile endpoints
/// (`PUT /api/users/me/profile`, `PUT /api/users/me/channel-add-policy`).
/// `UsersWrite` is included because it guards self-profile endpoints
/// (`PUT /api/users/me/profile`, `PUT /api/users/me/channel-add-policy`)
/// and contact list (kind:3) publishing.
pub const SELF_MINTABLE_SCOPES: &[Scope] = &[
Scope::MessagesRead,
Scope::MessagesWrite,
Expand Down
13 changes: 13 additions & 0 deletions crates/sprout-core/src/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
// Standard NIP kinds
/// NIP-01: User profile metadata.
pub const KIND_PROFILE: u32 = 0;
/// NIP-01: Short text note.
pub const KIND_TEXT_NOTE: u32 = 1;
/// NIP-02: Contact list / follow list.
pub const KIND_CONTACT_LIST: u32 = 3;
/// NIP-01: Channel metadata (replaceable). Not used by Sprout today.
pub const KIND_CHANNEL_METADATA: u32 = 41;
/// NIP-09: Event deletion request.
pub const KIND_DELETION: u32 = 5;
/// NIP-25: Content is emoji char or `+`/`-`.
Expand Down Expand Up @@ -214,7 +218,9 @@ pub const KIND_MEDIA_UPLOAD: u32 = 49001;
/// All registered kind constants — used for duplicate detection and iteration.
pub const ALL_KINDS: &[u32] = &[
KIND_PROFILE,
KIND_TEXT_NOTE,
KIND_CONTACT_LIST,
KIND_CHANNEL_METADATA,
KIND_DELETION,
KIND_REACTION,
KIND_GIFT_WRAP,
Expand Down Expand Up @@ -300,6 +306,13 @@ pub const fn is_ephemeral(kind: u32) -> bool {
kind >= EPHEMERAL_KIND_MIN && kind <= EPHEMERAL_KIND_MAX
}

/// Returns `true` if `kind` is replaceable (NIP-01: kinds 0, 3, 41, 10000–19999).
/// NIP-33 parameterized-replaceable kinds (30000–39999) use a different replacement
/// key (includes `d`-tag) and are handled separately via `replace_addressable_event`.
pub const fn is_replaceable(kind: u32) -> bool {
matches!(kind, 0 | 3 | KIND_CHANNEL_METADATA | 10000..=19999)
}

/// Returns `true` if `kind` is a workflow execution event (46001–46012).
/// These must not trigger workflows (prevents infinite loops).
pub const fn is_workflow_execution_kind(kind: u32) -> bool {
Expand Down
129 changes: 120 additions & 9 deletions crates/sprout-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,36 +1267,147 @@ impl Db {
Ok(result.rows_affected())
}

// ── Addressable events ─────────────────────────────────────────────────
// ── Replaceable events ─────────────────────────────────────────────────

/// Replace an addressable event (NIP-33-like): soft-delete any existing
/// event with the same (kind, pubkey, channel_id) and insert the new one.
/// Atomically replace a replaceable event: NIP-16 kinds (0, 3, 41, 10000–19999)
/// and NIP-29 discovery state (39000–39002, called from side_effects.rs).
///
/// Keeps only the event with the highest `created_at` per (kind, pubkey, channel_id).
/// Same-second ties are broken by lowest event `id` (NIP-16 deterministic ordering).
/// Returns `(event, false)` for stale writes and duplicate IDs — callers should
/// skip fan-out/dispatch when `was_inserted` is false.
pub async fn replace_addressable_event(
&self,
event: &nostr::Event,
channel_id: Option<Uuid>,
) -> Result<(StoredEvent, bool)> {
let kind_i32 = sprout_core::kind::event_kind_i32(event);
let pubkey_bytes = event.pubkey.to_bytes();
let created_at_secs = event.created_at.as_u64() as i64;
let created_at = chrono::DateTime::from_timestamp(created_at_secs, 0)
.ok_or(DbError::InvalidTimestamp(created_at_secs))?;

// Stable advisory-lock key: hash (kind, pubkey, channel_id) to i64.
// Uses FNV-1a for determinism — Rust's DefaultHasher is NOT stable across processes.
// Collisions cause extra serialization, not incorrect behavior.
let lock_key = {
let mut h: u64 = 0xcbf29ce484222325; // FNV offset basis
for b in kind_i32.to_le_bytes() {
h ^= b as u64;
h = h.wrapping_mul(0x100000001b3); // FNV prime
}
for b in pubkey_bytes.as_slice() {
h ^= *b as u64;
h = h.wrapping_mul(0x100000001b3);
}
if let Some(ch) = channel_id {
for b in ch.as_bytes() {
h ^= *b as u64;
h = h.wrapping_mul(0x100000001b3);
}
}
h as i64
};

let mut tx = self.pool.begin().await?;

// Soft-delete existing events with the same (kind, pubkey, channel_id).
// The idx_events_addressable index supports this lookup efficiently.
// Serialize all writers for the same (kind, pubkey, channel_id) tuple.
// Advisory lock is transaction-scoped — released on commit/rollback.
sqlx::query("SELECT pg_advisory_xact_lock($1)")
.bind(lock_key)
.execute(&mut *tx)
.await?;

// Check for the newest existing event. ORDER BY + LIMIT 1 is defensive against
// historical data where prior bugs may have left multiple live rows.
let existing: Option<(chrono::DateTime<chrono::Utc>, Vec<u8>)> = sqlx::query_as(
"SELECT created_at, id FROM events \
WHERE kind = $1 AND pubkey = $2 \
AND channel_id IS NOT DISTINCT FROM $3 \
AND deleted_at IS NULL \
ORDER BY created_at DESC, id ASC LIMIT 1",
)
.bind(kind_i32)
.bind(pubkey_bytes.as_slice())
.bind(channel_id)
.fetch_optional(&mut *tx)
.await?;

// Stale-write protection: reject if incoming is not newer.
// NIP-16: created_at is second-resolution. On same-second tie, lowest
// event id (lexicographic) wins — deterministic across relays.
let incoming_id = event.id.as_bytes().as_slice();
if let Some((existing_ts, existing_id)) = existing {
let dominated = created_at < existing_ts
|| (created_at == existing_ts && incoming_id >= existing_id.as_slice());
if dominated {
tx.rollback().await?;
let received_at = chrono::Utc::now();
return Ok((
StoredEvent::with_received_at(event.clone(), received_at, channel_id, false),
false,
));
}
}

// Soft-delete the old event (if any). IS NOT DISTINCT FROM for NULL safety.
sqlx::query(
"UPDATE events SET deleted_at = NOW() \
WHERE kind = $1 AND pubkey = $2 AND channel_id = $3 AND deleted_at IS NULL",
WHERE kind = $1 AND pubkey = $2 \
AND channel_id IS NOT DISTINCT FROM $3 \
AND deleted_at IS NULL",
)
.bind(kind_i32)
.bind(pubkey_bytes.as_slice())
.bind(channel_id)
.execute(&mut *tx)
.await?;

// Insert the new event inside the same transaction.
let sig_bytes = event.sig.serialize();
let tags_json = serde_json::to_value(&event.tags)?;
let received_at = chrono::Utc::now();

let insert_result = sqlx::query(
"INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
ON CONFLICT DO NOTHING",
)
.bind(event.id.as_bytes().as_slice())
.bind(pubkey_bytes.as_slice())
.bind(created_at)
.bind(kind_i32)
.bind(&tags_json)
.bind(&event.content)
.bind(sig_bytes.as_slice())
.bind(received_at)
.bind(channel_id)
.execute(&mut *tx)
.await?;

let was_inserted = insert_result.rows_affected() > 0;
if !was_inserted {
// ON CONFLICT fired — the event ID already exists. Rollback the
// soft-delete so we don't lose the previous replaceable event.
tx.rollback().await?;
return Ok((
StoredEvent::with_received_at(event.clone(), received_at, channel_id, false),
false,
));
}

tx.commit().await?;

// Insert the new event (outside the tx — uses the standard path with
// dedup via ON CONFLICT DO NOTHING).
self.insert_event(event, channel_id).await
// Mentions are a denormalized index — safe outside the transaction.
// insert_event() normally handles this, but we inlined the INSERT above.
if let Err(e) = crate::insert_mentions(&self.pool, event, channel_id).await {
tracing::warn!(event_id = %event.id, "Failed to insert mentions: {e}");
}

Ok((
StoredEvent::with_received_at(event.clone(), received_at, channel_id, true),
true,
))
}
}

Expand Down
20 changes: 9 additions & 11 deletions crates/sprout-relay/src/api/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ pub async fn search_handler(
.await
.unwrap_or_default();

// Build Typesense filter_by: channel_id:=[id1,id2,...]
// Build Typesense filter_by: channel_id:=[id1,id2,...] || global events
let filter_by = if channel_ids.is_empty() {
// No accessible channels — return empty results immediately.
return Ok(Json(serde_json::json!({ "hits": [], "found": 0 })));
Some("channel_id:=__global__".to_string())
} else {
let ids: Vec<String> = channel_ids.iter().map(|id| id.to_string()).collect();
Some(format!("channel_id:=[{}]", ids.join(",")))
Some(format!(
"(channel_id:=[{}] || channel_id:=__global__)",
ids.join(",")
))
};

let search_query = SearchQuery {
Expand Down Expand Up @@ -82,19 +84,15 @@ pub async fn search_handler(
.map(|c| (c.id.to_string(), c.name))
.collect();

// Filter out hits with no channel_id (spec requirement: "Exclude hits with channel_id: None").
// This also prevents a deserialization mismatch — the desktop expects channel_id: String.
// Global events have channel_id: null — include them in results.
let hits: Vec<serde_json::Value> = search_result
.hits
.into_iter()
.filter(|hit| hit.channel_id.is_some())
.map(|hit| {
let channel_name = hit
let channel_name: Option<&String> = hit
.channel_id
.as_deref()
.and_then(|id| channel_name_map.get(id))
.cloned()
.unwrap_or_default();
.and_then(|id| channel_name_map.get(id));
serde_json::json!({
"event_id": hit.event_id,
"content": hit.content,
Expand Down
35 changes: 23 additions & 12 deletions crates/sprout-relay/src/handlers/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use uuid::Uuid;
use nostr::Event;
use sprout_auth::Scope;
use sprout_core::kind::{
event_kind_u32, KIND_AUTH, KIND_CANVAS, KIND_DELETION, KIND_FORUM_COMMENT, KIND_FORUM_POST,
KIND_FORUM_VOTE, KIND_GIFT_WRAP, KIND_MEMBER_ADDED_NOTIFICATION,
event_kind_u32, KIND_AUTH, KIND_CANVAS, KIND_CONTACT_LIST, KIND_DELETION, KIND_FORUM_COMMENT,
KIND_FORUM_POST, KIND_FORUM_VOTE, KIND_GIFT_WRAP, KIND_MEMBER_ADDED_NOTIFICATION,
KIND_MEMBER_REMOVED_NOTIFICATION, KIND_NIP29_CREATE_GROUP, KIND_NIP29_DELETE_EVENT,
KIND_NIP29_DELETE_GROUP, KIND_NIP29_EDIT_METADATA, KIND_NIP29_JOIN_REQUEST,
KIND_NIP29_LEAVE_REQUEST, KIND_NIP29_PUT_USER, KIND_NIP29_REMOVE_USER, KIND_PRESENCE_UPDATE,
KIND_PROFILE, KIND_REACTION, KIND_STREAM_MESSAGE, KIND_STREAM_MESSAGE_BOOKMARKED,
KIND_STREAM_MESSAGE_DIFF, KIND_STREAM_MESSAGE_EDIT, KIND_STREAM_MESSAGE_PINNED,
KIND_STREAM_MESSAGE_SCHEDULED, KIND_STREAM_MESSAGE_V2, KIND_STREAM_REMINDER,
KIND_STREAM_MESSAGE_SCHEDULED, KIND_STREAM_MESSAGE_V2, KIND_STREAM_REMINDER, KIND_TEXT_NOTE,
};
use sprout_core::verification::verify_event;

Expand Down Expand Up @@ -141,6 +141,8 @@ pub enum IngestError {
fn required_scope_for_kind(kind: u32, event: &Event) -> Result<Scope, &'static str> {
match kind {
KIND_PROFILE => Ok(Scope::UsersWrite),
KIND_TEXT_NOTE => Ok(Scope::MessagesWrite),
KIND_CONTACT_LIST => Ok(Scope::UsersWrite),
KIND_DELETION
| KIND_REACTION
| KIND_GIFT_WRAP
Expand Down Expand Up @@ -779,7 +781,7 @@ pub async fn ingest_event(
}

// ── 5. Channel resolution ────────────────────────────────────────────
let channel_id = if kind_u32 == KIND_REACTION {
let mut channel_id = if kind_u32 == KIND_REACTION {
match derive_reaction_channel(&state.db, &event).await {
ReactionChannelResult::Channel(ch_id) => Some(ch_id),
ReactionChannelResult::NoChannel => None,
Expand Down Expand Up @@ -840,6 +842,13 @@ pub async fn ingest_event(
extract_channel_id(&event)
};

// ── 5b. Global-only kinds ignore h-tags ─────────────────────────────
// kind:0 (profile), kind:1 (text note), kind:3 (contact list) are always global.
// If a client includes a stray h-tag, ignore it — these kinds are never channel-scoped.
if matches!(kind_u32, KIND_PROFILE | KIND_TEXT_NOTE | KIND_CONTACT_LIST) {
channel_id = None;
}

// ── 6. h-tag requirement ─────────────────────────────────────────────
if requires_h_channel_scope(kind_u32) && channel_id.is_none() {
return Err(IngestError::Rejected(
Expand All @@ -850,12 +859,13 @@ pub async fn ingest_event(
// ── 7. Token channel access ──────────────────────────────────────────
if let Some(ch_id) = channel_id {
check_token_channel_access(&auth, ch_id).map_err(IngestError::AuthFailed)?;
} else if kind_u32 == KIND_NIP29_CREATE_GROUP && auth.channel_ids().is_some() {
// Channel-scoped tokens cannot create channels outside their scope.
// kind:9007 without an h-tag would auto-create a server-assigned UUID,
// bypassing the token's channel restriction.
} else if auth.channel_ids().is_some() {
// Channel-scoped tokens cannot publish global events — that would bypass
// the token's channel restriction. This covers kind:1 (global text notes),
// kind:3 (contact lists), kind:0 (profiles), and kind:9007 (create-group
// without an h-tag, which would auto-assign a server UUID).
return Err(IngestError::AuthFailed(
"restricted: channel-scoped tokens must include an h tag for create-group".into(),
"restricted: channel-scoped tokens cannot publish global events".into(),
));
}

Expand Down Expand Up @@ -1220,11 +1230,12 @@ pub async fn ingest_event(
});
}

let (stored_event, was_inserted) = if kind_u32 == KIND_PROFILE {
// kind:0 is replaceable — use addressable event storage.
let (stored_event, was_inserted) = if sprout_core::kind::is_replaceable(kind_u32) {
// NIP-16 replaceable event — atomic replace with stale-write protection.
// channel_id is None for global kinds (0, 1, 3) due to step 5b above.
state
.db
.replace_addressable_event(&event, None)
.replace_addressable_event(&event, channel_id)
.await
.map_err(|e| IngestError::Internal(format!("error: {e}")))?
} else {
Expand Down
23 changes: 12 additions & 11 deletions crates/sprout-relay/src/handlers/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,19 @@ async fn handle_search_req(
conn: &ConnectionState,
state: &AppState,
) {
if accessible_channels.is_empty() {
conn.send(RelayMessage::eose(sub_id));
return;
}

let all_channels_filter = {
let ids: Vec<String> = accessible_channels
.iter()
.map(|id| id.to_string())
.collect();
format!("channel_id:=[{}]", ids.join(","))
if accessible_channels.is_empty() {
"channel_id:=__global__".to_string()
} else {
let ids: Vec<String> = accessible_channels
.iter()
.map(|id| id.to_string())
.collect();
format!(
"(channel_id:=[{}] || channel_id:=__global__)",
ids.join(",")
)
}
};

let mut seen_ids: HashSet<nostr::EventId> = HashSet::new();
Expand Down Expand Up @@ -354,7 +356,6 @@ async fn handle_search_req(
let hit_ids: Vec<Vec<u8>> = search_result
.hits
.into_iter()
.filter(|h| h.channel_id.is_some())
.filter_map(|h| hex::decode(&h.event_id).ok())
.filter(|bytes| bytes.len() == 32)
.collect();
Expand Down
8 changes: 5 additions & 3 deletions crates/sprout-relay/src/handlers/side_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,14 @@ async fn emit_addressable_discovery_event(
.sign_with_keys(&state.relay_keypair)
.map_err(|e| anyhow::anyhow!("failed to sign kind:{kind}: {e}"))?;

let (stored, _) = state
let (stored, was_inserted) = state
.db
.replace_addressable_event(&event, Some(channel_id))
.await?;
let kind_u32 = event_kind_u32(&stored.event);
dispatch_persistent_event(state, &stored, kind_u32, relay_pubkey_hex).await;
if was_inserted {
let kind_u32 = event_kind_u32(&stored.event);
dispatch_persistent_event(state, &stored, kind_u32, relay_pubkey_hex).await;
}
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sprout-relay/src/nip11.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl RelayInfo {
description: "Sprout — private team communication relay".to_string(),
pubkey: None,
contact: None,
supported_nips: vec![1, 10, 11, 17, 25, 29, 42, 50],
supported_nips: vec![1, 2, 10, 11, 16, 17, 25, 29, 42, 50],
software: "https://github.com/sprout-rs/sprout".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
limitation: Some(RelayLimitation {
Expand Down
Loading
Loading