Skip to content

Commit

Permalink
Add demand control for uplink (backport of #1564)
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-Guo authored and trevor-scheer committed Mar 11, 2022
1 parent 73166f7 commit dac11e8
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 45 deletions.
15 changes: 15 additions & 0 deletions docs/source/api/apollo-gateway.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,21 @@ Provide this field **only** if you are using managed federation.
</tr>


<tr>
<td>

###### `fallbackPollIntervalInMs` (managed mode only)

`number`
</td>
<td>

Specify this option as a fallback if Uplink fails to provide a polling interval. This will also take effect if `fallbackPollIntervalInMs` is greater than the Uplink defined interval.

</td>
</tr>


<tr>
<td>

Expand Down
3 changes: 2 additions & 1 deletion gateway-js/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ This CHANGELOG pertains only to Apollo Federation packages in the `0.x` range. T

> The changes noted within this `vNEXT` section have not been released yet. New PRs and commits which introduce changes should include an entry in this `vNEXT` section as part of their development. When a release is being prepared, a new header will be (manually) created below and the appropriate changes within that release will be moved into the new section.
- _Nothing yet! Stay tuned._
- Respect the `minDelaySeconds` returning from Uplink when polling and retrying to fetch the supergraph schema from Uplink [PR #1564](https://github.com/apollographql/federation/pull/1564)
- Remove the previously deprecated `experimental_pollInterval` config option and deprecate `pollIntervalInMs` in favour of `fallbackPollIntervalInMs` (for managed mode only). [PR #1564](https://github.com/apollographql/federation/pull/1564)

## v0.48.3

Expand Down
2 changes: 1 addition & 1 deletion gateway-js/src/__generated__/graphqlTypes.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion gateway-js/src/__tests__/gateway/lifecycle-hooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ describe('lifecycle hooks', () => {
);
});

it('registers schema change callbacks when experimental_pollInterval is set for unmanaged configs', async () => {
it('registers schema change callbacks when pollIntervalInMs is set for unmanaged configs', async () => {
const experimental_updateServiceDefinitions: Experimental_UpdateServiceDefinitions =
jest.fn(async (_config) => {
return { serviceDefinitions, isNewSchema: true };
Expand Down
11 changes: 0 additions & 11 deletions gateway-js/src/__tests__/integration/configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -443,15 +443,4 @@ describe('deprecation warnings', () => {
'The `schemaConfigDeliveryEndpoint` option is deprecated and will be removed in a future version of `@apollo/gateway`. Please migrate to the equivalent (array form) `uplinkEndpoints` configuration option.',
);
});

it('warns with `experimental_pollInterval` option set', async () => {
new ApolloGateway({
experimental_pollInterval: 10000,
logger,
});

expect(logger.warn).toHaveBeenCalledWith(
'The `experimental_pollInterval` option is deprecated and will be removed in a future version of `@apollo/gateway`. Please migrate to the equivalent `pollIntervalInMs` configuration option.',
);
});
});
17 changes: 11 additions & 6 deletions gateway-js/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export interface ServiceDefinitionUpdate {
export interface SupergraphSdlUpdate {
id: string;
supergraphSdl: string;
minDelaySeconds?: number;
}

export function isSupergraphSdlUpdate(
Expand Down Expand Up @@ -125,11 +126,6 @@ interface GatewayConfigBase {
// experimental observability callbacks
experimental_didResolveQueryPlan?: Experimental_DidResolveQueryPlanCallback;
experimental_didUpdateSupergraph?: Experimental_DidUpdateSupergraphCallback;
/**
* @deprecated use `pollIntervalInMs` instead
*/
experimental_pollInterval?: number;
pollIntervalInMs?: number;
experimental_approximateQueryPlanStoreMiB?: number;
experimental_autoFragmentization?: boolean;
fetcher?: typeof fetch;
Expand All @@ -150,6 +146,7 @@ export interface ServiceListGatewayConfig extends GatewayConfigBase {
| ((
service: ServiceEndpointDefinition,
) => Promise<HeadersInit> | HeadersInit);
pollIntervalInMs?: number;
}

export interface ManagedGatewayConfig extends GatewayConfigBase {
Expand All @@ -168,6 +165,11 @@ export interface ManagedGatewayConfig extends GatewayConfigBase {
*/
uplinkEndpoints?: string[];
uplinkMaxRetries?: number;
/**
* @deprecated use `fallbackPollIntervalInMs` instead
*/
pollIntervalInMs?: number;
fallbackPollIntervalInMs?: number;
}

// TODO(trevor:removeServiceList): migrate users to `supergraphSdl` function option
Expand All @@ -176,6 +178,7 @@ interface ManuallyManagedServiceDefsGatewayConfig extends GatewayConfigBase {
* @deprecated: use `supergraphSdl` instead (either as a `SupergraphSdlHook` or `SupergraphManager`)
*/
experimental_updateServiceDefinitions: Experimental_UpdateServiceDefinitions;
pollIntervalInMs?: number;
}

// TODO(trevor:removeServiceList): migrate users to `supergraphSdl` function option
Expand All @@ -185,6 +188,7 @@ interface ExperimentalManuallyManagedSupergraphSdlGatewayConfig
* @deprecated: use `supergraphSdl` instead (either as a `SupergraphSdlHook` or `SupergraphManager`)
*/
experimental_updateSupergraphSdl: Experimental_UpdateSupergraphSdl;
pollIntervalInMs?: number;
}

export function isManuallyManagedSupergraphSdlGatewayConfig(
Expand Down Expand Up @@ -238,7 +242,7 @@ type ManuallyManagedGatewayConfig =
| ManuallyManagedServiceDefsGatewayConfig
| ExperimentalManuallyManagedSupergraphSdlGatewayConfig
| ManuallyManagedSupergraphSdlGatewayConfig
// TODO(trevor:removeServiceList
// TODO(trevor:removeServiceList)
| ServiceListGatewayConfig;

// TODO(trevor:removeServiceList)
Expand Down Expand Up @@ -322,6 +326,7 @@ export function isManagedConfig(
return (
'schemaConfigDeliveryEndpoint' in config ||
'uplinkEndpoints' in config ||
'fallbackPollIntervalInMs' in config ||
(!isLocalConfig(config) &&
!isStaticSupergraphSdlConfig(config) &&
!isManuallyManagedConfig(config))
Expand Down
18 changes: 12 additions & 6 deletions gateway-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,12 @@ export class ApolloGateway implements GraphQLService {
this.experimental_didUpdateSupergraph =
config?.experimental_didUpdateSupergraph;

this.pollIntervalInMs =
config?.pollIntervalInMs ?? config?.experimental_pollInterval;
if (isManagedConfig(this.config)) {
this.pollIntervalInMs =
this.config.fallbackPollIntervalInMs ?? this.config.pollIntervalInMs;
} else if (isServiceListConfig(this.config)) {
this.pollIntervalInMs = this.config?.pollIntervalInMs;
}

this.issueConfigurationWarningsIfApplicable();

Expand Down Expand Up @@ -254,7 +258,7 @@ export class ApolloGateway implements GraphQLService {
'Polling Apollo services at a frequency of less than once per 10 ' +
'seconds (10000) is disallowed. Instead, the minimum allowed ' +
'pollInterval of 10000 will be used. Please reconfigure your ' +
'`pollIntervalInMs` accordingly. If this is problematic for ' +
'`fallbackPollIntervalInMs` accordingly. If this is problematic for ' +
'your team, please contact support.',
);
}
Expand Down Expand Up @@ -288,9 +292,11 @@ export class ApolloGateway implements GraphQLService {
);
}

if ('experimental_pollInterval' in this.config) {
if (isManagedConfig(this.config) && 'pollIntervalInMs' in this.config) {
this.logger.warn(
'The `experimental_pollInterval` option is deprecated and will be removed in a future version of `@apollo/gateway`. Please migrate to the equivalent `pollIntervalInMs` configuration option.',
'The `pollIntervalInMs` option is deprecated and will be removed in a future version of `@apollo/gateway`. ' +
'Please migrate to the equivalent `fallbackPollIntervalInMs` configuration option. ' +
'The poll interval is now defined by Uplink, this option will only be used if it is greater than the value defined by Uplink or as a fallback.',
);
}
}
Expand Down Expand Up @@ -404,7 +410,7 @@ export class ApolloGateway implements GraphQLService {
subgraphHealthCheck: this.config.serviceHealthCheck,
fetcher: this.fetcher,
logger: this.logger,
pollIntervalInMs: this.pollIntervalInMs ?? 10000,
fallbackPollIntervalInMs: this.pollIntervalInMs ?? 10000,
}),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ describe('loadSupergraphSdlFromStorage', () => {
compositionId: "originalId-1234",
maxRetries: 1,
roundRobinSeed: 0,
earliestFetchTime: null,
});

expect(result).toMatchObject({
Expand All @@ -88,6 +89,7 @@ describe('loadSupergraphSdlFromStorage', () => {
compositionId: "originalId-1234",
maxRetries: 1,
roundRobinSeed: 0,
earliestFetchTime: null,
}),
).rejects.toThrowError(
new UplinkFetcherError(
Expand Down Expand Up @@ -388,10 +390,49 @@ describe("loadSupergraphSdlFromUplinks", () => {
compositionId: "id-1234",
maxRetries: 5,
roundRobinSeed: 0,
earliestFetchTime: null,
});

expect(result).toBeNull();
expect(fetcher).toHaveBeenCalledTimes(1);
});

it("Waits the correct time before retrying", async () => {
const timeoutSpy = jest.spyOn(global, 'setTimeout');

mockSupergraphSdlRequest('originalId-1234', mockCloudConfigUrl1).reply(500);
mockSupergraphSdlRequestIfAfter('originalId-1234', mockCloudConfigUrl2).reply(
200,
JSON.stringify({
data: {
routerConfig: {
__typename: 'RouterConfigResult',
id: 'originalId-1234',
supergraphSdl: getTestingSupergraphSdl()
},
},
}),
);
const fetcher = getDefaultFetcher();

await loadSupergraphSdlFromUplinks({
graphRef,
apiKey,
endpoints: [mockCloudConfigUrl1, mockCloudConfigUrl2],
errorReportingEndpoint: undefined,
fetcher: fetcher,
compositionId: "originalId-1234",
maxRetries: 1,
roundRobinSeed: 0,
earliestFetchTime: new Date(Date.now() + 1000),
});

// test if setTimeout was called with a value in range to deal with time jitter
const setTimeoutCall = timeoutSpy.mock.calls[1][1];
expect(setTimeoutCall).toBeLessThanOrEqual(1000);
expect(setTimeoutCall).toBeGreaterThanOrEqual(900);

timeoutSpy.mockRestore();
});
});

55 changes: 37 additions & 18 deletions gateway-js/src/supergraphManagers/UplinkFetcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { SubgraphHealthCheckFunction, SupergraphSdlUpdateFunction } from '../..'
import { loadSupergraphSdlFromUplinks } from './loadSupergraphSdlFromStorage';

export interface UplinkFetcherOptions {
pollIntervalInMs: number;
fallbackPollIntervalInMs: number;
subgraphHealthCheck?: boolean;
graphRef: string;
apiKey: string;
Expand All @@ -31,6 +31,8 @@ export class UplinkFetcher implements SupergraphManager {
process.env.APOLLO_OUT_OF_BAND_REPORTER_ENDPOINT ?? undefined;
private compositionId?: string;
private fetchCount: number = 0;
private minDelayMs: number | null = null;
private earliestFetchTime: Date | null = null;

constructor(options: UplinkFetcherOptions) {
this.config = options;
Expand All @@ -46,7 +48,12 @@ export class UplinkFetcher implements SupergraphManager {

let initialSupergraphSdl: string | null = null;
try {
initialSupergraphSdl = await this.updateSupergraphSdl();
const result = await this.updateSupergraphSdl();
initialSupergraphSdl = result?.supergraphSdl || null;
if (result?.minDelaySeconds) {
this.minDelayMs = 1000 * result?.minDelaySeconds;
this.earliestFetchTime = new Date(Date.now() + this.minDelayMs);
}
} catch (e) {
this.logUpdateFailure(e);
throw e;
Expand Down Expand Up @@ -83,6 +90,7 @@ export class UplinkFetcher implements SupergraphManager {
compositionId: this.compositionId ?? null,
maxRetries: this.config.maxRetries,
roundRobinSeed: this.fetchCount++,
earliestFetchTime: this.earliestFetchTime,
});

if (!result) {
Expand All @@ -91,7 +99,8 @@ export class UplinkFetcher implements SupergraphManager {
this.compositionId = result.id;
// the healthCheck fn is only assigned if it's enabled in the config
await this.healthCheck?.(result.supergraphSdl);
return result.supergraphSdl;
const { supergraphSdl, minDelaySeconds } = result;
return { supergraphSdl, minDelaySeconds };
}
}

Expand All @@ -101,24 +110,34 @@ export class UplinkFetcher implements SupergraphManager {
}

private poll() {
this.timerRef = setTimeout(async () => {
if (this.state.phase === 'polling') {
const pollingPromise = resolvable();

this.state.pollingPromise = pollingPromise;
try {
const maybeNewSupergraphSdl = await this.updateSupergraphSdl();
if (maybeNewSupergraphSdl) {
this.update?.(maybeNewSupergraphSdl);
this.timerRef = setTimeout(
async () => {
if (this.state.phase === 'polling') {
const pollingPromise = resolvable();

this.state.pollingPromise = pollingPromise;
try {
const result = await this.updateSupergraphSdl();
const maybeNewSupergraphSdl = result?.supergraphSdl || null;
if (result?.minDelaySeconds) {
this.minDelayMs = 1000 * result?.minDelaySeconds;
this.earliestFetchTime = new Date(Date.now() + this.minDelayMs);
}
if (maybeNewSupergraphSdl) {
this.update?.(maybeNewSupergraphSdl);
}
} catch (e) {
this.logUpdateFailure(e);
}
} catch (e) {
this.logUpdateFailure(e);
pollingPromise.resolve();
}
pollingPromise.resolve();
}

this.poll();
}, this.config.pollIntervalInMs);
this.poll();
},
this.minDelayMs
? Math.max(this.minDelayMs, this.config.fallbackPollIntervalInMs)
: this.config.fallbackPollIntervalInMs,
);
}

private logUpdateFailure(e: any) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const SUPERGRAPH_SDL_QUERY = /* GraphQL */`#graphql
... on RouterConfigResult {
id
supergraphSdl: supergraphSDL
minDelaySeconds
}
... on FetchError {
code
Expand Down Expand Up @@ -56,6 +57,7 @@ export async function loadSupergraphSdlFromUplinks({
compositionId,
maxRetries,
roundRobinSeed,
earliestFetchTime,
}: {
graphRef: string;
apiKey: string;
Expand All @@ -65,6 +67,7 @@ export async function loadSupergraphSdlFromUplinks({
compositionId: string | null;
maxRetries: number,
roundRobinSeed: number,
earliestFetchTime: Date | null
}) : Promise<SupergraphSdlUpdate | null> {
// This Promise resolves with either an updated supergraph or null if no change.
// This Promise can reject in the case that none of the retries are successful,
Expand All @@ -81,6 +84,10 @@ export async function loadSupergraphSdlFromUplinks({
}),
{
retries: maxRetries,
onRetry: async () => {
const delayMS = earliestFetchTime ? earliestFetchTime.getTime() - Date.now(): 0;
if (delayMS > 0) await new Promise(resolve => setTimeout(resolve, delayMS));
}
},
);

Expand Down Expand Up @@ -176,9 +183,10 @@ export async function loadSupergraphSdlFromStorage({
const {
id,
supergraphSdl,
minDelaySeconds,
// messages,
} = routerConfig;
return { id, supergraphSdl: supergraphSdl! };
return { id, supergraphSdl: supergraphSdl!, minDelaySeconds };
} else if (routerConfig.__typename === 'FetchError') {
// FetchError case
const { code, message } = routerConfig;
Expand Down

0 comments on commit dac11e8

Please sign in to comment.