Skip to content

Commit

Permalink
feat(events): add event service
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Jungermann <Patrick.Jungermann@gmail.com>
  • Loading branch information
pjungermann committed Jan 23, 2024
1 parent 5bdb001 commit 4a211f3
Show file tree
Hide file tree
Showing 74 changed files with 1,051 additions and 512 deletions.
2 changes: 2 additions & 0 deletions packages/backend-dynamic-feature-service/api-report.md
Expand Up @@ -9,6 +9,7 @@ import { CatalogBuilder } from '@backstage/plugin-catalog-backend';
import { Config } from '@backstage/config';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsBackend } from '@backstage/plugin-events-backend';
import { EventService } from '@backstage/plugin-events-node';
import { FeatureDiscoveryService } from '@backstage/backend-plugin-api/alpha';
import { HttpPostIngressOptions } from '@backstage/plugin-events-node';
import { IdentityApi } from '@backstage/plugin-auth-node';
Expand Down Expand Up @@ -186,6 +187,7 @@ export type LegacyPluginEnvironment = {
scheduler: PluginTaskScheduler;
identity: IdentityApi;
eventBroker: EventBroker;
eventService: EventService;
pluginProvider: BackendPluginProvider;
};

Expand Down
5 changes: 5 additions & 0 deletions packages/backend-dynamic-feature-service/src/manager/types.ts
Expand Up @@ -29,6 +29,7 @@ import { IdentityApi } from '@backstage/plugin-auth-node';
import { PermissionEvaluator } from '@backstage/plugin-permission-common';
import {
EventBroker,
EventService,
HttpPostIngressOptions,
} from '@backstage/plugin-events-node';

Expand Down Expand Up @@ -63,7 +64,11 @@ export type LegacyPluginEnvironment = {
permissions: PermissionEvaluator;
scheduler: PluginTaskScheduler;
identity: IdentityApi;
/**
* @deprecated use `eventService` instead
*/
eventBroker: EventBroker;
eventService: EventService;
pluginProvider: BackendPluginProvider;
};

Expand Down
8 changes: 7 additions & 1 deletion packages/backend/src/index.ts
Expand Up @@ -70,6 +70,7 @@ import { PluginEnvironment } from './types';
import { ServerPermissionClient } from '@backstage/plugin-permission-node';
import { DefaultIdentityClient } from '@backstage/plugin-auth-node';
import { DefaultEventBroker } from '@backstage/plugin-events-backend';
import { DefaultEventService } from '@backstage/plugin-events-node';
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
import { MeterProvider } from '@opentelemetry/sdk-metrics';
import { metrics } from '@opentelemetry/api';
Expand Down Expand Up @@ -99,7 +100,11 @@ function makeCreateEnv(config: Config) {
discovery,
});

const eventBroker = new DefaultEventBroker(root.child({ type: 'plugin' }));
const eventService = DefaultEventService.create({ logger: root });
const eventBroker = new DefaultEventBroker(
root.child({ type: 'plugin' }),
eventService,
);
const signalService = DefaultSignalService.create({
eventBroker,
});
Expand All @@ -119,6 +124,7 @@ function makeCreateEnv(config: Config) {
config,
reader,
eventBroker,
eventService,
discovery,
tokenManager,
permissions,
Expand Down
34 changes: 16 additions & 18 deletions packages/backend/src/plugins/DemoEventBasedEntityProvider.ts
Expand Up @@ -18,42 +18,40 @@ import {
EntityProvider,
EntityProviderConnection,
} from '@backstage/plugin-catalog-node';
import {
EventBroker,
EventParams,
EventSubscriber,
} from '@backstage/plugin-events-node';
import { EventParams, EventService } from '@backstage/plugin-events-node';
import { Logger } from 'winston';

export class DemoEventBasedEntityProvider
implements EntityProvider, EventSubscriber
{
export class DemoEventBasedEntityProvider implements EntityProvider {
private readonly logger: Logger;
private readonly eventService: EventService;
private readonly topics: string[];

constructor(opts: {
eventBroker: EventBroker;
eventService: EventService;
logger: Logger;
topics: string[];
}) {
const { eventBroker, logger, topics } = opts;
this.logger = logger;
this.topics = topics;
eventBroker.subscribe(this);
this.eventService = opts.eventService;
this.logger = opts.logger;
this.topics = opts.topics;
}

async subscribe() {
await this.eventService.subscribe({
id: this.constructor.name,
topics: this.topics,
onEvent: this.onEvent.bind(this),
});
}

async onEvent(params: EventParams): Promise<void> {
private async onEvent(params: EventParams): Promise<void> {
this.logger.info(
`onEvent: topic=${params.topic}, metadata=${JSON.stringify(
params.metadata,
)}, payload=${JSON.stringify(params.eventPayload)}`,
);
}

supportsEventTopics(): string[] {
return this.topics;
}

async connect(_: EntityProviderConnection): Promise<void> {
// not doing anything here
}
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/plugins/catalog.ts
Expand Up @@ -28,10 +28,11 @@ export default async function createPlugin(
builder.addProcessor(new ScaffolderEntitiesProcessor());

const demoProvider = new DemoEventBasedEntityProvider({
eventService: env.eventService,
logger: env.logger,
topics: ['example'],
eventBroker: env.eventBroker,
});
await demoProvider.subscribe();
builder.addEntityProvider(demoProvider);

const { processingEngine, router } = await builder.build();
Expand Down
11 changes: 2 additions & 9 deletions packages/backend/src/plugins/events.ts
Expand Up @@ -14,10 +14,7 @@
* limitations under the License.
*/

import {
EventsBackend,
HttpPostIngressEventPublisher,
} from '@backstage/plugin-events-backend';
import { HttpPostIngressEventPublisher } from '@backstage/plugin-events-backend';
import { Router } from 'express';
import { PluginEnvironment } from '../types';

Expand All @@ -28,14 +25,10 @@ export default async function createPlugin(

const http = HttpPostIngressEventPublisher.fromConfig({
config: env.config,
eventService: env.eventService,
logger: env.logger,
});
http.bind(eventsRouter);

await new EventsBackend(env.logger)
.setEventBroker(env.eventBroker)
.addPublishers(http)
.start();

return eventsRouter;
}
6 changes: 5 additions & 1 deletion packages/backend/src/types.ts
Expand Up @@ -26,7 +26,7 @@ import {
import { PluginTaskScheduler } from '@backstage/backend-tasks';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { PermissionEvaluator } from '@backstage/plugin-permission-common';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventBroker, EventService } from '@backstage/plugin-events-node';
import { DefaultSignalService } from '@backstage/plugin-signals-node';

export type PluginEnvironment = {
Expand All @@ -40,6 +40,10 @@ export type PluginEnvironment = {
permissions: PermissionEvaluator;
scheduler: PluginTaskScheduler;
identity: IdentityApi;
/**
* @deprecated use `eventService` instead
*/
eventBroker: EventBroker;
eventService: EventService;
signalService: DefaultSignalService;
};
12 changes: 6 additions & 6 deletions plugins/events-backend-module-aws-sqs/api-report.md
Expand Up @@ -4,20 +4,20 @@
```ts
import { Config } from '@backstage/config';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventPublisher } from '@backstage/plugin-events-node';
import { Logger } from 'winston';
import { EventService } from '@backstage/plugin-events-node';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginTaskScheduler } from '@backstage/backend-tasks';

// @public
export class AwsSqsConsumingEventPublisher implements EventPublisher {
export class AwsSqsConsumingEventPublisher {
// (undocumented)
static fromConfig(env: {
config: Config;
logger: Logger;
eventService: EventService;
logger: LoggerService;
scheduler: PluginTaskScheduler;
}): AwsSqsConsumingEventPublisher[];
// (undocumented)
setEventBroker(eventBroker: EventBroker): Promise<void>;
start(): Promise<void>;
}
```
Expand Up @@ -22,7 +22,7 @@ import {
import { getVoidLogger } from '@backstage/backend-common';
import { PluginTaskScheduler } from '@backstage/backend-tasks';
import { ConfigReader } from '@backstage/config';
import { TestEventBroker } from '@backstage/plugin-events-backend-test-utils';
import { TestEventService } from '@backstage/plugin-events-backend-test-utils';
import { mockClient } from 'aws-sdk-client-mock';
import { AwsSqsConsumingEventPublisher } from './AwsSqsConsumingEventPublisher';

Expand Down Expand Up @@ -53,12 +53,14 @@ describe('AwsSqsConsumingEventPublisher', () => {
},
});
const logger = getVoidLogger();
const eventService = new TestEventService();
const scheduler = {
scheduleTask: jest.fn(),
} as unknown as PluginTaskScheduler;

const publishers = AwsSqsConsumingEventPublisher.fromConfig({
config,
eventService,
logger,
scheduler,
});
Expand All @@ -85,21 +87,21 @@ describe('AwsSqsConsumingEventPublisher', () => {
},
});
const logger = getVoidLogger();
const eventService = new TestEventService();
const scheduler = {
scheduleTask: jest.fn(),
} as unknown as PluginTaskScheduler;

const publishers = AwsSqsConsumingEventPublisher.fromConfig({
config,
eventService,
logger,
scheduler,
});
expect(publishers.length).toEqual(1);

const publisher = publishers[0];

const eventBroker = new TestEventBroker();
await publisher.setEventBroker(eventBroker);
await publisher.start();

// publisher.connect(..) was causing the polling for events to be scheduled
expect(scheduler.scheduleTask).toHaveBeenCalledWith(
Expand Down Expand Up @@ -133,6 +135,7 @@ describe('AwsSqsConsumingEventPublisher', () => {
},
});
const logger = getVoidLogger();
const eventService = new TestEventService();
let taskFn: (() => Promise<void>) | undefined = undefined;
const scheduler = {
scheduleTask: (spec: { fn: () => Promise<void> }) => {
Expand Down Expand Up @@ -196,32 +199,31 @@ describe('AwsSqsConsumingEventPublisher', () => {

const publishers = AwsSqsConsumingEventPublisher.fromConfig({
config,
eventService,
logger,
scheduler,
});
expect(publishers.length).toEqual(1);
const publisher = publishers[0];

const eventBroker = new TestEventBroker();
await publisher.setEventBroker(eventBroker);
await publisher.start();

await taskFn!();
await taskFn!();
await taskFn!();

expect(eventBroker.published.length).toEqual(2);
expect(eventBroker.published[0].topic).toEqual('fake1');
expect(eventBroker.published[0].eventPayload).toEqual({
expect(eventService.published).toHaveLength(2);
expect(eventService.published[0].topic).toEqual('fake1');
expect(eventService.published[0].eventPayload).toEqual({
event: 'payload1',
});
expect(eventBroker.published[0].metadata).toEqual({
expect(eventService.published[0].metadata).toEqual({
'X-Custom-Attr': 'value',
});

expect(eventBroker.published[1].topic).toEqual('fake1');
expect(eventBroker.published[1].eventPayload).toEqual({
expect(eventService.published[1].topic).toEqual('fake1');
expect(eventService.published[1].eventPayload).toEqual({
event: 'payload2',
});
expect(eventBroker.published[1].metadata).toEqual({});
expect(eventService.published[1].metadata).toEqual({});
});
});
Expand Up @@ -21,10 +21,10 @@ import {
ReceiveMessageCommandInput,
SQSClient,
} from '@aws-sdk/client-sqs';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginTaskScheduler } from '@backstage/backend-tasks';
import { Config } from '@backstage/config';
import { EventBroker, EventPublisher } from '@backstage/plugin-events-node';
import { Logger } from 'winston';
import { EventService } from '@backstage/plugin-events-node';
import { AwsSqsEventSourceConfig, readConfig } from './config';

/**
Expand All @@ -34,28 +34,34 @@ import { AwsSqsEventSourceConfig, readConfig } from './config';
* @public
*/
// TODO(pjungermann): add prom metrics? (see plugins/catalog-backend/src/util/metrics.ts, etc.)
export class AwsSqsConsumingEventPublisher implements EventPublisher {
export class AwsSqsConsumingEventPublisher {
private readonly topic: string;
private readonly receiveParams: ReceiveMessageCommandInput;
private readonly sqs: SQSClient;
private readonly queueUrl: string;
private readonly taskTimeoutSeconds: number;
private readonly waitTimeAfterEmptyReceiveMs;
private eventBroker?: EventBroker;

static fromConfig(env: {
config: Config;
logger: Logger;
eventService: EventService;
logger: LoggerService;
scheduler: PluginTaskScheduler;
}): AwsSqsConsumingEventPublisher[] {
return readConfig(env.config).map(
config =>
new AwsSqsConsumingEventPublisher(env.logger, env.scheduler, config),
new AwsSqsConsumingEventPublisher(
env.logger,
env.eventService,
env.scheduler,
config,
),
);
}

private constructor(
private readonly logger: Logger,
private readonly logger: LoggerService,
private readonly eventService: EventService,
private readonly scheduler: PluginTaskScheduler,
config: AwsSqsEventSourceConfig,
) {
Expand All @@ -80,12 +86,7 @@ export class AwsSqsConsumingEventPublisher implements EventPublisher {
config.waitTimeAfterEmptyReceive.as('milliseconds');
}

async setEventBroker(eventBroker: EventBroker): Promise<void> {
this.eventBroker = eventBroker;
return this.start();
}

private async start(): Promise<void> {
async start(): Promise<void> {
const id = `events.awsSqs.publisher:${this.topic}`;
const logger = this.logger.child({
class: AwsSqsConsumingEventPublisher.prototype.constructor.name,
Expand Down Expand Up @@ -172,7 +173,7 @@ export class AwsSqsConsumingEventPublisher implements EventPublisher {
}
});

this.eventBroker!.publish({
this.eventService.publish({
topic: this.topic,
eventPayload,
metadata,
Expand Down

0 comments on commit 4a211f3

Please sign in to comment.