
Tracking: Message processing pipeline #268

Open
pete-eiger opened this issue Aug 14, 2023 · 4 comments
Labels: p1 (High priority), size:x-large (Very large), type:tracking (Tracking issues with related scope)

Comments

pete-eiger (Contributor) commented Aug 14, 2023

No description provided.

pete-eiger added the type:spike (Additional research required) label on Aug 14, 2023
pete-eiger reopened this on Aug 14, 2023
hopeyen (Collaborator) commented Sep 2, 2023

An attempt to lay out the ideal message pipeline after receiving a gossip.

Message Pipelining in SDK

Handling general Waku messages received:

  • Nonce/timestamp (outer layer of GraphcastMessage) validation for all messages.
  • Async message processing: when the Graphcast Agent receives a Waku signal, it asynchronously sends the contained WakuMessage to a Radio Operator through an MPSC channel (see the sketch after this list).
  • A separate runtime is used for processing Waku signals; compared to spawning a new thread, a runtime is faster for TCP/UDP communication.
  • Compared to the previous flow of receive, parse, validate, and save to persistence, the Graphcast Agent now keeps gossip processing as quick and lightweight as possible.
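A minimal sketch of this hand-off, assuming tokio as the async runtime; the `WakuMessage` struct, the validation threshold, and the synthetic message loop are stand-ins for the real SDK types and Waku signal callbacks:

```rust
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;

// Hypothetical stand-in for the parsed WakuMessage payload.
#[derive(Debug)]
struct WakuMessage {
    nonce: u64,
    timestamp: u64,
}

// Cheap outer-layer check done in the agent before handing the message to the radio operator.
fn valid_nonce_and_timestamp(msg: &WakuMessage, now: u64, max_age_secs: u64) -> bool {
    msg.nonce > 0 && now.saturating_sub(msg.timestamp) <= max_age_secs
}

fn unix_now() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}

#[tokio::main]
async fn main() {
    // MPSC channel: the agent side sends, the radio operator side receives.
    let (tx, mut rx) = mpsc::channel::<WakuMessage>(1024);

    // Agent task: in the real SDK this would be driven by Waku signal callbacks
    // running on their own runtime; here a few synthetic messages stand in.
    let agent = tokio::spawn(async move {
        for nonce in 1..=3u64 {
            let msg = WakuMessage { nonce, timestamp: unix_now() };
            if valid_nonce_and_timestamp(&msg, unix_now(), 600) {
                // Hand off asynchronously without blocking the signal handler.
                let _ = tx.send(msg).await;
            }
        }
        // Dropping `tx` closes the channel and lets the receiver loop finish.
    });

    // Radio operator side: drain the channel and apply application-specific logic.
    while let Some(msg) = rx.recv().await {
        println!("radio operator received message with nonce {}", msg.nonce);
    }

    agent.await.unwrap();
}
```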

Message Pipelining in Radios

The majority of message processing falls on specific radios and their intended application. While the operations are mostly async, there are some improvements worth considering:

  • Validation for a specific content topic is done asynchronously but sequentially, so that the order of messages is preserved. This is the processing bottleneck, so we should explore concurrent validation.
  • Handlers after validation can be spawned onto concurrent threads and joined afterwards.
    • Subgraph radio, public POI consensus computation: set up concurrent threads so that the computation for one deployment does not have to wait for, or block on, the computation for another deployment.
    • Listener radio, storage of generic messages: batch processing can be achieved through a new psql mutation resolver to throttle writes to the DB. The operator can either wait for a period of time or for a number of messages before writing them to the database (a rough sketch follows after this list). We are not necessarily concerned with the order of messages inside the DB, but it is best to avoid parallel writes.
  • Raw/Generic Message Retention: For ephemeral use cases (most of the radios), raw messages are thrown away right after being decoded as a Graphcast typed message.
    • For listener radio, introduce a configurable retention window for raw or generic messages.
  • Aggregated summary and reports: Track an aggregation of messages
    • Subgraph radio: For each deployment (topic), tally the number of messages received, the number of participating indexers (gossip network connectivity), and attestation results (number of matched versus diverged results). Allow indexers to configure report options: notification methods, report frequencies, and aggregation methods (summation, mean/mode/median, periodic summation, or a moving average).
    • Listener radio: Tally fields available on the GraphcastMessage level. For each type of GraphcastMessage, track number of peer nodes, number of messages per sender, number of messages per topic, and size and frequency of messages.
  • Persistence of Aggregations: Aggregated data should be persisted in a scalable storage solution for quick retrieval and analytics. Explore Prometheus histograms and summaries, or storing a summaries table in the database.
  • Pruning: Introduce a pruning configuration, either by message timestamp or maximum number of messages allowed to store. Ensure optimal resource usage.
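A rough sketch of the batch-and-throttle idea for the listener radio's generic message storage, assuming tokio; `GraphcastMessage` and `write_batch` are hypothetical stand-ins for the radio's message type and the psql mutation resolver:

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

// Hypothetical stand-in for a decoded generic Graphcast message.
#[derive(Debug, Clone)]
struct GraphcastMessage {
    identifier: String,
    nonce: u64,
}

// Stand-in for a single batched INSERT; real code would call the database here.
async fn write_batch(batch: &[GraphcastMessage]) {
    println!(
        "writing {} messages, first identifier: {:?}",
        batch.len(),
        batch.first().map(|m| m.identifier.clone())
    );
}

async fn batch_writer(
    mut rx: mpsc::Receiver<GraphcastMessage>,
    max_batch: usize,
    flush_every: Duration,
) {
    let mut batch: Vec<GraphcastMessage> = Vec::with_capacity(max_batch);
    let mut ticker = interval(flush_every);

    loop {
        tokio::select! {
            maybe_msg = rx.recv() => match maybe_msg {
                Some(msg) => {
                    batch.push(msg);
                    // Flush when the batch is full.
                    if batch.len() >= max_batch {
                        write_batch(&batch).await;
                        batch.clear();
                    }
                }
                // Channel closed: flush whatever is left and stop.
                None => {
                    if !batch.is_empty() {
                        write_batch(&batch).await;
                    }
                    break;
                }
            },
            // Flush on a timer so messages never sit unwritten for too long.
            _ = ticker.tick() => {
                if !batch.is_empty() {
                    write_batch(&batch).await;
                    batch.clear();
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1024);
    let writer = tokio::spawn(batch_writer(rx, 50, Duration::from_secs(5)));

    // Validated messages would be fed in here by the radio operator.
    for nonce in 1..=120u64 {
        let msg = GraphcastMessage {
            identifier: format!("deployment-{}", nonce % 3),
            nonce,
        };
        tx.send(msg).await.unwrap();
    }
    drop(tx); // close the channel so the writer flushes the remainder and exits

    writer.await.unwrap();
}
```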

Sequence of processing

sequenceDiagram
    participant GP as Graphcast Peer
    participant SDK as Graphcast agent (SDK)
    participant RO as Radio Operator
    participant S as Stateful
    GP->>SDK: Waku Signal
    SDK->>SDK: Parse signal to WakuMessage
    SDK->>RO: Send WakuMessage to Radio operator
    Note over RO: Sequential Validation
    alt Concurrent
        RO->>RO: Different tasks for each message type
    else Parallel
        RO->>RO: Independently process messages of the same type
    end
    alt Persistence - In-memory
        RO->>S: Update to contain recent typed messages
    else Persistence - Retention FIFO
        RO->>S: Prune old Messages
        RO->>S: Store new Messages
    end
    S ->> S: Update aggregated summary
    RO ->> S: Query summary for periodic report



A different perspective on messages

timeline

	title Message lifespan

	section Generic

	Trigger: Either manual or periodic interval based on Networks
	
	Create : Radio Operator collects necessary info
	
	Send : Graphcast agent wraps message into WakuMessage

	: Stores a temporary local copy
	
	Waku signal propagation: Send to connected gossip peers
	
	Receive : Graphcast agent receives signal
	
	: Parse raw message into generic Graphcast message

	: Graphcast field validation (nonce, timestamp, first time sender)

	: Pass message to Radio Operator

	Process: Radio Operator handles application specific logic

	section Listener Radio

	Storage: Asynchronously store generic graphcast message to database

	: Summarize messages received/stored

	section Subgraph Radio

	Further decoding: Try to decode messages into PublicPOIMessage
	: Validate POI message fields (block hash, sender identity)
	: Queue in remote_messages 
	Message comparison : determine comparison point from local copy
	: trigger after message collection duration
	: Take local and remote messages with matching deployment and block number
	: Determine comparison results for NotFound, Diverged, Matched
	: Notification
	: Summarize results


chriswessels (Member) commented:

Nice start! There are some aspects of this work that are missing from the spec, including:

  • Some notion of a raw/generic message retention window
  • Batched consumption and processing of raw messages to do aggregations (e.g. listener-radio counting messages per sender or topic)
  • Persistence of aggregations
  • Periodic pruning of raw message data that is outside the retention window

I recognise some of these may be Radio-level concerns, but we should have a spec for message pipelining that factors in these requirements.

hopeyen (Collaborator) commented Sep 8, 2023

Updated the previous comment to include retention, persistence, aggregations, and pruning.

Linking 2 DB issues

hopeyen (Collaborator) commented Sep 21, 2023

pete-eiger changed the title from "Defining Issue relating to message processing pipeline" to "Message processing pipeline" on Sep 25, 2023
pete-eiger added the type:tracking (Tracking issues with related scope), p1 (High priority), size:large (Large), and size:x-large (Very large) labels, and removed the type:spike (Additional research required) and size:large (Large) labels, on Sep 25, 2023
pete-eiger changed the title from "Message processing pipeline" to "Tracking: Message processing pipeline" on Nov 6, 2023