Secure P2P Communication Library for Go
Glueberry is a production-ready Go library for peer-to-peer communications built on libp2p. It provides encrypted, multiplexed streams between peers with app-controlled handshaking and automatic reconnection.
- ✨ End-to-End Encryption – ChaCha20-Poly1305 AEAD with X25519 ECDH key exchange
- 🔄 Automatic Reconnection – Exponential backoff with configurable retry policies
- 🎯 Multiple Named Streams – Per-peer stream multiplexing (e.g., "messages", "consensus")
- ⚡ Event-Driven API – Non-blocking connection state notifications via channels
- 🔒 Peer Blacklisting – Connection-level enforcement via libp2p connection gater
- 🌐 NAT Traversal – Built-in hole punching and relay support via libp2p
- 📊 Observability – Prometheus metrics and OpenTelemetry tracing support
- 🧵 Thread-Safe – All public APIs safe for concurrent use
- Installation
- Quick Start
- Usage
- Documentation
- Examples
- API Reference
- Configuration
- Security
- Performance
- Testing
- Contributing
- License
- Go 1.25+ (recommended: 1.25.6)
- libp2p dependencies (automatically managed by go mod)

go get github.com/blockberries/glueberry@latest

package main
import (
"fmt"
"github.com/blockberries/glueberry"
)
func main() {
fmt.Println("Glueberry version:", glueberry.CurrentVersion())
}

Here's a minimal example to get you started:
package main
import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
	"log"

	"github.com/blockberries/glueberry"
	"github.com/multiformats/go-multiaddr"
)
func main() {
// Generate identity key
_, privateKey, _ := ed25519.GenerateKey(rand.Reader)
// Configure listen address
listenAddr, _ := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/9000")
// Create node configuration
cfg := glueberry.NewConfig(
privateKey,
"./addressbook.json",
[]multiaddr.Multiaddr{listenAddr},
)
// Create and start node
node, err := glueberry.New(cfg)
if err != nil {
log.Fatalf("Failed to create node: %v", err)
}
if err := node.Start(); err != nil {
log.Fatalf("Failed to start node: %v", err)
}
defer node.Stop()
fmt.Printf("Node started!\n")
fmt.Printf(" Peer ID: %s\n", node.PeerID())
fmt.Printf(" Listening on: %v\n", node.Addrs())
// Handle connection events
go func() {
for event := range node.Events() {
fmt.Printf("Event: %s -> %s\n", event.PeerID.String()[:8], event.State)
}
}()
// Handle incoming messages
go func() {
for msg := range node.Messages() {
fmt.Printf("Message from %s: %s\n", msg.PeerID.String()[:8], msg.Data)
}
}()
// Keep running
select {}
}

import (
"crypto/ed25519"
"crypto/rand"
"github.com/blockberries/glueberry"
"github.com/multiformats/go-multiaddr"
)
// Generate identity key
_, privateKey, _ := ed25519.GenerateKey(rand.Reader)
// Configure listen addresses
listenAddr, _ := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/9000")
// Create configuration
cfg := glueberry.NewConfig(
privateKey,
"./addressbook.json",
[]multiaddr.Multiaddr{listenAddr},
)
// Create node
node, err := glueberry.New(cfg)
if err != nil {
// Handle error
}
// Start listening
if err := node.Start(); err != nil {
// Handle error
}
defer node.Stop()

import (
"log/slog"
"time"
)
cfg := glueberry.NewConfig(
privateKey,
"./addressbook.json",
[]multiaddr.Multiaddr{listenAddr},
// Optional configuration
glueberry.WithLogger(slog.Default()),
glueberry.WithHandshakeTimeout(120*time.Second),
glueberry.WithMaxMessageSize(50*1024*1024), // 50MB
glueberry.WithReconnectMaxAttempts(20),
)

import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
// Parse peer address
peerAddr, _ := multiaddr.NewMultiaddr("/ip4/192.168.1.100/tcp/9001/p2p/12D3KooW...")
peerInfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
// Add to address book (auto-connects when node is started)
node.AddPeer(peerInfo.ID, peerInfo.Addrs, nil)

// Connect to peer
if err := node.Connect(peerID); err != nil {
log.Printf("Connect failed: %v", err)
}
// Check connection state
state := node.ConnectionState(peerID)
fmt.Printf("Connection state: %v\n", state)

// Disconnect from peer
if err := node.Disconnect(peerID); err != nil {
log.Printf("Disconnect failed: %v", err)
}
// Cancel automatic reconnection attempts
if err := node.CancelReconnection(peerID); err != nil {
log.Printf("Cancel reconnection failed: %v", err)
}

// Blacklist a peer (blocks all connections)
if err := node.BlacklistPeer(peerID); err != nil {
log.Printf("Blacklist failed: %v", err)
}
// Remove from blacklist
if err := node.UnblacklistPeer(peerID); err != nil {
log.Printf("Unblacklist failed: %v", err)
}

Glueberry uses a symmetric, two-phase handshake to prevent race conditions where encrypted streams become active before both sides are ready.
1. StateConnected event fires (handshake stream ready)
2. App sends Hello message
3. App receives PubKey from peer
4. App calls PrepareStreams() [Phase 1: derive key, ready to receive]
5. App sends Complete message
6. App receives Complete from peer
7. App calls FinalizeHandshake() [Phase 2: activate encrypted streams]
8. StateEstablished event fires (encrypted streams active)
import (
"crypto/ed25519"
"github.com/blockberries/glueberry/pkg/streams"
)
// Handshake message types
const (
msgHello byte = 1
msgPubKey byte = 2
msgComplete byte = 3
)
publicKey := privateKey.Public().(ed25519.PublicKey)
// Track handshake state per peer
type peerState struct {
gotHello bool
gotPubKey bool
sentComplete bool
gotComplete bool
peerPubKey ed25519.PublicKey
streamsPrepared bool
}
peerStates := make(map[string]*peerState)
// Handle connection events
go func() {
for event := range node.Events() {
if event.State == glueberry.StateConnected {
// Start handshake: send Hello
_ = node.Send(event.PeerID, streams.HandshakeStreamName, []byte{msgHello})
}
}
}()
// Handle handshake messages
go func() {
for msg := range node.Messages() {
if msg.StreamName != streams.HandshakeStreamName {
continue // Skip non-handshake messages
}
peerID := msg.PeerID.String()
state, ok := peerStates[peerID]
if !ok {
state = &peerState{}
peerStates[peerID] = state
}
if len(msg.Data) == 0 {
continue // Ignore empty frames; indexing msg.Data[0] would panic
}
switch msg.Data[0] {
case msgHello:
state.gotHello = true
// Send Hello back
_ = node.Send(msg.PeerID, streams.HandshakeStreamName, []byte{msgHello})
// Send our public key
pubKeyMsg := append([]byte{msgPubKey}, publicKey...)
_ = node.Send(msg.PeerID, streams.HandshakeStreamName, pubKeyMsg)
case msgPubKey:
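// Reject payloads that are not exactly one type byte plus an Ed25519 public key
if len(msg.Data) != 1+ed25519.PublicKeySize {
log.Printf("Invalid public key length from %s", peerID[:8])
continue
}
state.peerPubKey = ed25519.PublicKey(msg.Data[1:])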
state.gotPubKey = true
// Phase 1: Prepare streams (derive key, ready to receive)
if err := node.PrepareStreams(msg.PeerID, state.peerPubKey, []string{"messages"}); err != nil {
log.Printf("PrepareStreams failed: %v", err)
continue
}
state.streamsPrepared = true
// Send Complete
_ = node.Send(msg.PeerID, streams.HandshakeStreamName, []byte{msgComplete})
state.sentComplete = true
case msgComplete:
state.gotComplete = true
}
// Phase 2: Finalize when both sides are ready
if state.streamsPrepared && state.sentComplete && state.gotComplete {
if err := node.FinalizeHandshake(msg.PeerID); err != nil {
log.Printf("FinalizeHandshake failed: %v", err)
continue
}
fmt.Printf("Handshake complete with %s\n", peerID[:8])
delete(peerStates, peerID) // Cleanup
}
}
}()

// Send encrypted message (after handshake complete)
data := []byte("Hello, peer!")
if err := node.Send(peerID, "messages", data); err != nil {
log.Printf("Send failed: %v", err)
}
// Send with context (timeout)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := node.SendCtx(ctx, peerID, "messages", data); err != nil {
log.Printf("Send failed: %v", err)
}

for msg := range node.Messages() {
switch msg.StreamName {
case streams.HandshakeStreamName:
// Handle handshake messages
handleHandshake(msg)
case "messages":
// Handle application messages
fmt.Printf("Message from %s: %s\n", msg.PeerID, msg.Data)
case "consensus":
// Handle consensus messages
handleConsensus(msg)
default:
log.Printf("Unknown stream: %s", msg.StreamName)
}
}

for event := range node.Events() {
switch event.State {
case glueberry.StateConnecting:
fmt.Printf("Connecting to %s\n", event.PeerID)
case glueberry.StateConnected:
fmt.Printf("Connected to %s (handshake stream ready)\n", event.PeerID)
// Start handshake
case glueberry.StateEstablished:
fmt.Printf("Connection established with %s (encrypted streams active)\n", event.PeerID)
case glueberry.StateDisconnected:
fmt.Printf("Disconnected from %s\n", event.PeerID)
if event.IsError() {
fmt.Printf(" Error: %v\n", event.Error)
}
case glueberry.StateReconnecting:
fmt.Printf("Reconnecting to %s\n", event.PeerID)
}
}

// Create subscription
sub := node.SubscribeEvents()
defer sub.Unsubscribe()
go func() {
for evt := range sub.Events() {
fmt.Printf("Subscription received: %v\n", evt)
}
}()

// Only established events
sub := node.EventsForStates(glueberry.StateEstablished)
defer sub.Unsubscribe()
// Only events for specific peer
sub := node.EventsForPeer(peerID)
defer sub.Unsubscribe()
// Custom filter
sub := node.FilteredEvents(glueberry.EventFilter{
PeerID: &peerID,
States: []glueberry.ConnectionState{
glueberry.StateEstablished,
glueberry.StateDisconnected,
},
})
defer sub.Unsubscribe()

- ARCHITECTURE.md – Comprehensive architecture documentation
- CHANGELOG.md – Version history and release notes
- CLAUDE.md – Development guidelines and patterns
- Godoc – API reference
- Package Documentation – See doc.go in each package
- Examples – See examples/ directory for complete applications
- Tests – See *_test.go files for usage patterns
Glueberry includes several example applications demonstrating different use cases:
Minimal example showing node creation and basic connectivity.
go run examples/basic/main.go

Interactive two-peer chat application with complete handshake flow.
# Terminal 1
go run examples/simple-chat/main.go -port 9000
# Terminal 2
go run examples/simple-chat/main.go -port 9001 -peer /ip4/127.0.0.1/tcp/9000/p2p/...

Multi-peer cluster with gossip-style message broadcast.
go run examples/cluster/main.go

Large file transfer with progress reporting.
# Sender
go run examples/file-transfer/main.go -mode send -file document.pdf
# Receiver
go run examples/file-transfer/main.go -mode receive

RPC-style request-response communication.
# Server
go run examples/rpc/main.go -mode server
# Client
go run examples/rpc/main.go -mode client -method echo -params "Hello"

Integration with Blockberry blockchain system (consensus messages, transactions).
go run examples/blockberry-integration/main.go

- New(cfg *Config) (*Node, error) – Create node
- Start() error – Start listening
- Stop() error – Graceful shutdown

- PeerID() peer.ID – Get local peer ID
- PublicKey() ed25519.PublicKey – Get local public key
- Addrs() []multiaddr.Multiaddr – Get listen addresses
- Version() ProtocolVersion – Get protocol version

- AddPeer(peerID, addrs, metadata) – Add peer
- RemovePeer(peerID) – Remove peer
- BlacklistPeer(peerID) – Blacklist peer
- UnblacklistPeer(peerID) – Remove from blacklist
- GetPeer(peerID) – Get peer entry
- ListPeers() – List all peers

- Connect(peerID) – Initiate connection
- ConnectCtx(ctx, peerID) – Connect with context
- Disconnect(peerID) – Disconnect peer
- DisconnectCtx(ctx, peerID) – Disconnect with context
- ConnectionState(peerID) – Get connection state
- IsOutbound(peerID) – Check if outbound connection
- CancelReconnection(peerID) – Cancel reconnect

- PrepareStreams(peerID, peerPubKey, streamNames) – Phase 1: Derive key
- FinalizeHandshake(peerID) – Phase 2: Activate streams
- CompleteHandshake(peerID, peerPubKey, streamNames) – Combined (backward compat)

- Send(peerID, streamName, data) – Send message
- SendCtx(ctx, peerID, streamName, data) – Send with context
- Messages() <-chan IncomingMessage – Receive messages

- Events() <-chan ConnectionEvent – Receive events
- SubscribeEvents() *EventSubscription – Create subscription
- FilteredEvents(filter) *EventSubscription – Filtered subscription
- EventsForPeer(peerID) *EventSubscription – Per-peer events
- EventsForStates(...states) *EventSubscription – State-filtered events

- PeerStatistics(peerID) *PeerStats – Get peer stats
- AllPeerStatistics() map[peer.ID]*PeerStats – All peer stats
- MessagesSent(peerID) uint64 – Total messages sent
- Health() *HealthStatus – Health check
const (
StateDisconnected ConnectionState = iota
StateConnecting
StateConnected // Handshake stream ready
StateEstablished // Encrypted streams active
StateReconnecting
StateCooldown
)

const (
ErrCodeConnectionFailed
ErrCodeHandshakeFailed
ErrCodeHandshakeTimeout
ErrCodeStreamClosed
ErrCodeEncryptionFailed
ErrCodeDecryptionFailed
// ... and more
)

type Logger interface {
Debug(msg string, keysAndValues ...any)
Info(msg string, keysAndValues ...any)
Warn(msg string, keysAndValues ...any)
Error(msg string, keysAndValues ...any)
}

type Metrics interface {
ConnectionOpened(direction string)
ConnectionClosed(direction string)
MessageSent(stream string, bytes int)
MessageReceived(stream string, bytes int)
HandshakeDuration(seconds float64)
// ... and more
}

type Config struct {
// Required
PrivateKey ed25519.PrivateKey
AddressBookPath string
ListenAddrs []multiaddr.Multiaddr
// Optional (with defaults)
Logger Logger
Metrics Metrics
HandshakeTimeout time.Duration // Default: 60s
MaxMessageSize int // Default: 10MB
ReconnectInitialBackoff time.Duration // Default: 1s
ReconnectMaxBackoff time.Duration // Default: 5m
ReconnectMaxAttempts int // Default: 10
EventBufferSize int // Default: 100
MessageBufferSize int // Default: 1000
DisableBackpressure bool // Default: false
HighWatermark int64 // Default: 1000
LowWatermark int64 // Default: 500
OnDecryptionError func(peer.ID, error)
}

cfg := glueberry.NewConfig(
privateKey,
"./addressbook.json",
listenAddrs,
glueberry.WithLogger(myLogger),
glueberry.WithMetrics(myMetrics),
glueberry.WithHandshakeTimeout(120*time.Second),
glueberry.WithMaxMessageSize(50*1024*1024),
glueberry.WithReconnectMaxAttempts(20),
)

| Option | Default | Description |
|---|---|---|
| HandshakeTimeout | 60s | Time to complete handshake |
| MaxMessageSize | 10MB | Maximum message size |
| ReconnectInitialBackoff | 1s | Initial reconnect delay |
| ReconnectMaxBackoff | 5m | Maximum reconnect delay |
| ReconnectMaxAttempts | 10 | Max reconnect attempts |
| EventBufferSize | 100 | Event channel buffer |
| MessageBufferSize | 1000 | Message channel buffer |
| HighWatermark | 1000 | Backpressure high mark |
| LowWatermark | 500 | Backpressure low mark |
| Operation | Algorithm | Key Size |
|---|---|---|
| Identity | Ed25519 | 256-bit |
| Key Exchange | X25519 ECDH | 256-bit |
| Key Derivation | HKDF-SHA256 | 256-bit |
| Symmetric Encryption | ChaCha20-Poly1305 | 256-bit |
| Message Authentication | Poly1305 MAC | 128-bit tag |
- End-to-end encryption – All post-handshake messages encrypted
- Forward secrecy – Unique keys per peer pair
- Authenticated encryption – Poly1305 MAC prevents tampering
- Secure key management – Private keys never logged, secure zeroing on close
- Input validation – All network inputs validated
- Handshake timeout – Prevents resource exhaustion
- Blacklist enforcement – Connection-level blocking
- Protect private keys – Never commit keys to version control
- Use strong identity keys – Generate with crypto/rand
- Validate peer identities – Verify public keys during handshake
- Monitor for anomalies – Use metrics to detect attacks
- Keep dependencies updated – Regularly update Glueberry and libp2p
- Limit message sizes – Use MaxMessageSize to prevent DoS
Run benchmarks:
make bench # Quick benchmarks
make bench-all # All benchmarks
make bench-crypto # Crypto-only

Typical Results (M1 Mac):
| Operation | Time | Memory allocated |
|---|---|---|
| Key Derivation (cached) | ~1 µs | 48 B |
| Key Derivation (uncached) | ~50 µs | 2 KB |
| Encryption (1KB) | ~10 µs | 1.5 KB |
| Decryption (1KB) | ~10 µs | 1.5 KB |
| Message Send (1KB) | ~100 µs | 5 KB |
- Reuse connections – Avoid frequent connect/disconnect cycles
- Batch small messages – Reduce encryption overhead
- Use flow control – Enable backpressure to prevent overload
- Monitor metrics – Track performance with Prometheus
- Profile your app – Use pprof to identify bottlenecks
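
The flow-control tip relies on the HighWatermark and LowWatermark settings. How Glueberry applies them internally is not described here, so the following is only a generic sketch of watermark hysteresis: sending pauses once the pending count reaches the high mark and resumes only after it drains to the low mark. The `watermarkGate` type is hypothetical and, unlike the library's public API, not safe for concurrent use.

```go
package main

import "fmt"

// watermarkGate tracks pending items and applies high/low watermark
// hysteresis. Hypothetical type for illustration; not Glueberry API.
type watermarkGate struct {
	high, low int64
	pending   int64
	paused    bool
}

// add records n newly queued items and reports whether senders should pause.
func (g *watermarkGate) add(n int64) bool {
	g.pending += n
	if g.pending >= g.high {
		g.paused = true
	}
	return g.paused
}

// done records n completed items and reports whether senders are still paused.
func (g *watermarkGate) done(n int64) bool {
	g.pending -= n
	if g.pending < 0 {
		g.pending = 0
	}
	if g.pending <= g.low {
		g.paused = false
	}
	return g.paused
}

func main() {
	gate := &watermarkGate{high: 1000, low: 500} // defaults from the Config section
	fmt.Println("after 999 queued, paused:", gate.add(999))
	fmt.Println("after 1000 queued, paused:", gate.add(1))
	fmt.Println("drained to 600, paused:", gate.done(400))
	fmt.Println("drained to 500, paused:", gate.done(100))
}
```

The gap between the two marks keeps the sender from flapping between paused and resumed when the queue hovers near a single threshold.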
Enable pprof in your application:
import (
	"log"
	"net/http"
	_ "net/http/pprof" // Registers the /debug/pprof handlers on the default mux
)

go func() {
	log.Println(http.ListenAndServe("localhost:6060", nil))
}()

Access profiles:
- CPU: http://localhost:6060/debug/pprof/profile?seconds=30
- Memory: http://localhost:6060/debug/pprof/heap
- Goroutines: http://localhost:6060/debug/pprof/goroutine
make test # All tests with race detection
make test-short # Quick tests (no race detection)
make test-race # Force race detection

go test -v -run TestIntegration

make bench
make bench-all

make fuzz # 30s per target (default)
FUZZTIME=5m make fuzz # Custom duration

make coverage # Generate HTML report
open coverage.html # View in browser

Contributions are welcome! Please follow these guidelines:
- gofmt – Format code with make fmt
- go vet – Run static analysis with make vet
- golangci-lint – Run linters with make lint

- Write tests – All new features must have tests
- Run tests – make test must pass
- Check coverage – Aim for >80% coverage

- Fork the repository
- Create a feature branch (git checkout -b feature/my-feature)
- Write tests for your changes
- Run make check (fmt + vet + lint + test)
- Commit with clear messages
- Push to your fork
- Submit a pull request

- Use clear, descriptive commit messages
- Reference issues in commits (Fixes #123)
- Follow conventional commits format (optional)
Glueberry is licensed under the Apache License 2.0.
See LICENSE for details.
- Documentation: ARCHITECTURE.md
- Godoc: pkg.go.dev/github.com/blockberries/glueberry
- Examples: examples/
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Glueberry is built on top of:
- libp2p – Modular P2P networking stack
- Cramberry – Binary serialization
- golang.org/x/crypto – Cryptographic primitives
Special thanks to the libp2p and Go communities for their excellent work.
Built with ❤️ by the Blockberries team