Skip to content

Commit

Permalink
feat(events): add event service
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Jungermann <Patrick.Jungermann@gmail.com>
  • Loading branch information
pjungermann committed Jan 21, 2024
1 parent de15284 commit fad74bc
Show file tree
Hide file tree
Showing 61 changed files with 913 additions and 440 deletions.
12 changes: 6 additions & 6 deletions plugins/events-backend-module-aws-sqs/api-report.md
Expand Up @@ -4,20 +4,20 @@
```ts
import { Config } from '@backstage/config';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventPublisher } from '@backstage/plugin-events-node';
import { Logger } from 'winston';
import { EventService } from '@backstage/plugin-events-node';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginTaskScheduler } from '@backstage/backend-tasks';

// @public
export class AwsSqsConsumingEventPublisher implements EventPublisher {
export class AwsSqsConsumingEventPublisher {
// (undocumented)
static fromConfig(env: {
config: Config;
logger: Logger;
eventService: EventService;
logger: LoggerService;
scheduler: PluginTaskScheduler;
}): AwsSqsConsumingEventPublisher[];
// (undocumented)
setEventBroker(eventBroker: EventBroker): Promise<void>;
start(): Promise<void>;
}
```
Expand Up @@ -22,7 +22,7 @@ import {
import { getVoidLogger } from '@backstage/backend-common';
import { PluginTaskScheduler } from '@backstage/backend-tasks';
import { ConfigReader } from '@backstage/config';
import { TestEventBroker } from '@backstage/plugin-events-backend-test-utils';
import { TestEventService } from '@backstage/plugin-events-backend-test-utils';
import { mockClient } from 'aws-sdk-client-mock';
import { AwsSqsConsumingEventPublisher } from './AwsSqsConsumingEventPublisher';

Expand Down Expand Up @@ -53,12 +53,14 @@ describe('AwsSqsConsumingEventPublisher', () => {
},
});
const logger = getVoidLogger();
const eventService = new TestEventService();
const scheduler = {
scheduleTask: jest.fn(),
} as unknown as PluginTaskScheduler;

const publishers = AwsSqsConsumingEventPublisher.fromConfig({
config,
eventService,
logger,
scheduler,
});
Expand All @@ -85,21 +87,21 @@ describe('AwsSqsConsumingEventPublisher', () => {
},
});
const logger = getVoidLogger();
const eventService = new TestEventService();
const scheduler = {
scheduleTask: jest.fn(),
} as unknown as PluginTaskScheduler;

const publishers = AwsSqsConsumingEventPublisher.fromConfig({
config,
eventService,
logger,
scheduler,
});
expect(publishers.length).toEqual(1);

const publisher = publishers[0];

const eventBroker = new TestEventBroker();
await publisher.setEventBroker(eventBroker);
await publisher.start();

// publisher.connect(..) was causing the polling for events to be scheduled
expect(scheduler.scheduleTask).toHaveBeenCalledWith(
Expand Down Expand Up @@ -133,6 +135,7 @@ describe('AwsSqsConsumingEventPublisher', () => {
},
});
const logger = getVoidLogger();
const eventService = new TestEventService();
let taskFn: (() => Promise<void>) | undefined = undefined;
const scheduler = {
scheduleTask: (spec: { fn: () => Promise<void> }) => {
Expand Down Expand Up @@ -196,32 +199,31 @@ describe('AwsSqsConsumingEventPublisher', () => {

const publishers = AwsSqsConsumingEventPublisher.fromConfig({
config,
eventService,
logger,
scheduler,
});
expect(publishers.length).toEqual(1);
const publisher = publishers[0];

const eventBroker = new TestEventBroker();
await publisher.setEventBroker(eventBroker);
await publisher.start();

await taskFn!();
await taskFn!();
await taskFn!();

expect(eventBroker.published.length).toEqual(2);
expect(eventBroker.published[0].topic).toEqual('fake1');
expect(eventBroker.published[0].eventPayload).toEqual({
expect(eventService.published).toHaveLength(2);
expect(eventService.published[0].topic).toEqual('fake1');
expect(eventService.published[0].eventPayload).toEqual({
event: 'payload1',
});
expect(eventBroker.published[0].metadata).toEqual({
expect(eventService.published[0].metadata).toEqual({
'X-Custom-Attr': 'value',
});

expect(eventBroker.published[1].topic).toEqual('fake1');
expect(eventBroker.published[1].eventPayload).toEqual({
expect(eventService.published[1].topic).toEqual('fake1');
expect(eventService.published[1].eventPayload).toEqual({
event: 'payload2',
});
expect(eventBroker.published[1].metadata).toEqual({});
expect(eventService.published[1].metadata).toEqual({});
});
});
Expand Up @@ -21,10 +21,10 @@ import {
ReceiveMessageCommandInput,
SQSClient,
} from '@aws-sdk/client-sqs';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginTaskScheduler } from '@backstage/backend-tasks';
import { Config } from '@backstage/config';
import { EventBroker, EventPublisher } from '@backstage/plugin-events-node';
import { Logger } from 'winston';
import { EventService } from '@backstage/plugin-events-node';
import { AwsSqsEventSourceConfig, readConfig } from './config';

/**
Expand All @@ -34,28 +34,34 @@ import { AwsSqsEventSourceConfig, readConfig } from './config';
* @public
*/
// TODO(pjungermann): add prom metrics? (see plugins/catalog-backend/src/util/metrics.ts, etc.)
export class AwsSqsConsumingEventPublisher implements EventPublisher {
export class AwsSqsConsumingEventPublisher {
private readonly topic: string;
private readonly receiveParams: ReceiveMessageCommandInput;
private readonly sqs: SQSClient;
private readonly queueUrl: string;
private readonly taskTimeoutSeconds: number;
private readonly waitTimeAfterEmptyReceiveMs;
private eventBroker?: EventBroker;

static fromConfig(env: {
config: Config;
logger: Logger;
eventService: EventService;
logger: LoggerService;
scheduler: PluginTaskScheduler;
}): AwsSqsConsumingEventPublisher[] {
return readConfig(env.config).map(
config =>
new AwsSqsConsumingEventPublisher(env.logger, env.scheduler, config),
new AwsSqsConsumingEventPublisher(
env.logger,
env.eventService,
env.scheduler,
config,
),
);
}

private constructor(
private readonly logger: Logger,
private readonly logger: LoggerService,
private readonly eventService: EventService,
private readonly scheduler: PluginTaskScheduler,
config: AwsSqsEventSourceConfig,
) {
Expand All @@ -80,12 +86,7 @@ export class AwsSqsConsumingEventPublisher implements EventPublisher {
config.waitTimeAfterEmptyReceive.as('milliseconds');
}

async setEventBroker(eventBroker: EventBroker): Promise<void> {
this.eventBroker = eventBroker;
return this.start();
}

private async start(): Promise<void> {
async start(): Promise<void> {
const id = `events.awsSqs.publisher:${this.topic}`;
const logger = this.logger.child({
class: AwsSqsConsumingEventPublisher.prototype.constructor.name,
Expand Down Expand Up @@ -172,7 +173,7 @@ export class AwsSqsConsumingEventPublisher implements EventPublisher {
}
});

this.eventBroker!.publish({
this.eventService.publish({
topic: this.topic,
eventPayload,
metadata,
Expand Down
Expand Up @@ -14,26 +14,28 @@
* limitations under the License.
*/

import { createServiceFactory } from '@backstage/backend-plugin-api';
import { mockServices, startTestBackend } from '@backstage/backend-test-utils';
import { eventsExtensionPoint } from '@backstage/plugin-events-node/alpha';
import { TestEventBroker } from '@backstage/plugin-events-backend-test-utils';
import { eventServiceRef } from '@backstage/plugin-events-node';
import { TestEventService } from '@backstage/plugin-events-backend-test-utils';
import { eventsModuleAwsSqsConsumingEventPublisher } from './eventsModuleAwsSqsConsumingEventPublisher';
import { AwsSqsConsumingEventPublisher } from '../publisher/AwsSqsConsumingEventPublisher';

describe('eventsModuleAwsSqsConsumingEventPublisher', () => {
it('should be correctly wired and set up', async () => {
let addedPublishers: AwsSqsConsumingEventPublisher[] | undefined;
const extensionPoint = {
addPublishers: (publishers: any) => {
addedPublishers = publishers;
const eventService = new TestEventService();
const eventServiceFactory = createServiceFactory({
service: eventServiceRef,
deps: {},
async factory({}) {
return eventService;
},
};
});

const scheduler = mockServices.scheduler.mock();

await startTestBackend({
extensionPoints: [[eventsExtensionPoint, extensionPoint]],
features: [
eventServiceFactory(),
eventsModuleAwsSqsConsumingEventPublisher(),
mockServices.rootConfig.factory({
data: {
Expand Down Expand Up @@ -65,14 +67,6 @@ describe('eventsModuleAwsSqsConsumingEventPublisher', () => {
],
});

expect(addedPublishers).not.toBeUndefined();
expect(addedPublishers!.length).toEqual(2);

const eventBroker = new TestEventBroker();
await Promise.all(
addedPublishers!.map(publisher => publisher.setEventBroker(eventBroker)),
);

// publisher.connect(..) was causing the polling for events to be scheduled
expect(scheduler.scheduleTask).toHaveBeenCalledWith(
expect.objectContaining({ id: 'events.awsSqs.publisher:fake1' }),
Expand Down
Expand Up @@ -18,8 +18,7 @@ import {
coreServices,
createBackendModule,
} from '@backstage/backend-plugin-api';
import { loggerToWinstonLogger } from '@backstage/backend-common';
import { eventsExtensionPoint } from '@backstage/plugin-events-node/alpha';
import { eventServiceRef } from '@backstage/plugin-events-node';
import { AwsSqsConsumingEventPublisher } from '../publisher/AwsSqsConsumingEventPublisher';

/**
Expand All @@ -34,19 +33,19 @@ export const eventsModuleAwsSqsConsumingEventPublisher = createBackendModule({
env.registerInit({
deps: {
config: coreServices.rootConfig,
events: eventsExtensionPoint,
eventService: eventServiceRef,
logger: coreServices.logger,
scheduler: coreServices.scheduler,
},
async init({ config, events, logger, scheduler }) {
const winstonLogger = loggerToWinstonLogger(logger);
async init({ config, eventService, logger, scheduler }) {
const sqs = AwsSqsConsumingEventPublisher.fromConfig({
config: config,
logger: winstonLogger,
eventService,
logger,
scheduler: scheduler,
});

events.addPublishers(sqs);
await Promise.all(sqs.map(publisher => publisher.start()));
},
});
},
Expand Down
3 changes: 2 additions & 1 deletion plugins/events-backend-module-azure/api-report.md
Expand Up @@ -4,11 +4,12 @@
```ts
import { EventParams } from '@backstage/plugin-events-node';
import { EventService } from '@backstage/plugin-events-node';
import { SubTopicEventRouter } from '@backstage/plugin-events-node';

// @public
export class AzureDevOpsEventRouter extends SubTopicEventRouter {
constructor();
constructor(options: { eventService: EventService });
// (undocumented)
protected determineSubTopic(params: EventParams): string | undefined;
}
Expand Down
Expand Up @@ -14,37 +14,44 @@
* limitations under the License.
*/

import { TestEventBroker } from '@backstage/plugin-events-backend-test-utils';
import { TestEventService } from '@backstage/plugin-events-backend-test-utils';
import { AzureDevOpsEventRouter } from './AzureDevOpsEventRouter';

describe('AzureDevOpsEventRouter', () => {
const eventRouter = new AzureDevOpsEventRouter();
const eventService = new TestEventService();
const eventRouter = new AzureDevOpsEventRouter({ eventService });
const topic = 'azureDevOps';
const eventPayload = { eventType: 'test.type', test: 'payload' };
const metadata = {};

it('no $.eventType', () => {
const eventBroker = new TestEventBroker();
eventRouter.setEventBroker(eventBroker);
beforeEach(() => {
eventService.reset();
});

it('subscribed to topic', () => {
eventRouter.subscribe();

expect(eventService.subscribed).toHaveLength(1);
expect(eventService.subscribed[0].id).toEqual('AzureDevOpsEventRouter');
expect(eventService.subscribed[0].topics).toEqual([topic]);
});

it('no $.eventType', () => {
eventRouter.onEvent({
topic,
eventPayload: { invalid: 'payload' },
metadata,
});

expect(eventBroker.published).toEqual([]);
expect(eventService.published).toEqual([]);
});

it('with $.eventType', () => {
const eventBroker = new TestEventBroker();
eventRouter.setEventBroker(eventBroker);

eventRouter.onEvent({ topic, eventPayload, metadata });

expect(eventBroker.published.length).toBe(1);
expect(eventBroker.published[0].topic).toEqual('azureDevOps.test.type');
expect(eventBroker.published[0].eventPayload).toEqual(eventPayload);
expect(eventBroker.published[0].metadata).toEqual(metadata);
expect(eventService.published).toHaveLength(1);
expect(eventService.published[0].topic).toEqual('azureDevOps.test.type');
expect(eventService.published[0].eventPayload).toEqual(eventPayload);
expect(eventService.published[0].metadata).toEqual(metadata);
});
});
Expand Up @@ -16,6 +16,7 @@

import {
EventParams,
EventService,
SubTopicEventRouter,
} from '@backstage/plugin-events-node';

Expand All @@ -27,8 +28,11 @@ import {
* @public
*/
export class AzureDevOpsEventRouter extends SubTopicEventRouter {
constructor() {
super('azureDevOps');
constructor(options: { eventService: EventService }) {
super({
eventService: options.eventService,
topic: 'azureDevOps',
});
}

protected determineSubTopic(params: EventParams): string | undefined {
Expand Down

0 comments on commit fad74bc

Please sign in to comment.