Skip to content

Commit

Permalink
extract defaultChannel for topic into util
Browse files Browse the repository at this point in the history
  • Loading branch information
elf-pavlik committed Apr 15, 2024
1 parent e8c00f4 commit 2dd3166
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 23 deletions.
Expand Up @@ -12,7 +12,7 @@ export interface StreamingHttpEmitterInput {

/**
* Emits notifications on StreamingHTTPChannel2023 streams.
* Uses the StreamingHTTPs found in the provided map.
* Uses the response streams found in the provided map.
* The key should be the identifier of the topic resource.
*/
export class StreamingHttp2023Emitter extends AsyncHandler<StreamingHttpEmitterInput> {
Expand Down
@@ -0,0 +1,17 @@
import type { ResourceIdentifier } from '../../../http/representation/ResourceIdentifier';
import { NOTIFY } from '../../../util/Vocabularies';
import type { NotificationChannel } from '../NotificationChannel';

/**
* Default StreamingHTTPChanel2023 for a topic.
* Currently chennel description is only used internally and never sent to the client.
* The default channel uses Turtle.
*/
export function defaultChannel(topic: ResourceIdentifier): NotificationChannel {
return {
id: `${topic.path}.channel`,
type: NOTIFY.StreamingHTTPChannel2023,
topic: topic.path,
accept: 'text/turtle',
};
}
@@ -1,13 +1,12 @@
import { randomUUID } from 'node:crypto';
import type { RepresentationMetadata } from '../../../http/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../../http/representation/ResourceIdentifier';
import { getLoggerFor } from '../../../logging/LogUtil';
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
import { StaticHandler } from '../../../util/handlers/StaticHandler';
import { type AS, NOTIFY, type VocabularyTerm } from '../../../util/Vocabularies';
import type { AS, VocabularyTerm } from '../../../util/Vocabularies';
import type { ActivityEmitter } from '../ActivityEmitter';
import type { NotificationHandler } from '../NotificationHandler';
import type { NotificationChannel } from '../NotificationChannel';
import { defaultChannel } from './StreamingHttp2023Util';

/**
* Listens to an {@link ActivityEmitter} and calls the stored {@link NotificationHandler}s in case of an event
Expand Down Expand Up @@ -38,12 +37,7 @@ export class StreamingHttpListeningActivityHandler extends StaticHandler {
activity: VocabularyTerm<typeof AS>,
metadata: RepresentationMetadata,
): Promise<void> {
const channel: NotificationChannel = {
id: `urn:uuid:${randomUUID()}`,
type: NOTIFY.StreamingHTTPChannel2023,
topic: topic.path,
accept: 'text/turtle',
};
const channel = defaultChannel(topic);
try {
await this.source.handleSafe({ channel, activity, topic, metadata });
} catch (error) {
Expand Down
@@ -1,5 +1,4 @@
import { PassThrough } from 'node:stream';
import { randomUUID } from 'node:crypto';
import type { Credentials } from '../../../authentication/Credentials';
import type { CredentialsExtractor } from '../../../authentication/CredentialsExtractor';
import type { Authorizer } from '../../../authorization/Authorizer';
Expand All @@ -13,12 +12,11 @@ import type { OperationHttpHandlerInput } from '../../OperationHttpHandler';
import { OperationHttpHandler } from '../../OperationHttpHandler';
import { guardStream } from '../../../util/GuardedStream';
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
import type { NotificationChannel } from '../NotificationChannel';
import { NOTIFY } from '../../../util/Vocabularies';
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
import type { NotificationGenerator } from '../generate/NotificationGenerator';
import type { NotificationSerializer } from '../serialize/NotificationSerializer';
import type { StreamingHttpMap } from './StreamingHttpMap';
import { defaultChannel } from './StreamingHttp2023Util';

/**
* Handles request to Streaming HTTP receiveFrom endopints.
Expand Down Expand Up @@ -51,12 +49,7 @@ export class StreamingHttpRequestHandler extends OperationHttpHandler {
stream.on('error', (): boolean => this.streamMap.deleteEntry(topic, stream));
stream.on('close', (): boolean => this.streamMap.deleteEntry(topic, stream));

const channel: NotificationChannel = {
id: `urn:uuid:${randomUUID()}`,
type: NOTIFY.StreamingHTTPChannel2023,
topic,
accept: 'text/turtle',
};
const channel = defaultChannel({ path: topic });
// Send initial notification
try {
const notification = await this.generator.handle({ channel, topic: { path: topic }});
Expand All @@ -66,16 +59,13 @@ export class StreamingHttpRequestHandler extends OperationHttpHandler {
this.logger.error(`Problem emitting initial notification: ${createErrorMessage(error)}`);
}
// Pre-established channels use Turtle
const representation = new BasicRepresentation(topic, operation.target, 'text/turtle');
const representation = new BasicRepresentation(topic, operation.target, channel.accept);
return new OkResponseDescription(
representation.metadata,
stream,
);
}

/**
* TODO: consider removing duplication with {@link NotificationsSubscriber}
*/
private async authorize(credentials: Credentials, topic: string): Promise<void> {
const requestedModes = new IdentifierSetMultiMap<AccessMode>([[{ path: topic }, AccessMode.read ]]);
this.logger.debug(`Retrieved required modes: ${[ ...requestedModes.entrySets() ].join(',')}`);
Expand Down
@@ -0,0 +1,15 @@
import { defaultChannel } from '../../../../../src/server/notifications/StreamingHttpChannel2023/StreamingHttp2023Util';
import { NOTIFY } from '../../../../../src/util/Vocabularies';

describe('defaultChannel', (): void => {
it('returns description given topic.', (): void => {
const topic = { path: 'http://example.com/foo' };
const channel = defaultChannel(topic);
expect(channel).toEqual({
id: `${topic.path}.channel`,
type: NOTIFY.StreamingHTTPChannel2023,
topic: topic.path,
accept: 'text/turtle',
});
});
});

0 comments on commit 2dd3166

Please sign in to comment.