WebSocket server for @hamicek/noex-store and @hamicek/noex-rules built on @hamicek/noex GenServer supervision.
- GenServer per connection with
simple_one_for_onesupervision and automatic cleanup - Full proxy for noex-store: CRUD, reactive query subscriptions, multi-bucket transactions
- Optional noex-rules proxy: emit events, manage facts, subscribe to rule matches
- Token-based authentication with pluggable validation and per-operation permissions
- Rate limiting, heartbeat ping/pong, and write-buffer backpressure
- Graceful shutdown with client notification and configurable grace period
- Connection registry with real-time stats and per-connection metadata
- JSON-over-WebSocket protocol (version 1.0.0) with request/response correlation and push channels
npm install @hamicek/noex-serverRequires @hamicek/noex and @hamicek/noex-store as peer dependencies and Node.js >= 20.
@hamicek/noex-rules is an optional peer dependency.
import { Store } from '@hamicek/noex-store';
import { NoexServer } from '@hamicek/noex-server';
const store = await Store.start({ name: 'my-store' });
store.defineBucket('users', {
key: 'id',
schema: {
id: { type: 'string', generated: 'uuid' },
name: { type: 'string', required: true },
role: { type: 'string', default: 'user' },
},
});
store.defineQuery('all-users', async (ctx) => ctx.bucket('users').all());
const server = await NoexServer.start({
port: 8080,
store,
});
console.log(`Listening on ws://localhost:${server.port}`);A client connects over WebSocket and sends JSON messages:
Creates and starts the server. Initializes the HTTP server, WebSocket upgrade handler, connection supervisor, and optional rate limiter.
const server = await NoexServer.start({
port: 8080,
store,
rules: engine, // optional
auth: { ... }, // optional
rateLimit: { ... }, // optional
});Gracefully stops the server.
| Option | Type | Default | Description |
|---|---|---|---|
gracePeriodMs |
number |
0 |
Time to wait for clients to disconnect after sending a shutdown notification |
await server.stop({ gracePeriodMs: 5000 });The port the server is listening on. Useful when starting with port: 0 for tests.
Number of active WebSocket connections.
Whether the server is currently accepting connections.
Returns metadata for all active connections: remote address, auth status, subscription counts, connected timestamp.
Aggregated statistics including connection counts, uptime, feature flags, and underlying store/rules stats.
const stats = await server.getStats();
// {
// name: 'noex-server',
// port: 8080,
// connectionCount: 42,
// uptimeMs: 360000,
// authEnabled: true,
// rateLimitEnabled: true,
// rulesEnabled: false,
// connections: { active: 42, authenticated: 40, totalStoreSubscriptions: 120, ... },
// store: { ... },
// rules: null,
// }interface ServerConfig {
store: Store; // required — noex-store instance
rules?: RuleEngine; // optional — noex-rules instance
port?: number; // default: 8080
host?: string; // default: '0.0.0.0'
path?: string; // default: '/' (WebSocket endpoint)
maxPayloadBytes?: number; // default: 1 MB
auth?: AuthConfig; // when omitted, auth is disabled
rateLimit?: RateLimitConfig; // when omitted, rate limiting is disabled
heartbeat?: HeartbeatConfig; // default: 30 s interval, 10 s timeout
backpressure?: BackpressureConfig; // default: 1 MB limit, 0.8 high water mark
name?: string; // default: 'noex-server'
}const server = await NoexServer.start({
store,
auth: {
validate: async (token) => {
const payload = verifyJwt(token);
if (!payload) return null;
return {
userId: payload.sub,
roles: payload.roles ?? ['user'],
expiresAt: payload.exp * 1000,
};
},
required: true, // default when auth is configured
permissions: {
check: (session, operation, resource) => {
if (session.roles.includes('admin')) return true;
if (operation === 'store.clear') return false;
return true;
},
},
},
});When auth.required is true, clients must send auth.login before any other operation. Session expiration is checked on every request.
rateLimit: {
maxRequests: 200, // requests per window
windowMs: 60_000, // sliding window duration
}Uses @hamicek/noex RateLimiter GenServer. Key is session.userId for authenticated connections, remote IP address otherwise.
heartbeat: {
intervalMs: 30_000, // how often to send ping
timeoutMs: 10_000, // how long to wait for pong
}The server sends ping messages at the configured interval. If no pong is received within timeoutMs, the connection is closed with code 4001.
backpressure: {
maxBufferedBytes: 1_048_576, // 1 MB
highWaterMark: 0.8, // pause push at 80%
}When the WebSocket write buffer exceeds the high water mark, push messages are paused to prevent memory exhaustion on slow clients.
All messages are JSON objects sent as WebSocket text frames. Protocol version: 1.0.0.
- Client connects via WebSocket
- Server sends a
welcomemessage with protocol version and auth requirements - If auth is required, client sends
auth.loginwith a token - Client sends requests, server responds with results or errors
- For subscriptions, server sends asynchronous
pushmessages - Server sends periodic
ping, client responds withpong - Either side can close the connection; server cleans up all subscriptions
Request (client -> server):
{ "id": 1, "type": "store.insert", "bucket": "users", "data": { "name": "Alice" } }Response (server -> client):
{ "id": 1, "type": "result", "data": { ... } }
{ "id": 1, "type": "error", "code": "VALIDATION_ERROR", "message": "...", "details": { ... } }Push (server -> client, no request correlation):
{ "type": "push", "channel": "subscription", "subscriptionId": "sub-1", "data": [...] }
{ "type": "push", "channel": "event", "subscriptionId": "sub-2", "data": { "topic": "...", "event": { ... } } }System (server -> client):
{ "type": "welcome", "version": "1.0.0", "serverTime": 1706745600000, "requiresAuth": true }
{ "type": "ping", "timestamp": 1706745600000 }
{ "type": "system", "event": "shutdown", "gracePeriodMs": 5000 }| Operation | Description | Payload fields |
|---|---|---|
store.insert |
Insert a record | bucket, data |
store.get |
Get by primary key | bucket, key |
store.update |
Update a record | bucket, key, data |
store.delete |
Delete a record | bucket, key |
store.all |
All records | bucket |
store.where |
Filter records | bucket, filter |
store.findOne |
First match | bucket, filter |
store.count |
Count records | bucket, filter? |
store.first |
First N records | bucket, n |
store.last |
Last N records | bucket, n |
store.paginate |
Cursor pagination | bucket, limit, after? |
store.clear |
Clear all records | bucket |
store.sum |
Sum a numeric field | bucket, field, filter? |
store.avg |
Average a numeric field | bucket, field, filter? |
store.min |
Minimum value | bucket, field, filter? |
store.max |
Maximum value | bucket, field, filter? |
store.subscribe |
Subscribe to reactive query | query, params? |
store.unsubscribe |
Cancel subscription | subscriptionId |
store.transaction |
Atomic multi-bucket transaction | operations |
store.buckets |
List defined buckets | — |
store.stats |
Store statistics | — |
Send multiple operations atomically:
{
"id": 10,
"type": "store.transaction",
"operations": [
{ "op": "get", "bucket": "users", "key": "user-1" },
{ "op": "update", "bucket": "users", "key": "user-1", "data": { "credits": 400 } },
{ "op": "insert", "bucket": "logs", "data": { "action": "credit_update" } }
]
}Supported ops: get, insert, update, delete, where, findOne, count.
Available only when rules is configured. Returns RULES_NOT_AVAILABLE otherwise.
| Operation | Description | Payload fields |
|---|---|---|
rules.emit |
Emit an event | topic, data, correlationId? |
rules.setFact |
Set a fact | key, value |
rules.getFact |
Get a fact | key |
rules.deleteFact |
Delete a fact | key |
rules.queryFacts |
Query facts by pattern | pattern |
rules.getAllFacts |
Get all facts | — |
rules.subscribe |
Subscribe to rule events | pattern |
rules.unsubscribe |
Cancel subscription | subscriptionId |
rules.stats |
Engine statistics | — |
| Operation | Description | Payload fields |
|---|---|---|
auth.login |
Authenticate with token | token |
auth.logout |
End session | — |
auth.whoami |
Current session info | — |
| Code | Description |
|---|---|
PARSE_ERROR |
Invalid JSON |
INVALID_REQUEST |
Missing id or type |
UNKNOWN_OPERATION |
Unsupported operation type |
VALIDATION_ERROR |
Schema validation failed |
NOT_FOUND |
Record or subscription not found |
ALREADY_EXISTS |
Duplicate key or unique constraint violation |
CONFLICT |
Transaction version conflict |
UNAUTHORIZED |
Authentication required or token invalid |
FORBIDDEN |
Insufficient permissions |
RATE_LIMITED |
Rate limit exceeded |
BACKPRESSURE |
Write buffer full, slow down |
INTERNAL_ERROR |
Unexpected server error |
BUCKET_NOT_DEFINED |
Bucket does not exist |
QUERY_NOT_DEFINED |
Reactive query not defined |
RULES_NOT_AVAILABLE |
Rule engine not configured |
NoexServer
└── ConnectionSupervisor (simple_one_for_one)
├── ConnectionServer #1 (GenServer per WebSocket)
├── ConnectionServer #2
└── ConnectionServer #N
Each WebSocket connection is managed by a dedicated ConnectionServer GenServer. The supervisor uses the temporary restart strategy — crashed connections are cleaned up (all subscriptions unsubscribed, WebSocket closed) but not restarted.
The request pipeline for each message:
- JSON parse and validate
- Authentication check (if configured)
- Rate limit check (if configured)
- Route to store proxy or rules proxy
- Serialize result or map error
MIT