Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,109 @@ describe('Service E2E - inline upstream', () => {
});
});

afterEach(() => vi.useRealTimers());

it('Initialize cache', () =>
expect(dumpConfiguration(backend)).resolves.not.toThrow());

const serviceName = 'test';
it('Create service', async () => {
const events = DifferV3.diff(
const config = {
services: [
{
services: [
{
name: serviceName,
upstream: {
nodes: [
{
host: '127.0.0.1',
port: 9180,
weight: 100,
},
],
name: serviceName,
upstream: {
nodes: [
{
host: '127.0.0.1',
port: 9180,
weight: 100,
},
},
],
],
},
},
await dumpConfiguration(backend),
);
],
};
it('Create service', async () => {
vi.useFakeTimers();
vi.setSystemTime(100);
const events = DifferV3.diff(config, await dumpConfiguration(backend));
return syncEvents(backend, events);
});

it('Check configuration', () => {
expect(rawConfigCache.get(cacheKey)?.upstreams).not.toBeUndefined();
expect(rawConfigCache.get(cacheKey)!.upstreams).toHaveLength(1);
expect(rawConfigCache.get(cacheKey)!.upstreams![0].id).toEqual(
const rawConfig = rawConfigCache.get(cacheKey);
expect(rawConfig?.services?.[0].id).toEqual(
ADCSDK.utils.generateId(serviceName),
);
expect(rawConfigCache.get(cacheKey)!.upstreams![0].name).toEqual(
serviceName,
expect(rawConfig?.services?.[0].modifiedIndex).toEqual(100);
expect(rawConfig?.upstreams).not.toBeUndefined();
expect(rawConfig?.upstreams).toHaveLength(1);
expect(rawConfig?.upstreams?.[0].id).toEqual(
ADCSDK.utils.generateId(serviceName),
);
expect(rawConfig?.upstreams?.[0].name).toEqual(serviceName);
expect(rawConfig?.upstreams?.[0].modifiedIndex).toEqual(100);
expect(rawConfig?.services_conf_version).toEqual(100);
expect(rawConfig?.upstreams_conf_version).toEqual(100);
expect(rawConfig?.consumers_conf_version).toBeUndefined();
expect(rawConfig?.global_rules_conf_version).toBeUndefined();
expect(rawConfig?.plugin_metadata_conf_version).toBeUndefined();
expect(rawConfig?.routes_conf_version).toBeUndefined();
expect(rawConfig?.ssls_conf_version).toBeUndefined();
expect(rawConfig?.stream_routes_conf_version).toBeUndefined();

const config = configCache.get(cacheKey);
expect(config?.services).not.toBeUndefined();
expect(config?.services?.[0].upstream).not.toBeUndefined();
});

it('Update inlined upstream', async () => {
vi.useFakeTimers();
vi.setSystemTime(200);

const newConfig = structuredClone(config);
newConfig.services[0].upstream.nodes[0].port = 19080;

const events = DifferV3.diff(newConfig, await dumpConfiguration(backend));
expect(events).toHaveLength(1);
expect(events[0].type).toEqual(ADCSDK.EventType.UPDATE);
expect(events[0].resourceType).toEqual(ADCSDK.ResourceType.SERVICE);
expect(events[0].diff?.[0].path?.[0]).toEqual('upstream');

return syncEvents(backend, events);
});

it('Check configuration', () => {
const rawConfig = rawConfigCache.get(cacheKey);
expect(rawConfig?.upstreams?.[0].modifiedIndex).toEqual(200);
expect(rawConfig?.services?.[0].modifiedIndex).toEqual(100);
expect(rawConfig?.upstreams_conf_version).toEqual(200);
expect(rawConfig?.services_conf_version).toEqual(100);
expect(rawConfig?.consumers_conf_version).toBeUndefined();
expect(rawConfig?.global_rules_conf_version).toBeUndefined();
expect(rawConfig?.plugin_metadata_conf_version).toBeUndefined();
expect(rawConfig?.routes_conf_version).toBeUndefined();
expect(rawConfig?.ssls_conf_version).toBeUndefined();
expect(rawConfig?.stream_routes_conf_version).toBeUndefined();
});

it('Delete service', async () => {
vi.useFakeTimers();
vi.setSystemTime(300);

expect(configCache.get(cacheKey)?.services).not.toBeUndefined();
expect(
configCache.get(cacheKey)?.services![0].upstream,
).not.toBeUndefined();
const events = DifferV3.diff({}, await dumpConfiguration(backend));
expect(events).toHaveLength(1);
expect(events[0].type).toEqual(ADCSDK.EventType.DELETE);
expect(events[0].resourceType).toEqual(ADCSDK.ResourceType.SERVICE);

return syncEvents(backend, events);
});

it('Check configuration', () => {
const rawConfig = rawConfigCache.get(cacheKey);
expect(rawConfig?.upstreams).toHaveLength(0);
expect(rawConfig?.services).toHaveLength(0);
expect(rawConfig?.upstreams_conf_version).toEqual(300);
expect(rawConfig?.services_conf_version).toEqual(300);
});
});
228 changes: 121 additions & 107 deletions libs/backend-apisix-standalone/src/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,113 +63,9 @@ export class Operator extends ADCSDK.backend.BackendEventSource {
const timestamp = Date.now();
return from(events).pipe(
// derive the latest configuration from the old config
tap((event) => {
const resourceType =
event.resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
? ADCSDK.ResourceType.CONSUMER
: (event.resourceType as typing.UsedResourceTypes);
const resourceKey = typing.APISIXStandaloneKeyMap[resourceType];
const upstreamResourceKey =
typing.APISIXStandaloneKeyMap[ADCSDK.ResourceType.UPSTREAM];

if (event.type === ADCSDK.EventType.CREATE) {
if (!newConfig[resourceKey]) newConfig[resourceKey] = [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- infer error
newConfig[resourceKey].push(this.fromADC(event, timestamp) as any);
increaseVersion[resourceType] = true;

// Emit inline upstream
if (event.resourceType === ADCSDK.ResourceType.SERVICE) {
if (!newConfig[upstreamResourceKey])
newConfig[upstreamResourceKey] = [];
newConfig[upstreamResourceKey].push({
...this.fromADCUpstream(
(event.newValue as ADCSDK.Service).upstream!,
),
id: event.resourceId,
modifiedIndex: timestamp,
name: event.resourceName,
});
increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true;
}
} else if (
event.type === ADCSDK.EventType.UPDATE ||
event.type === ADCSDK.EventType.DELETE
) {
if (!newConfig[resourceKey]) newConfig[resourceKey] = [];
const resources = newConfig[resourceKey];
const eventGeneratedId = this.generateIdFromEvent(event);
const index = resources?.findIndex((item) =>
'id' in item ? item.id : item.username === eventGeneratedId,
);

if (!isNil(index) && index !== -1) {
if (event.type === ADCSDK.EventType.UPDATE) {
resources[index] = this.fromADC(
event,
timestamp,
) as (typeof resources)[number];
} else {
resources.splice(index, 1);
}
increaseVersion[resourceType] = true;
}

// Emit inline upstream
if (event.resourceType === ADCSDK.ResourceType.SERVICE) {
if (event.type === ADCSDK.EventType.UPDATE) {
const baseUpstream = {
id: event.resourceId,
name: event.resourceName,
};
const newUpstream = (event.newValue as ADCSDK.Service)?.upstream;
const oldUpstream = (event.oldValue as ADCSDK.Service)?.upstream;
const events = DifferV3.diff(
{
...(newUpstream && {
upstreams: [Object.assign(baseUpstream, newUpstream)],
}),
},
{
...(oldUpstream && {
upstreams: [Object.assign(baseUpstream, oldUpstream)],
}),
},
);
if (events.length > 0) {
if (!newConfig[upstreamResourceKey])
newConfig[upstreamResourceKey] = [];
const resources = newConfig[upstreamResourceKey];
const index = resources?.findIndex(
(item) => item.id === eventGeneratedId,
);
if (!isNil(index) && index != -1) {
resources[index] = {
...this.fromADCUpstream(
(event.newValue as ADCSDK.Service).upstream!,
),
id: event.resourceId,
modifiedIndex: timestamp,
name: event.resourceName,
};
increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true;
}
}
} else {
if (!newConfig[upstreamResourceKey])
newConfig[upstreamResourceKey] = [];
const resources = newConfig[upstreamResourceKey];
const index = resources?.findIndex(
(item) => item.id === eventGeneratedId,
);
if (!isNil(index) && index != -1) {
resources.splice(index, 1);
increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true;
}
}
}
}
}),
tap((event) =>
this.applyEvent(newConfig, increaseVersion, timestamp, event),
),
// filtering of new consumer configurations to ensure
// that orphaned credential objects do not exist
tap(() => {
Expand Down Expand Up @@ -267,6 +163,124 @@ export class Operator extends ADCSDK.backend.BackendEventSource {
);
}

private applyEvent(
config: typing.APISIXStandalone,
increaseVersion: Partial<Record<typing.UsedResourceTypes, boolean>>,
timestamp: number,
event: ADCSDK.Event,
) {
const resourceType =
event.resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL
? ADCSDK.ResourceType.CONSUMER
: (event.resourceType as typing.UsedResourceTypes);
const resourceKey = typing.APISIXStandaloneKeyMap[resourceType];

if (event.resourceType === ADCSDK.ResourceType.SERVICE)
this.applyEventForServiceInlinedUpstream(
config,
increaseVersion,
timestamp,
event,
);

if (event.type === ADCSDK.EventType.CREATE) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- infer error
(config[resourceKey] ||= []).push(this.fromADC(event, timestamp) as any);
increaseVersion[resourceType] = true;
} else if (event.type === ADCSDK.EventType.UPDATE) {
// Only update the service when the service itself is modified, to avoid service
// conf version (modifiedIndex) changes caused by changes to its inline upstream.
if (
event.resourceType === ADCSDK.ResourceType.SERVICE &&
(event.diff || []).filter((item) => item.path?.[0] !== 'upstream')
.length <= 0
)
return;

config[resourceKey] ||= [];
const resources = config[resourceKey];
const index = resources.findIndex(
(item) =>
('id' in item ? item.id : item.username) ===
this.generateIdFromEvent(event),
);
if (index !== -1) {
resources[index] = this.fromADC(
event,
timestamp,
) as (typeof resources)[number];
increaseVersion[resourceType] = true;
}
} else {
// If the resource does not exist, there is no need to delete it.
if (!config[resourceKey]) return;

const resources = config[resourceKey];
const index = resources.findIndex(
(item) =>
('id' in item ? item.id : item.username) ===
this.generateIdFromEvent(event),
);
if (index !== -1) {
resources.splice(index, 1);
increaseVersion[resourceType] = true;
}
}
}

private applyEventForServiceInlinedUpstream(
config: typing.APISIXStandalone,
increaseVersion: Partial<Record<typing.UsedResourceTypes, boolean>>,
timestamp: number,
event: ADCSDK.Event,
) {
if (event.resourceType !== ADCSDK.ResourceType.SERVICE) return;

const upstreamResourceKey =
typing.APISIXStandaloneKeyMap[ADCSDK.ResourceType.UPSTREAM];

if (event.type === ADCSDK.EventType.CREATE) {
(config[upstreamResourceKey] ||= []).push({
...this.fromADCUpstream(
(event.newValue as ADCSDK.Service).upstream as ADCSDK.Upstream,
),
id: event.resourceId,
modifiedIndex: timestamp,
name: event.resourceName,
});
increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true;
} else if (event.type === ADCSDK.EventType.UPDATE) {
if (
(event.diff || []).filter((item) => item.path?.[0] === 'upstream')
.length <= 0
)
return;

config[upstreamResourceKey] ||= [];
const resources = config[upstreamResourceKey];
const index = resources.findIndex((item) => item.id === event.resourceId);
if (index != -1) {
resources[index] = {
...this.fromADCUpstream(
(event.newValue as ADCSDK.Service).upstream as ADCSDK.Upstream,
),
id: event.resourceId,
modifiedIndex: timestamp,
name: event.resourceName,
};
increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true;
}
} else if (event.type === ADCSDK.EventType.DELETE) {
config[upstreamResourceKey] ||= [];
const resources = config[upstreamResourceKey];
const index = resources.findIndex((item) => item.id === event.resourceId);
if (index != -1) {
resources.splice(index, 1);
increaseVersion[ADCSDK.ResourceType.UPSTREAM] = true;
}
}
}

private generateIdFromEvent(event: ADCSDK.Event): string {
if (event.resourceType === ADCSDK.ResourceType.CONSUMER_CREDENTIAL)
return `${event.parentId}/credentials/${event.resourceId}`;
Expand Down
Loading