Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .changeset/beige-teams-spend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
'@graphql-hive/envelop': minor
'@graphql-hive/apollo': minor
'@graphql-hive/core': minor
'@graphql-hive/yoga': minor
---

Support circuit breaking for usage reporting.

Circuit breaking is a fault-tolerance pattern that prevents a system from repeatedly calling a failing service. When errors or timeouts exceed a set threshold, the circuit “opens,” blocking further requests until the service recovers.

This ensures that during a network issue or outage, the service using the Hive SDK remains healthy and is not overwhelmed by failed usage reports or repeated retries.

```ts
import { createClient } from "@graphql-hive/core"

const client = createClient({
agent: {
circuitBreaker: {
/**
* Number of requests that must be recorded before the failure rate is evaluated.
* Default: 10
*/
volumeThreshold: 5,
/**
* Percentage of requests failing before the circuit breaker kicks in.
* Default: 50
*/
errorThresholdPercentage: 1,
/**
* Time in milliseconds after which the circuit breaker attempts to send requests again.
* Default: 30_000
*/
resetTimeout: 10_000,
},
}
})
```
3 changes: 3 additions & 0 deletions packages/libraries/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
"graphql": "^0.13.0 || ^14.0.0 || ^15.0.0 || ^16.0.0"
},
"dependencies": {
"@graphql-hive/signal": "^2.0.0",
"@graphql-tools/utils": "^10.0.0",
"@whatwg-node/fetch": "^0.10.6",
"async-retry": "^1.3.3",
"js-md5": "0.8.3",
"lodash.sortby": "^4.7.0",
"opossum": "^9.0.0",
"tiny-lru": "^8.0.2"
},
"devDependencies": {
Expand All @@ -58,6 +60,7 @@
"@types/async-retry": "1.4.8",
"@types/js-md5": "0.8.0",
"@types/lodash.sortby": "4.7.9",
"@types/opossum": "8.1.9",
"graphql": "16.9.0",
"nock": "14.0.10",
"tslib": "2.8.1",
Expand Down
52 changes: 52 additions & 0 deletions packages/libraries/core/playground/agent-circuit-breaker.ts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit hard to unit test this functionality, so I added this file, which lets me play around with the values for the circuit breaker.

I encourage reviewers to do the same to get an understanding of how this works.

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
*
* Just a small playground to play around with different scenarios around the agent.
* You can run it like this: `bun run --watch packages/libraries/core/playground/agent-circuit-breaker.ts`
*/

import { createAgent } from '../src/client/agent.js';

// In-memory queue backing the agent's `data` store in this playground.
let data: Array<{}> = [];

const agent = createAgent<{}>(
  {
    debug: true,
    endpoint: 'http://127.0.0.1',
    token: 'noop',
    // Stubbed transport: always answers 200 unless the `throw` below is re-enabled
    // to simulate a failing backend.
    fetch: async (_url, _opts) => {
      // throw new Error('FAIL FAIL');
      console.log('SENDING!');
      return new Response('ok', { status: 200 });
    },
    // Values to tweak while experimenting with the breaker.
    circuitBreaker: {
      errorThresholdPercentage: 1,
      resetTimeout: 10_000,
      volumeThreshold: 0,
    },
    maxSize: 1,
    maxRetries: 0,
  },
  {
    // Resets the queue and yields the stringified (now empty) queue as the payload.
    body: () => {
      data = [];
      return String(data);
    },
    data: {
      clear: () => {
        data = [];
      },
      size: () => data.length,
      set: d => {
        data.push(d);
      },
    },
  },
);

// Capture one empty event every second.
setInterval(function captureEmptyEvent() {
  agent.capture({});
}, 1_000);
156 changes: 135 additions & 21 deletions packages/libraries/core/src/client/agent.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
import { fetch as defaultFetch } from '@whatwg-node/fetch';
import { version } from '../version.js';
import { http } from './http-client.js';
import type { Logger } from './types.js';
import { CircuitBreakerInterface, createHiveLogger, loadCircuitBreaker } from './utils.js';

/** Narrowed view of `Response` exposing only `status`, `text`, `json` and `statusText`. */
type ReadOnlyResponse = Pick<Response, 'status' | 'text' | 'json' | 'statusText'>;

/**
* Settings for the agent's circuit breaker.
*/
export type AgentCircuitBreakerConfiguration = {
/**
* Percentage of failing requests after which the circuit breaker should kick in.
* Default: 50
*/
errorThresholdPercentage: number;
/**
* Count of requests before starting evaluating.
* Default: 10
*/
volumeThreshold: number;
/**
* Time in milliseconds after which the circuit breaker attempts to send requests again.
* Default: 30_000
*/
resetTimeout: number;
};

/**
* Circuit breaker settings used when the `circuitBreaker` option is omitted or set to `true`.
*/
const defaultCircuitBreakerConfiguration: AgentCircuitBreakerConfiguration = {
errorThresholdPercentage: 50,
volumeThreshold: 10,
resetTimeout: 30_000,
};

export interface AgentOptions {
enabled?: boolean;
name?: string;
Expand Down Expand Up @@ -48,7 +74,14 @@ export interface AgentOptions {
* WHATWG Compatible fetch implementation
* used by the agent to send reports
*/
fetch?: typeof fetch;
fetch?: typeof defaultFetch;
/**
* Circuit Breaker Configuration.
* true -> Use default configuration
* false -> Disable
* object -> use custom configuration see {AgentCircuitBreakerConfiguration}
*/
circuitBreaker?: boolean | AgentCircuitBreakerConfiguration;
}

export function createAgent<TEvent>(
Expand All @@ -67,23 +100,31 @@ export function createAgent<TEvent>(
headers?(): Record<string, string>;
},
) {
const options: Required<Omit<AgentOptions, 'fetch'>> = {
const options: Required<Omit<AgentOptions, 'fetch' | 'circuitBreaker'>> & {
circuitBreaker: null | AgentCircuitBreakerConfiguration;
} = {
timeout: 30_000,
debug: false,
enabled: true,
minTimeout: 200,
maxRetries: 3,
sendInterval: 10_000,
maxSize: 25,
logger: console,
name: 'hive-client',
version,
...pluginOptions,
circuitBreaker:
pluginOptions.circuitBreaker == null || pluginOptions.circuitBreaker === true
? defaultCircuitBreakerConfiguration
: pluginOptions.circuitBreaker === false
? null
: pluginOptions.circuitBreaker,
logger: createHiveLogger(pluginOptions.logger ?? console, '[agent]'),
};

const enabled = options.enabled !== false;

let timeoutID: any = null;
let timeoutID: ReturnType<typeof setTimeout> | null = null;

function schedule() {
if (timeoutID) {
Expand Down Expand Up @@ -143,6 +184,27 @@ export function createAgent<TEvent>(
return send({ throwOnError: true, skipSchedule: true });
}

/**
 * Posts a serialized report to the configured endpoint through the HTTP client.
 *
 * The abort signal obtained from the current breaker is forwarded to the
 * HTTP client together with the agent's timeout and retry settings.
 *
 * @param buffer - The serialized payload to send.
 * @returns The response produced by `http.post`.
 */
async function sendHTTPCall(buffer: string | Buffer<ArrayBufferLike>): Promise<Response> {
  // Ask the breaker for its signal first, before any user supplied `headers()` runs.
  const abortSignal = breaker.getSignal();

  const requestHeaders = {
    accept: 'application/json',
    'content-type': 'application/json',
    Authorization: `Bearer ${options.token}`,
    'User-Agent': `${options.name}/${options.version}`,
    // Caller provided headers come last so they can override the defaults above.
    ...headers(),
  };

  const response = await http.post(options.endpoint, buffer, {
    headers: requestHeaders,
    timeout: options.timeout,
    retry: {
      retries: options.maxRetries,
      factor: 2,
    },
    logger: options.logger,
    fetchImplementation: pluginOptions.fetch,
    signal: abortSignal,
  });

  return response;
}

async function send(sendOptions?: {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are basically doing the following:

circuit breaker ( # shuts down if underlying fails or times out aggressively
  http client ( # calls fetch and retries if non okay response
      fetch( # good old fetch
          # the payload we want to send
       )
   )
)

throwOnError?: boolean;
skipSchedule: boolean;
Expand All @@ -160,23 +222,7 @@ export function createAgent<TEvent>(
data.clear();

debugLog(`Sending report (queue ${dataToSend})`);
const response = await http
.post(options.endpoint, buffer, {
headers: {
accept: 'application/json',
'content-type': 'application/json',
Authorization: `Bearer ${options.token}`,
'User-Agent': `${options.name}/${options.version}`,
...headers(),
},
timeout: options.timeout,
retry: {
retries: options.maxRetries,
factor: 2,
},
logger: options.logger,
fetchImplementation: pluginOptions.fetch,
})
const response = sendFromBreaker(buffer)
.then(res => {
debugLog(`Report sent!`);
return res;
Expand Down Expand Up @@ -215,6 +261,74 @@ export function createAgent<TEvent>(
});
}

// The breaker that wraps `sendHTTPCall`. It is either the no-op breaker or the
// instance assigned from the `loadCircuitBreaker` callbacks, so it can still be
// unset while loading is in progress.
let breaker: CircuitBreakerInterface<
Parameters<typeof sendHTTPCall>,
ReturnType<typeof sendHTTPCall>
>;
// Holds the pending load of the circuit breaker; awaited by `sendFromBreaker`
// whenever `breaker` has not been assigned yet.
let loadCircuitBreakerPromise: Promise<void> | null = null;
// Logger used for all circuit breaker related messages.
const breakerLogger = createHiveLogger(options.logger, '[circuit breaker]');

/**
 * Builds a pass-through breaker: it never provides an abort signal and
 * `fire` invokes `sendHTTPCall` directly.
 */
function noopBreaker(): typeof breaker {
  const passThrough: typeof breaker = {
    getSignal: () => undefined,
    fire: sendHTTPCall,
  };

  return passThrough;
}

// A configuration object means circuit breaking is enabled; `null` selects the no-op breaker.
if (options.circuitBreaker) {
/**
* We support Cloudflare, which does not have the `events` module.
* So we lazy load opossum which has `events` as a dependency.
*/
breakerLogger.info('initialize circuit breaker');
loadCircuitBreakerPromise = loadCircuitBreaker(
// First callback: receives the `CircuitBreaker` class and wraps `sendHTTPCall` with it.
CircuitBreaker => {
breakerLogger.info('started');
const realBreaker = new CircuitBreaker(sendHTTPCall, {
...options.circuitBreaker,
// NOTE(review): presumably disables the breaker's own timeout because
// `sendHTTPCall` already passes `options.timeout` to the HTTP client - confirm.
timeout: false,
// NOTE(review): presumably makes the breaker provide a fresh abort controller
// (see `breaker.getSignal()` in `sendHTTPCall`) after an abort - confirm against opossum docs.
autoRenewAbortController: true,
});

// Log every state transition of the breaker.
realBreaker.on('open', () =>
breakerLogger.error('circuit opened - backend seems unreachable.'),
);
realBreaker.on('halfOpen', () =>
breakerLogger.info('circuit half open - testing backend connectivity'),
);
realBreaker.on('close', () => breakerLogger.info('circuit closed - backend recovered '));

// @ts-expect-error missing definition in typedefs for `opossum`
breaker = realBreaker;
},
// Second callback: falls back to the no-op breaker when the platform is not supported.
() => {
breakerLogger.info('circuit breaker not supported on platform');
breaker = noopBreaker();
},
);
} else {
breaker = noopBreaker();
}

/**
 * Sends a report through the current breaker.
 *
 * Waits for the pending breaker load when no breaker has been assigned yet.
 *
 * @returns The result of `breaker.fire`, or `null` when the call was rejected
 * with an error whose `code` is `'EOPENBREAKER'`.
 * @throws Any other error raised by `breaker.fire`.
 */
async function sendFromBreaker(...args: Parameters<typeof breaker.fire>) {
  if (!breaker) {
    await loadCircuitBreakerPromise;
  }

  try {
    const result = await breaker.fire(...args);
    return result;
  } catch (error: unknown) {
    const circuitIsOpen =
      error instanceof Error && 'code' in error && error.code === 'EOPENBREAKER';

    // Anything other than an open circuit is a genuine failure for the caller.
    if (!circuitIsOpen) {
      throw error;
    }

    breakerLogger.info('circuit open - sending report skipped');
    return null;
  }
}

return {
capture,
sendImmediately,
Expand Down
18 changes: 17 additions & 1 deletion packages/libraries/core/src/client/http-client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncRetry from 'async-retry';
import { abortSignalAny } from '@graphql-hive/signal';
import { crypto, fetch, URL } from '@whatwg-node/fetch';
import type { Logger } from './types.js';

Expand All @@ -21,6 +22,8 @@ interface SharedConfig {
* @default {response => response.ok}
**/
isRequestOk?: ResponseAssertFunction;
/** Optional abort signal */
signal?: AbortSignal;
}

/**
Expand Down Expand Up @@ -78,6 +81,8 @@ export async function makeFetchCall(
* @default {response => response.ok}
**/
isRequestOk?: ResponseAssertFunction;
/** Optional abort signal */
signal?: AbortSignal;
},
): Promise<Response> {
const logger = config.logger;
Expand All @@ -87,6 +92,9 @@ export async function makeFetchCall(
let maxTimeout = 2000;
let factor = 1.2;

const actionHeader =
config.method === 'POST' ? { 'x-client-action-id': crypto.randomUUID() } : undefined;

if (config.retry !== false) {
retries = config.retry?.retries ?? 5;
minTimeout = config.retry?.minTimeout ?? 200;
Expand All @@ -104,13 +112,15 @@ export async function makeFetchCall(
);

const getDuration = measureTime();
const signal = AbortSignal.timeout(config.timeout ?? 20_000);
const timeoutSignal = AbortSignal.timeout(config.timeout ?? 20_000);
const signal = config.signal ? abortSignalAny([config.signal, timeoutSignal]) : timeoutSignal;

const response = await (config.fetchImplementation ?? fetch)(endpoint, {
method: config.method,
body: config.body,
headers: {
'x-request-id': requestId,
...actionHeader,
...config.headers,
},
signal,
Expand All @@ -135,6 +145,12 @@ export async function makeFetchCall(
throw new Error(`Unexpected HTTP error. (x-request-id=${requestId})`, { cause: error });
});

if (config.signal?.aborted === true) {
const error = config.signal.reason ?? new Error('Request aborted externally.');
bail(error);
throw error;
}

if (isRequestOk(response)) {
logger?.info(
`${config.method} ${endpoint} (x-request-id=${requestId}) succeeded with status ${response.status} ${getDuration()}.`,
Expand Down
Loading
Loading