Skip to content

Commit

Permalink
api: add event channel overflow event
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed May 21, 2024
1 parent 18f2a09 commit 469ff79
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 24 deletions.
8 changes: 8 additions & 0 deletions deltachat-ffi/deltachat.h
Original file line number Diff line number Diff line change
Expand Up @@ -6324,6 +6324,14 @@ void dc_event_unref(dc_event_t* event);

#define DC_EVENT_CHATLIST_ITEM_CHANGED 2301


/**
* Inform that some events have been skipped due to event channel overflow.
*
* @param data1 (int) number of events that have been skipped
*/
#define DC_EVENT_CHANNEL_OVERFLOW 2400

/**
* @}
*/
Expand Down
10 changes: 7 additions & 3 deletions deltachat-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ pub unsafe extern "C" fn dc_event_get_id(event: *mut dc_event_t) -> libc::c_int
EventType::AccountsBackgroundFetchDone => 2200,
EventType::ChatlistChanged => 2300,
EventType::ChatlistItemChanged { .. } => 2301,
EventType::EventChannelOverflow { .. } => 2400,
}
}

Expand Down Expand Up @@ -624,6 +625,7 @@ pub unsafe extern "C" fn dc_event_get_data1_int(event: *mut dc_event_t) -> libc:
EventType::ChatlistItemChanged { chat_id } => {
chat_id.unwrap_or_default().to_u32() as libc::c_int
}
EventType::EventChannelOverflow { n } => *n as libc::c_int,
}
}

Expand Down Expand Up @@ -662,8 +664,9 @@ pub unsafe extern "C" fn dc_event_get_data2_int(event: *mut dc_event_t) -> libc:
| EventType::AccountsBackgroundFetchDone
| EventType::ChatlistChanged
| EventType::ChatlistItemChanged { .. }
| EventType::ConfigSynced { .. } => 0,
EventType::ChatModified(_) => 0,
| EventType::ConfigSynced { .. }
| EventType::ChatModified(_)
| EventType::EventChannelOverflow { .. } => 0,
EventType::MsgsChanged { msg_id, .. }
| EventType::ReactionsChanged { msg_id, .. }
| EventType::IncomingMsg { msg_id, .. }
Expand Down Expand Up @@ -729,7 +732,8 @@ pub unsafe extern "C" fn dc_event_get_data2_str(event: *mut dc_event_t) -> *mut
| EventType::ChatEphemeralTimerModified { .. }
| EventType::IncomingMsgBunch { .. }
| EventType::ChatlistItemChanged { .. }
| EventType::ChatlistChanged => ptr::null_mut(),
| EventType::ChatlistChanged
| EventType::EventChannelOverflow { .. } => ptr::null_mut(),
EventType::ConfigureProgress { comment, .. } => {
if let Some(comment) = comment {
comment.to_c_string().unwrap_or_default().into_raw()
Expand Down
4 changes: 4 additions & 0 deletions deltachat-jsonrpc/src/api/types/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ pub enum EventType {
/// If `chat_id` is set to None, then all currently visible chats need to be rerendered, and all not-visible items need to be cleared from cache if the UI has a cache.
#[serde(rename_all = "camelCase")]
ChatlistItemChanged { chat_id: Option<u32> },

/// Inform than some events have been skipped due to event channel overflow.
EventChannelOverflow { n: u64 },
}

impl From<CoreEventType> for EventType {
Expand Down Expand Up @@ -378,6 +381,7 @@ impl From<CoreEventType> for EventType {
chat_id: chat_id.map(|id| id.to_u32()),
},
CoreEventType::ChatlistChanged => ChatlistChanged,
CoreEventType::EventChannelOverflow { n } => EventChannelOverflow { n },
}
}
}
1 change: 1 addition & 0 deletions node/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module.exports = {
DC_DOWNLOAD_IN_PROGRESS: 1000,
DC_DOWNLOAD_UNDECIPHERABLE: 30,
DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE: 2200,
DC_EVENT_CHANNEL_OVERFLOW: 2400,
DC_EVENT_CHATLIST_CHANGED: 2300,
DC_EVENT_CHATLIST_ITEM_CHANGED: 2301,
DC_EVENT_CHAT_EPHEMERAL_TIMER_MODIFIED: 2021,
Expand Down
3 changes: 2 additions & 1 deletion node/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ module.exports = {
2150: 'DC_EVENT_WEBXDC_REALTIME_DATA',
2200: 'DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE',
2300: 'DC_EVENT_CHATLIST_CHANGED',
2301: 'DC_EVENT_CHATLIST_ITEM_CHANGED'
2301: 'DC_EVENT_CHATLIST_ITEM_CHANGED',
2400: 'DC_EVENT_CHANNEL_OVERFLOW'
}
2 changes: 2 additions & 0 deletions node/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export enum C {
DC_DOWNLOAD_IN_PROGRESS = 1000,
DC_DOWNLOAD_UNDECIPHERABLE = 30,
DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE = 2200,
DC_EVENT_CHANNEL_OVERFLOW = 2400,
DC_EVENT_CHATLIST_CHANGED = 2300,
DC_EVENT_CHATLIST_ITEM_CHANGED = 2301,
DC_EVENT_CHAT_EPHEMERAL_TIMER_MODIFIED = 2021,
Expand Down Expand Up @@ -343,4 +344,5 @@ export const EventId2EventName: { [key: number]: string } = {
2200: 'DC_EVENT_ACCOUNTS_BACKGROUND_FETCH_DONE',
2300: 'DC_EVENT_CHATLIST_CHANGED',
2301: 'DC_EVENT_CHATLIST_ITEM_CHANGED',
2400: 'DC_EVENT_CHANNEL_OVERFLOW',
}
38 changes: 18 additions & 20 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,13 @@ impl EventEmitter {
/// [`try_recv`]: Self::try_recv
pub async fn recv(&self) -> Option<Event> {
let mut lock = self.0.lock().await;
loop {
match lock.recv().await {
Err(async_broadcast::RecvError::Overflowed(_)) => {
// Some events have been lost,
// but the channel is not closed.
continue;
}
Err(async_broadcast::RecvError::Closed) => return None,
Ok(event) => return Some(event),
}
match lock.recv().await {
Err(async_broadcast::RecvError::Overflowed(n)) => Some(Event {
id: 0,
typ: EventType::EventChannelOverflow { n },
}),
Err(async_broadcast::RecvError::Closed) => None,
Ok(event) => Some(event),
}
}

Expand All @@ -96,17 +93,18 @@ impl EventEmitter {
// to avoid blocking
// in case there is a concurrent call to `recv`.
let mut lock = self.0.try_lock()?;
loop {
match lock.try_recv() {
Err(async_broadcast::TryRecvError::Overflowed(_)) => {
// Some events have been lost,
// but the channel is not closed.
continue;
}
res @ (Err(async_broadcast::TryRecvError::Empty)
| Err(async_broadcast::TryRecvError::Closed)
| Ok(_)) => return Ok(res?),
match lock.try_recv() {
Err(async_broadcast::TryRecvError::Overflowed(n)) => {
// Some events have been lost,
// but the channel is not closed.
Ok(Event {
id: 0,
typ: EventType::EventChannelOverflow { n },
})
}
res @ (Err(async_broadcast::TryRecvError::Empty)
| Err(async_broadcast::TryRecvError::Closed)
| Ok(_)) => Ok(res?),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/events/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,4 +315,10 @@ pub enum EventType {
/// Event for using in tests, e.g. as a fence between normally generated events.
#[cfg(test)]
Test,

/// Inform than some events have been skipped due to event channel overflow.
EventChannelOverflow {
/// Number of events skipped.
n: u64,
},
}

0 comments on commit 469ff79

Please sign in to comment.