From cc3c88502ac4cd567d1c1e265698023e4dbb652e Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Mon, 25 Aug 2025 04:25:00 +0800 Subject: [PATCH] fix(apisix-standalone): handle inline upstream update and delete --- .../service-inline-upstream.e2e-spec.ts | 115 +++++++-- .../backend-apisix-standalone/src/operator.ts | 228 ++++++++++-------- 2 files changed, 210 insertions(+), 133 deletions(-) diff --git a/libs/backend-apisix-standalone/e2e/resources/service-inline-upstream.e2e-spec.ts b/libs/backend-apisix-standalone/e2e/resources/service-inline-upstream.e2e-spec.ts index 7cbe6d3..3074505 100644 --- a/libs/backend-apisix-standalone/e2e/resources/service-inline-upstream.e2e-spec.ts +++ b/libs/backend-apisix-standalone/e2e/resources/service-inline-upstream.e2e-spec.ts @@ -22,46 +22,109 @@ describe('Service E2E - inline upstream', () => { }); }); + afterEach(() => vi.useRealTimers()); + it('Initialize cache', () => expect(dumpConfiguration(backend)).resolves.not.toThrow()); const serviceName = 'test'; - it('Create service', async () => { - const events = DifferV3.diff( + const config = { + services: [ { - services: [ - { - name: serviceName, - upstream: { - nodes: [ - { - host: '127.0.0.1', - port: 9180, - weight: 100, - }, - ], + name: serviceName, + upstream: { + nodes: [ + { + host: '127.0.0.1', + port: 9180, + weight: 100, }, - }, - ], + ], + }, }, - await dumpConfiguration(backend), - ); + ], + }; + it('Create service', async () => { + vi.useFakeTimers(); + vi.setSystemTime(100); + const events = DifferV3.diff(config, await dumpConfiguration(backend)); return syncEvents(backend, events); }); it('Check configuration', () => { - expect(rawConfigCache.get(cacheKey)?.upstreams).not.toBeUndefined(); - expect(rawConfigCache.get(cacheKey)!.upstreams).toHaveLength(1); - expect(rawConfigCache.get(cacheKey)!.upstreams![0].id).toEqual( + const rawConfig = rawConfigCache.get(cacheKey); + expect(rawConfig?.services?.[0].id).toEqual( ADCSDK.utils.generateId(serviceName), ); - expect(rawConfigCache.get(cacheKey)!.upstreams![0].name).toEqual( - serviceName, + expect(rawConfig?.services?.[0].modifiedIndex).toEqual(100); + expect(rawConfig?.upstreams).not.toBeUndefined(); + expect(rawConfig?.upstreams).toHaveLength(1); + expect(rawConfig?.upstreams?.[0].id).toEqual( + ADCSDK.utils.generateId(serviceName), ); + expect(rawConfig?.upstreams?.[0].name).toEqual(serviceName); + expect(rawConfig?.upstreams?.[0].modifiedIndex).toEqual(100); + expect(rawConfig?.services_conf_version).toEqual(100); + expect(rawConfig?.upstreams_conf_version).toEqual(100); + expect(rawConfig?.consumers_conf_version).toBeUndefined(); + expect(rawConfig?.global_rules_conf_version).toBeUndefined(); + expect(rawConfig?.plugin_metadata_conf_version).toBeUndefined(); + expect(rawConfig?.routes_conf_version).toBeUndefined(); + expect(rawConfig?.ssls_conf_version).toBeUndefined(); + expect(rawConfig?.stream_routes_conf_version).toBeUndefined(); + + const config = configCache.get(cacheKey); + expect(config?.services).not.toBeUndefined(); + expect(config?.services?.[0].upstream).not.toBeUndefined(); + }); + + it('Update inlined upstream', async () => { + vi.useFakeTimers(); + vi.setSystemTime(200); + + const newConfig = structuredClone(config); + newConfig.services[0].upstream.nodes[0].port = 19080; + + const events = DifferV3.diff(newConfig, await dumpConfiguration(backend)); + expect(events).toHaveLength(1); + expect(events[0].type).toEqual(ADCSDK.EventType.UPDATE); + expect(events[0].resourceType).toEqual(ADCSDK.ResourceType.SERVICE); + expect(events[0].diff?.[0].path?.[0]).toEqual('upstream'); + + return syncEvents(backend, events); + }); + + it('Check configuration', () => { + const rawConfig = rawConfigCache.get(cacheKey); + expect(rawConfig?.upstreams?.[0].modifiedIndex).toEqual(200); + expect(rawConfig?.services?.[0].modifiedIndex).toEqual(100); + expect(rawConfig?.upstreams_conf_version).toEqual(200); + expect(rawConfig?.services_conf_version).toEqual(100); + expect(rawConfig?.consumers_conf_version).toBeUndefined(); + expect(rawConfig?.global_rules_conf_version).toBeUndefined(); + expect(rawConfig?.plugin_metadata_conf_version).toBeUndefined(); + expect(rawConfig?.routes_conf_version).toBeUndefined(); + expect(rawConfig?.ssls_conf_version).toBeUndefined(); + expect(rawConfig?.stream_routes_conf_version).toBeUndefined(); + }); + + it('Delete service', async () => { + vi.useFakeTimers(); + vi.setSystemTime(300); - expect(configCache.get(cacheKey)?.services).not.toBeUndefined(); - expect( - configCache.get(cacheKey)?.services![0].upstream, - ).not.toBeUndefined(); + const events = DifferV3.diff({}, await dumpConfiguration(backend)); + expect(events).toHaveLength(1); + expect(events[0].type).toEqual(ADCSDK.EventType.DELETE); + expect(events[0].resourceType).toEqual(ADCSDK.ResourceType.SERVICE); + + return syncEvents(backend, events); + }); + + it('Check configuration', () => { + const rawConfig = rawConfigCache.get(cacheKey); + expect(rawConfig?.upstreams).toHaveLength(0); + expect(rawConfig?.services).toHaveLength(0); + expect(rawConfig?.upstreams_conf_version).toEqual(300); + expect(rawConfig?.services_conf_version).toEqual(300); }); }); diff --git a/libs/backend-apisix-standalone/src/operator.ts b/libs/backend-apisix-standalone/src/operator.ts index aa814c5..4b36014 100644 --- a/libs/backend-apisix-standalone/src/operator.ts +++ b/libs/backend-apisix-standalone/src/operator.ts @@ -63,113 +63,9 @@ export class Operator extends ADCSDK.backend.BackendEventSource { const timestamp = Date.now(); return from(events).pipe( // derive the latest configuration from the old config - tap((event) => { - const resourceType = - event.resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL - ? ADCSDK.ResourceType.CONSUMER - : (event.resourceType as typing.UsedResourceTypes); - const resourceKey = typing.APISIXStandaloneKeyMap[resourceType]; - const upstreamResourceKey = - typing.APISIXStandaloneKeyMap[ADCSDK.ResourceType.UPSTREAM]; - - if (event.type === ADCSDK.EventType.CREATE) { - if (!newConfig[resourceKey]) newConfig[resourceKey] = []; - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- infer error - newConfig[resourceKey].push(this.fromADC(event, timestamp) as any); - increaseVersion[resourceType] = true; - - // Emit inline upstream - if (event.resourceType === ADCSDK.ResourceType.SERVICE) { - if (!newConfig[upstreamResourceKey]) - newConfig[upstreamResourceKey] = []; - newConfig[upstreamResourceKey].push({ - ...this.fromADCUpstream( - (event.newValue as ADCSDK.Service).upstream!, - ), - id: event.resourceId, - modifiedIndex: timestamp, - name: event.resourceName, - }); - increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true; - } - } else if ( - event.type === ADCSDK.EventType.UPDATE || - event.type === ADCSDK.EventType.DELETE - ) { - if (!newConfig[resourceKey]) newConfig[resourceKey] = []; - const resources = newConfig[resourceKey]; - const eventGeneratedId = this.generateIdFromEvent(event); - const index = resources?.findIndex((item) => - 'id' in item ? item.id : item.username === eventGeneratedId, - ); - - if (!isNil(index) && index !== -1) { - if (event.type === ADCSDK.EventType.UPDATE) { - resources[index] = this.fromADC( - event, - timestamp, - ) as (typeof resources)[number]; - } else { - resources.splice(index, 1); - } - increaseVersion[resourceType] = true; - } - - // Emit inline upstream - if (event.resourceType === ADCSDK.ResourceType.SERVICE) { - if (event.type === ADCSDK.EventType.UPDATE) { - const baseUpstream = { - id: event.resourceId, - name: event.resourceName, - }; - const newUpstream = (event.newValue as ADCSDK.Service)?.upstream; - const oldUpstream = (event.oldValue as ADCSDK.Service)?.upstream; - const events = DifferV3.diff( - { - ...(newUpstream && { - upstreams: [Object.assign(baseUpstream, newUpstream)], - }), - }, - { - ...(oldUpstream && { - upstreams: [Object.assign(baseUpstream, oldUpstream)], - }), - }, - ); - if (events.length > 0) { - if (!newConfig[upstreamResourceKey]) - newConfig[upstreamResourceKey] = []; - const resources = newConfig[upstreamResourceKey]; - const index = resources?.findIndex( - (item) => item.id === eventGeneratedId, - ); - if (!isNil(index) && index != -1) { - resources[index] = { - ...this.fromADCUpstream( - (event.newValue as ADCSDK.Service).upstream!, - ), - id: event.resourceId, - modifiedIndex: timestamp, - name: event.resourceName, - }; - increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true; - } - } - } else { - if (!newConfig[upstreamResourceKey]) - newConfig[upstreamResourceKey] = []; - const resources = newConfig[upstreamResourceKey]; - const index = resources?.findIndex( - (item) => item.id === eventGeneratedId, - ); - if (!isNil(index) && index != -1) { - resources.splice(index, 1); - increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true; - } - } - } - } - }), + tap((event) => + this.applyEvent(newConfig, increaseVersion, timestamp, event), + ), // filtering of new consumer configurations to ensure // that orphaned credential objects do not exist tap(() => { @@ -267,6 +163,124 @@ export class Operator extends ADCSDK.backend.BackendEventSource { ); } + private applyEvent( + config: typing.APISIXStandalone, + increaseVersion: Partial>, + timestamp: number, + event: ADCSDK.Event, + ) { + const resourceType = + event.resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL + ? ADCSDK.ResourceType.CONSUMER + : (event.resourceType as typing.UsedResourceTypes); + const resourceKey = typing.APISIXStandaloneKeyMap[resourceType]; + + if (event.resourceType === ADCSDK.ResourceType.SERVICE) + this.applyEventForServiceInlinedUpstream( + config, + increaseVersion, + timestamp, + event, + ); + + if (event.type === ADCSDK.EventType.CREATE) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- infer error + (config[resourceKey] ||= []).push(this.fromADC(event, timestamp) as any); + increaseVersion[resourceType] = true; + } else if (event.type === ADCSDK.EventType.UPDATE) { + // Only update the service when the service itself is modified, to avoid service + // conf version (modifiedIndex) changes caused by changes to its inline upstream. + if ( + event.resourceType === ADCSDK.ResourceType.SERVICE && + (event.diff || []).filter((item) => item.path?.[0] !== 'upstream') + .length <= 0 + ) + return; + + config[resourceKey] ||= []; + const resources = config[resourceKey]; + const index = resources.findIndex( + (item) => + ('id' in item ? item.id : item.username) === + this.generateIdFromEvent(event), + ); + if (index !== -1) { + resources[index] = this.fromADC( + event, + timestamp, + ) as (typeof resources)[number]; + increaseVersion[resourceType] = true; + } + } else { + // If the resource does not exist, there is no need to delete it. + if (!config[resourceKey]) return; + + const resources = config[resourceKey]; + const index = resources.findIndex( + (item) => + ('id' in item ? item.id : item.username) === + this.generateIdFromEvent(event), + ); + if (index !== -1) { + resources.splice(index, 1); + increaseVersion[resourceType] = true; + } + } + } + + private applyEventForServiceInlinedUpstream( + config: typing.APISIXStandalone, + increaseVersion: Partial>, + timestamp: number, + event: ADCSDK.Event, + ) { + if (event.resourceType !== ADCSDK.ResourceType.SERVICE) return; + + const upstreamResourceKey = + typing.APISIXStandaloneKeyMap[ADCSDK.ResourceType.UPSTREAM]; + + if (event.type === ADCSDK.EventType.CREATE) { + (config[upstreamResourceKey] ||= []).push({ + ...this.fromADCUpstream( + (event.newValue as ADCSDK.Service).upstream as ADCSDK.Upstream, + ), + id: event.resourceId, + modifiedIndex: timestamp, + name: event.resourceName, + }); + increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true; + } else if (event.type === ADCSDK.EventType.UPDATE) { + if ( + (event.diff || []).filter((item) => item.path?.[0] === 'upstream') + .length <= 0 + ) + return; + + config[upstreamResourceKey] ||= []; + const resources = config[upstreamResourceKey]; + const index = resources.findIndex((item) => item.id === event.resourceId); + if (index != -1) { + resources[index] = { + ...this.fromADCUpstream( + (event.newValue as ADCSDK.Service).upstream as ADCSDK.Upstream, + ), + id: event.resourceId, + modifiedIndex: timestamp, + name: event.resourceName, + }; + increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true; + } + } else if (event.type === ADCSDK.EventType.DELETE) { + config[upstreamResourceKey] ||= []; + const resources = config[upstreamResourceKey]; + const index = resources.findIndex((item) => item.id === event.resourceId); + if (index != -1) { + resources.splice(index, 1); + increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true; + } + } + } + private generateIdFromEvent(event: ADCSDK.Event): string { if (event.resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL) return `${event.parentId}/credentials/${event.resourceId}`;