Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fuzzy-nails-trace.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cleverbrush/otel": minor
---

Add `@cleverbrush/otel/client` with typed-client tracing middleware that creates outbound CLIENT spans and injects W3C trace context for distributed service-to-service traces.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The flagship package is **`@cleverbrush/schema`** — a schema validation librar
| [`@cleverbrush/deep`](./libs/deep) | Deep operations on objects: deep equality, deep merge, flattening, hashing |
| [`@cleverbrush/scheduler`](./libs/scheduler) | Cron-like job scheduler for Node.js using worker threads |
| [`@cleverbrush/client`](./libs/client) | Typed HTTP client for `@cleverbrush/server` API contracts — Proxy-based, zero codegen, full type inference. Optional React + TanStack Query integration via `/react` subpath |
| [`@cleverbrush/otel`](./libs/otel) | OpenTelemetry instrumentation — traces for HTTP, SQL, and outbound client calls; OTLP log sink with trace correlation; DI integration |
| [`@cleverbrush/knex-clickhouse`](./libs/knex-clickhouse) | Knex query builder dialect for ClickHouse |

---
Expand Down
21 changes: 19 additions & 2 deletions libs/otel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,23 @@ const server = createServer()

A `SpanKind.SERVER` span is opened per request, named `operationId` or `METHOD route` and tagged with the standard HTTP semantic-convention attributes. W3C `traceparent` is extracted, so spans link to upstream callers.

### 3. Trace SQL queries
### 3. Trace typed client calls between services

```ts
import { createClient } from '@cleverbrush/client';
import { clientTracingMiddleware } from '@cleverbrush/otel/client';

const client = createClient(api, {
baseUrl: 'http://todo-backend:3000',
middlewares: [clientTracingMiddleware()]
});
```

`clientTracingMiddleware()` opens a `SpanKind.CLIENT` span around each typed client call and injects W3C `traceparent` / `tracestate` / `baggage` headers. When the downstream Cleverbrush service uses `tracingMiddleware()`, SigNoz and other OTel backends show both services under one distributed trace.

Put the tracing middleware first in the client middleware list so it wraps retries, timeouts, and batching. If you use `batching()`, keep batching last so each logical subrequest carries its own trace context.

### 4. Trace SQL queries

```ts
import { instrumentKnex } from '@cleverbrush/otel';
Expand All @@ -71,7 +87,7 @@ const db = instrumentKnex(

Every Knex query becomes a `SpanKind.CLIENT` span with `db.system.name`, `db.namespace`, `db.operation.name`, `db.query.text`, and parented under the active server span automatically.

### 4. Send logs as OTLP records (with trace correlation)
### 5. Send logs as OTLP records (with trace correlation)

```ts
import { createLogger, consoleSink } from '@cleverbrush/log';
Expand All @@ -90,6 +106,7 @@ const logger = createLogger({
| --------------------------------- | --------------------------------------------------------- |
| `setupOtel(config)` | Boot the Node SDK; returns `{ shutdown(), sdk }` |
| `tracingMiddleware(opts?)` | `@cleverbrush/server` middleware; opens SERVER span |
| `clientTracingMiddleware(opts?)` | `@cleverbrush/client` middleware; opens CLIENT span |
| `instrumentKnex(knex, opts?)` | Hook a Knex instance; emits CLIENT span per query |
| `otelLogSink(opts?)` | `@cleverbrush/log` sink → OTLP log records |
| `traceEnricher()` | `@cleverbrush/log` enricher → adds `TraceId` / `SpanId` |
Expand Down
4 changes: 4 additions & 0 deletions libs/otel/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
},
"./client": {
"types": "./dist/client.d.ts",
"import": "./dist/client.js"
},
"./instrumentations": {
"types": "./dist/instrumentations.d.ts",
"import": "./dist/instrumentations.js"
Expand Down
230 changes: 230 additions & 0 deletions libs/otel/src/client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
import { batching } from '@cleverbrush/client/batching';
import {
propagation,
SpanKind,
SpanStatusCode,
trace
} from '@opentelemetry/api';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import {
InMemorySpanExporter,
SimpleSpanProcessor
} from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it,
vi
} from 'vitest';
import { clientTracingMiddleware } from './client.js';
import { tracingMiddleware } from './middleware/tracing.js';

const exporter = new InMemorySpanExporter();
const provider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)]
});

beforeAll(() => {
provider.register();
propagation.setGlobalPropagator(new W3CTraceContextPropagator());
});

afterAll(async () => {
await provider.shutdown();
});

beforeEach(() => exporter.reset());

function parseTraceparent(value: string): {
traceId: string;
spanId: string;
} {
const parts = value.split('-');
return {
traceId: parts[1]!,
spanId: parts[2]!
};
}

function batchResponse(count: number): Response {
return new Response(
JSON.stringify({
responses: Array.from({ length: count }, () => ({
status: 200,
headers: { 'content-type': 'application/json' },
body: '{}'
}))
}),
{
status: 200,
headers: { 'content-type': 'application/json' }
}
);
}

describe('clientTracingMiddleware', () => {
it('creates a CLIENT span and injects traceparent from that span', async () => {
const mw = clientTracingMiddleware();
let injectedTraceparent = '';
const send = mw(async (_url, init) => {
const headers = init.headers as Record<string, string>;
injectedTraceparent = headers.traceparent;
expect(headers.Accept).toBe('application/json');
return new Response(null, { status: 200 });
});

const tracer = trace.getTracer('client-test');
await tracer.startActiveSpan('service-a.request', async parent => {
try {
await send('http://service-b.local/api/todos', {
method: 'GET',
headers: { Accept: 'application/json' }
});
} finally {
parent.end();
}
});

const spans = exporter.getFinishedSpans();
const parent = spans.find(s => s.name === 'service-a.request')!;
const client = spans.find(s => s.kind === SpanKind.CLIENT)!;
const traceparent = parseTraceparent(injectedTraceparent);

expect(client.name).toBe('GET /api/todos');
expect(client.parentSpanContext?.spanId).toBe(
parent.spanContext().spanId
);
expect(traceparent.traceId).toBe(client.spanContext().traceId);
expect(traceparent.spanId).toBe(client.spanContext().spanId);
});

it('lets downstream server middleware extract the client span as parent', async () => {
let outboundHeaders: Record<string, string> = {};
const clientSend = clientTracingMiddleware()(async (_url, init) => {
outboundHeaders = init.headers as Record<string, string>;
return new Response(null, { status: 204 });
});

const tracer = trace.getTracer('distributed-test');
await tracer.startActiveSpan('service-a.request', async parent => {
try {
await clientSend('http://service-b.local/api/downstream', {
method: 'POST'
});
} finally {
parent.end();
}
});

const clientSpan = exporter
.getFinishedSpans()
.find(s => s.kind === SpanKind.CLIENT)!;
const serverMw = tracingMiddleware();
const ctx = {
method: 'POST',
url: new URL('http://service-b.local/api/downstream'),
headers: outboundHeaders,
items: new Map<string, unknown>(),
response: {
statusCode: 200,
setHeader: vi.fn()
}
};

await serverMw(ctx, async () => {});

const serverSpan = exporter
.getFinishedSpans()
.find(s => s.kind === SpanKind.SERVER)!;

expect(serverSpan.spanContext().traceId).toBe(
clientSpan.spanContext().traceId
);
expect(serverSpan.parentSpanContext?.spanId).toBe(
clientSpan.spanContext().spanId
);
});

it('uses typed client endpoint metadata for span names and attributes', async () => {
const send = clientTracingMiddleware()(async () => {
return new Response(null, { status: 201 });
});

await send('http://service-b.local/api/todos', {
method: 'POST',
headers: {},
__endpointMeta: {
group: 'todos',
endpoint: 'create',
path: '/api/todos',
operationId: 'createTodo',
tags: ['todos']
}
} as RequestInit);

const span = exporter.getFinishedSpans()[0]!;
expect(span.name).toBe('createTodo');
expect(span.attributes['http.route']).toBe('/api/todos');
expect(span.attributes['cleverbrush.client.group']).toBe('todos');
expect(span.attributes['cleverbrush.client.endpoint']).toBe('create');
expect(span.attributes['cleverbrush.endpoint.operationId']).toBe(
'createTodo'
);
expect(span.attributes['cleverbrush.endpoint.tags']).toBe('todos');
});

it('marks HTTP error responses as failed client spans', async () => {
const send = clientTracingMiddleware()(async () => {
return new Response(null, { status: 503 });
});

await send('http://service-b.local/api/todos', { method: 'GET' });

const span = exporter.getFinishedSpans()[0]!;
expect(span.status.code).toBe(SpanStatusCode.ERROR);
expect(span.attributes['http.response.status_code']).toBe(503);
});

it('records thrown fetch errors on the client span', async () => {
const send = clientTracingMiddleware()(async () => {
throw new TypeError('fetch failed');
});

await expect(
send('http://service-b.local/api/todos', { method: 'GET' })
).rejects.toThrow('fetch failed');

const span = exporter.getFinishedSpans()[0]!;
expect(span.status.code).toBe(SpanStatusCode.ERROR);
expect(span.events.some(e => e.name === 'exception')).toBe(true);
});

it('preserves logical subrequest trace headers when wrapping batching', async () => {
const fetch = vi.fn().mockResolvedValue(batchResponse(2));
const send = clientTracingMiddleware()(
batching({ windowMs: 1 })(fetch)
);

await Promise.all([
send('http://service-b.local/api/a', { method: 'GET' }),
send('http://service-b.local/api/b', { method: 'GET' })
]);

expect(fetch).toHaveBeenCalledOnce();
const init = fetch.mock.calls[0]![1] as RequestInit;
const body = JSON.parse(String(init.body)) as {
requests: Array<{ headers: Record<string, string> }>;
};

const firstTraceparent = body.requests[0]!.headers.traceparent;
const secondTraceparent = body.requests[1]!.headers.traceparent;

expect(firstTraceparent).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-/);
expect(secondTraceparent).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-/);
expect(firstTraceparent).not.toBe(secondTraceparent);
});
});
Loading
Loading