Skip to content

Commit

Permalink
fix linting erros
Browse files Browse the repository at this point in the history
  • Loading branch information
elf-pavlik committed Apr 13, 2024
1 parent 7df8200 commit 5dc8afa
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 218 deletions.
38 changes: 19 additions & 19 deletions config/http/notifications/streaming-http/http.json
Expand Up @@ -7,9 +7,9 @@
"valueRaw": ".notifications/StreamingHTTPChannel2023/"
},
{
"comment": "Creates updatesViaStreamingHTTP2023 Link relations",
"@id": "urn:solid-server:default:StreamingHTTPMetadataWriter",
"@type": "StreamingHTTPMetadataWriter",
"comment": "Creates updatesViaStreamingHttp2023 Link relations",
"@id": "urn:solid-server:default:StreamingHttpMetadataWriter",
"@type": "StreamingHttpMetadataWriter",
"baseUrl": { "@id": "urn:solid-server:default:variable:baseUrl" },
"pathPrefix": { "@id": "urn:solid-server:default:variable:streamingHTTPReceiveFromPrefix" }
},
Expand All @@ -18,20 +18,20 @@
"@id": "urn:solid-server:default:MetadataWriter",
"@type": "ParallelHandler",
"handlers": [
{ "@id": "urn:solid-server:default:StreamingHTTPMetadataWriter" }
{ "@id": "urn:solid-server:default:StreamingHttpMetadataWriter" }
]
},
{
"comment": "Handles the request targeting a StreamingHTTPChannel2023 receiveFrom endpoint.",
"@id": "urn:solid-server:default:StreamingHTTP2023Router",
"@id": "urn:solid-server:default:StreamingHttp2023Router",
"@type": "OperationRouterHandler",
"baseUrl": { "@id": "urn:solid-server:default:variable:baseUrl" },
"allowedMethods": [ "GET" ],
"allowedPathNames": [ "/StreamingHTTPChannel2023/" ],
"handler": {
"@id": "urn:solid-server:default:StreamingHTTP2023RequestHandler",
"@type": "StreamingHTTPRequestHandler",
"streamMap": { "@id": "urn:solid-server:default:StreamingHTTPMap" },
"@id": "urn:solid-server:default:StreamingHttp2023RequestHandler",
"@type": "StreamingHttpRequestHandler",
"streamMap": { "@id": "urn:solid-server:default:StreamingHttpMap" },
"pathPrefix": { "@id": "urn:solid-server:default:variable:streamingHTTPReceiveFromPrefix" },
"generator": { "@id": "urn:solid-server:default:BaseNotificationGenerator" },
"serializer": { "@id": "urn:solid-server:default:BaseNotificationSerializer" },
Expand All @@ -45,31 +45,31 @@
"@id": "urn:solid-server:default:NotificationTypeHandler",
"@type": "WaterfallHandler",
"handlers": [
{ "@id": "urn:solid-server:default:StreamingHTTP2023Router" }
{ "@id": "urn:solid-server:default:StreamingHttp2023Router" }
]
},
{
"comment": "Opened response streams will be stored in this Map.",
"@id": "urn:solid-server:default:StreamingHTTPMap",
"@type": "StreamingHTTPMap"
"@id": "urn:solid-server:default:StreamingHttpMap",
"@type": "StreamingHttpMap"
},
{
"comment": "Emits serialized notifications through StreamingHTTP.",
"@id": "urn:solid-server:default:StreamingHTTP2023Emitter",
"@type": "StreamingHTTP2023Emitter",
"streamMap": { "@id": "urn:solid-server:default:StreamingHTTPMap" }
"comment": "Emits serialized notifications through Streaming HTTP.",
"@id": "urn:solid-server:default:StreamingHttp2023Emitter",
"@type": "StreamingHttp2023Emitter",
"streamMap": { "@id": "urn:solid-server:default:StreamingHttpMap" }
},
{
"comment": "Listens to the activities emitted by the MonitoringStore.",
"@id": "urn:solid-server:default:StreamingHTTPListeningActivityHandler",
"@type": "StreamingHTTPListeningActivityHandler",
"@id": "urn:solid-server:default:StreamingHttpListeningActivityHandler",
"@type": "StreamingHttpListeningActivityHandler",
"emitter": { "@id": "urn:solid-server:default:ResourceStore" },
"source": {
"comment": "Handles the generation and serialization of notifications for StreamingHTTPChannel2023",
"@type": "ComposedNotificationHandler",
"generator": { "@id": "urn:solid-server:default:BaseNotificationGenerator" },
"serializer": { "@id": "urn:solid-server:default:BaseNotificationSerializer" },
"emitter": { "@id": "urn:solid-server:default:StreamingHTTP2023Emitter" },
"emitter": { "@id": "urn:solid-server:default:StreamingHttp2023Emitter" },
"eTagHandler": { "@id": "urn:solid-server:default:ETagHandler" }
}
},
Expand All @@ -78,7 +78,7 @@
"@id": "urn:solid-server:default:PrimaryParallelInitializer",
"@type": "ParallelHandler",
"handlers": [
{ "@id": "urn:solid-server:default:StreamingHTTPListeningActivityHandler" }
{ "@id": "urn:solid-server:default:StreamingHttpListeningActivityHandler" }
]
}
]
Expand Down
10 changes: 5 additions & 5 deletions src/index.ts
Expand Up @@ -404,11 +404,11 @@ export * from './server/notifications/WebSocketChannel2023/WebSocketMap';
export * from './server/notifications/WebSocketChannel2023/WebSocketChannel2023Type';

// Server/Notifications/StreamingHTTPChannel2023
export * from './server/notifications/StreamingHTTPChannel2023/StreamingHTTPRequestHandler';
export * from './server/notifications/StreamingHTTPChannel2023/StreamingHTTPMap';
export * from './server/notifications/StreamingHTTPChannel2023/StreamingHTTPListeningActivityHandler';
export * from './server/notifications/StreamingHTTPChannel2023/StreamingHTTP2023Emitter';
export * from './server/notifications/StreamingHTTPChannel2023/StreamingHTTPMetadataWriter';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpRequestHandler';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpMap';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpListeningActivityHandler';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpMetadataWriter';

// Server/Notifications
export * from './server/notifications/ActivityEmitter';
Expand Down
@@ -1,11 +1,11 @@
import { getLoggerFor } from '../../../logging/LogUtil';
import { StreamingHTTPMap } from './StreamingHTTPMap';

import type { Representation } from '../../../http/representation/Representation';
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
import { NotificationChannel } from '../NotificationChannel';
import type { NotificationChannel } from '../NotificationChannel';
import type { StreamingHttpMap } from './StreamingHttpMap';

export interface StreamingHTTPEmitterInput {
export interface StreamingHttpEmitterInput {
channel: NotificationChannel;
representation: Representation;
}
Expand All @@ -15,21 +15,21 @@ export interface StreamingHTTPEmitterInput {
* Uses the StreamingHTTPs found in the provided map.
* The key should be the identifier of the topic resource.
*/
export class StreamingHTTP2023Emitter extends AsyncHandler<StreamingHTTPEmitterInput> {
export class StreamingHttp2023Emitter extends AsyncHandler<StreamingHttpEmitterInput> {
protected readonly logger = getLoggerFor(this);

constructor(
private readonly streamMap: StreamingHTTPMap
public constructor(
private readonly streamMap: StreamingHttpMap,
) {
super()
super();
}

public async handle({ channel, representation }: StreamingHTTPEmitterInput): Promise<void> {
public async handle({ channel, representation }: StreamingHttpEmitterInput): Promise<void> {
// Called as a NotificationEmitter: emit the notification
const streams = this.streamMap.get(channel.topic);
if (streams) {
for (const stream of streams) {
representation.data.pipe(stream, { end: false })
representation.data.pipe(stream, { end: false });
}
} else {
representation.data.destroy();
Expand Down
Expand Up @@ -4,10 +4,10 @@ import type { ResourceIdentifier } from '../../../http/representation/ResourceId
import { getLoggerFor } from '../../../logging/LogUtil';
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
import { StaticHandler } from '../../../util/handlers/StaticHandler';
import { NOTIFY, type AS, type VocabularyTerm } from '../../../util/Vocabularies';
import type { ActivityEmitter } from '.././ActivityEmitter';
import type { NotificationHandler } from '.././NotificationHandler';
import { NotificationChannel } from '../NotificationChannel';
import { type AS, NOTIFY, type VocabularyTerm } from '../../../util/Vocabularies';
import type { ActivityEmitter } from '../ActivityEmitter';
import type { NotificationHandler } from '../NotificationHandler';
import type { NotificationChannel } from '../NotificationChannel';

/**
* Listens to an {@link ActivityEmitter} and calls the stored {@link NotificationHandler}s in case of an event
Expand All @@ -17,12 +17,12 @@ import { NotificationChannel } from '../NotificationChannel';
* No class takes this one as input, so to make sure Components.js instantiates it,
* it needs to be added somewhere where its presence has no impact, such as the list of initializers.
*/
export class StreamingHTTPListeningActivityHandler extends StaticHandler {
export class StreamingHttpListeningActivityHandler extends StaticHandler {
protected readonly logger = getLoggerFor(this);

public constructor(
emitter: ActivityEmitter,
private readonly source: NotificationHandler
private readonly source: NotificationHandler,
) {
super();

Expand All @@ -39,16 +39,14 @@ export class StreamingHTTPListeningActivityHandler extends StaticHandler {
metadata: RepresentationMetadata,
): Promise<void> {
const channel: NotificationChannel = {
// TODO decide what IRI should denote a pre-established channel
id: `urn:uuid:${randomUUID()}`,
type: NOTIFY.StreamingHTTPChannel2023,
topic: topic.path,
accept: 'text/turtle'
}
accept: 'text/turtle',
};
try {
await this.source.handleSafe({ channel, activity, topic, metadata })
await this.source.handleSafe({ channel, activity, topic, metadata });
} catch (error) {
// TODO: do we need to catch if only one channel per topic?
this.logger.error(`Error trying to handle notification for ${topic.path}: ${createErrorMessage(error)}`);
}
}
Expand Down
@@ -1,10 +1,10 @@
import type { PassThrough } from 'stream';
import type { PassThrough } from 'node:stream';
import type { SingleThreaded } from '../../../init/cluster/SingleThreaded';
import { WrappedSetMultiMap } from '../../../util/map/WrappedSetMultiMap';

/**
* A {@link SetMultiMap} linking identifiers to a set of StreamingHTTP streams.
* A {@link SetMultiMap} linking identifiers to a set of Streaming HTTP streams.
* An extension of {@link WrappedSetMultiMap} to make sure Components.js allows us to create this in the config,
* as {@link WrappedSetMultiMap} has a constructor not supported.
*/
export class StreamingHTTPMap extends WrappedSetMultiMap<string, PassThrough> implements SingleThreaded {}
export class StreamingHttpMap extends WrappedSetMultiMap<string, PassThrough> implements SingleThreaded {}
@@ -1,5 +1,5 @@
import { getLoggerFor } from '../../../logging/LogUtil';
import type { HttpResponse } from '../../../server/HttpResponse';
import type { HttpResponse } from '../../HttpResponse';
import { addHeader } from '../../../util/HeaderUtil';
import type { RepresentationMetadata } from '../../../http/representation/RepresentationMetadata';
import { MetadataWriter } from '../../../http/output/metadata/MetadataWriter';
Expand All @@ -8,21 +8,21 @@ import { MetadataWriter } from '../../../http/output/metadata/MetadataWriter';
* A {@link MetadataWriter} that adds link to the receiveFrom endpoint
* of the corresponding Streaming HTTP notifications channel
*/
export class StreamingHTTPMetadataWriter extends MetadataWriter {
export class StreamingHttpMetadataWriter extends MetadataWriter {
protected readonly logger = getLoggerFor(this);

public constructor(
private readonly baseUrl: string,
private readonly pathPrefix: string
private readonly pathPrefix: string,
) {
super();
}

public async handle(input: { response: HttpResponse; metadata: RepresentationMetadata }): Promise<void> {
const resourcePath = input.metadata.identifier.value.replace(this.baseUrl, '')
const receiveFrom = `${this.baseUrl}${this.pathPrefix}${resourcePath}`
const link = `<${receiveFrom}>; rel="http://www.w3.org/ns/solid/terms#updatesViaStreamingHTTP2023"`
this.logger.debug('Adding updatesViaStreamingHTTP2023 to the Link header');
const resourcePath = input.metadata.identifier.value.replace(this.baseUrl, '');
const receiveFrom = `${this.baseUrl}${this.pathPrefix}${resourcePath}`;
const link = `<${receiveFrom}>; rel="http://www.w3.org/ns/solid/terms#updatesViaStreamingHttp2023"`;
this.logger.debug('Adding updatesViaStreamingHttp2023 to the Link header');
addHeader(input.response, 'Link', link);
}
}
@@ -1,4 +1,5 @@
import { PassThrough } from 'stream';
import { PassThrough } from 'node:stream';
import { randomUUID } from 'node:crypto';
import type { Credentials } from '../../../authentication/Credentials';
import type { CredentialsExtractor } from '../../../authentication/CredentialsExtractor';
import type { Authorizer } from '../../../authorization/Authorizer';
Expand All @@ -12,72 +13,69 @@ import type { OperationHttpHandlerInput } from '../../OperationHttpHandler';
import { OperationHttpHandler } from '../../OperationHttpHandler';
import { guardStream } from '../../../util/GuardedStream';
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
import { StreamingHTTPMap } from './StreamingHTTPMap';
import { NotificationChannel } from '../NotificationChannel';
import { randomUUID } from 'node:crypto';
import type { NotificationChannel } from '../NotificationChannel';
import { NOTIFY } from '../../../util/Vocabularies';
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
import { NotificationGenerator } from '../generate/NotificationGenerator';
import { NotificationSerializer } from '../serialize/NotificationSerializer';
import type { NotificationGenerator } from '../generate/NotificationGenerator';
import type { NotificationSerializer } from '../serialize/NotificationSerializer';
import type { StreamingHttpMap } from './StreamingHttpMap';

/**
* Handles request to StreamingHTTP receiveFrom endopints.
* All allowed requests are stored in the {@link StreamingHTTPMap}
* Handles request to Streaming HTTP receiveFrom endopints.
* All allowed requests are stored in the {@link StreamingHttpMap}
*/
export class StreamingHTTPRequestHandler extends OperationHttpHandler {
export class StreamingHttpRequestHandler extends OperationHttpHandler {
protected logger = getLoggerFor(this);

constructor(
private readonly streamMap: StreamingHTTPMap,
public constructor(
private readonly streamMap: StreamingHttpMap,
private readonly pathPrefix: string,
private readonly generator: NotificationGenerator,
private readonly serializer: NotificationSerializer,
private readonly credentialsExtractor: CredentialsExtractor,
private readonly permissionReader: PermissionReader,
private readonly authorizer: Authorizer
private readonly authorizer: Authorizer,
) {
super()
super();
}

public async handle({ operation, request }: OperationHttpHandlerInput): Promise<ResponseDescription> {
const topic = operation.target.path.replace(this.pathPrefix, '')
const topic = operation.target.path.replace(this.pathPrefix, '');

// Verify if the client is allowed to connect
const credentials = await this.credentialsExtractor.handleSafe(request);
await this.authorize(credentials, topic);

const stream = guardStream(new PassThrough())
const stream = guardStream(new PassThrough());
this.streamMap.add(topic, stream);
stream.on('error', () => this.streamMap.deleteEntry(topic, stream));
stream.on('close', () => this.streamMap.deleteEntry(topic, stream));
stream.on('error', (): boolean => this.streamMap.deleteEntry(topic, stream));
stream.on('close', (): boolean => this.streamMap.deleteEntry(topic, stream));

// TODO: de-duplicate with StreamingHTTPListeningActivityHandler
const channel: NotificationChannel = {
// TODO decide what IRI should denote a pre-established channel
id: `urn:uuid:${randomUUID()}`,
type: NOTIFY.StreamingHTTPChannel2023,
topic,
accept: 'text/turtle'
}
accept: 'text/turtle',
};
// Send initial notification
try {
const notification = await this.generator.handle({ channel, topic: { path: topic } });
const notification = await this.generator.handle({ channel, topic: { path: topic }});
const representation = await this.serializer.handleSafe({ channel, notification });
representation.data.pipe(stream, { end: false })
representation.data.pipe(stream, { end: false });
} catch (error: unknown) {
this.logger.error(`Problem emitting initial notification: ${createErrorMessage(error)}`);
}
// Pre-established channels use Turtle
const representation = new BasicRepresentation(topic, operation.target, 'text/turtle');
return new OkResponseDescription(
representation.metadata,
stream
stream,
);
}

/**
* TODO: consider removing duplication with {@link NotificationsSubscriber}
*/
* TODO: consider removing duplication with {@link NotificationsSubscriber}
*/
private async authorize(credentials: Credentials, topic: string): Promise<void> {
const requestedModes = new IdentifierSetMultiMap<AccessMode>([[{ path: topic }, AccessMode.read ]]);
this.logger.debug(`Retrieved required modes: ${[ ...requestedModes.entrySets() ].join(',')}`);
Expand Down

0 comments on commit 5dc8afa

Please sign in to comment.