Skip to content

Commit

Permalink
frontend: add SelectionContext for event subscription
Browse files Browse the repository at this point in the history
Motivation:

In order to support door- and user specific root paths (see issue #5318)
additional information must be provided to the EventSource plugins.
Such a change implies breaking backwards compatibility with the
EventSource SPI; however, to the best of my knowledge, there are
currently no third-party EventSource plugins.

To minimise the impact of any future changes, this patch introduces the
SelectionContext interface.  This should allow future patches to provide
additional information within SelectionContext without breaking
backwards compatibility.

Modification:

Introduce the SelectionContext interface.

The channel-ID is now provided through the SelectionContext.  A future
patch will add extra information in SelectionContext.

Result:

This patch breaks the EventSource SPI, so any third-party EventSource
plugins that use this SPI must be updated.

Target: master
Requires-notes: yes
Requires-book: no
Request: 6.1
Request: 6.0
Request: 5.2
Patch: https://rb.dcache.org/r/12316/
Acked-by: Tigran Mkrtchyan
  • Loading branch information
paulmillar authored and mksahakyan committed May 4, 2020
1 parent 16396cf commit 28d7d6b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 7 deletions.
Expand Up @@ -54,6 +54,7 @@

import org.dcache.auth.Subjects;
import org.dcache.restful.events.spi.EventStream;
import org.dcache.restful.events.spi.SelectionContext;
import org.dcache.restful.events.spi.SelectionResult;
import org.dcache.restful.events.spi.SelectionStatus;
import org.dcache.restful.util.CloseableWithTasks;
Expand Down Expand Up @@ -381,12 +382,20 @@ private synchronized void sendSubscriptionEvent(String type,
}
}

public SubscriptionResult subscribe(String channelId, String eventType, JsonNode selector)
public SubscriptionResult subscribe(final String channelId, String eventType,
JsonNode selector)
{
EventStream es = repository.getEventStream(eventType)
.orElseThrow(() -> new BadRequestException("Unknown event type: " + eventType));

SelectionResult result = es.select(channelId,
SelectionContext context = new SelectionContext() {
@Override
public String channelId() {
return channelId;
}
};

SelectionResult result = es.select(context,
(id,event) -> {
try {
sendEvent(eventType, id, event);
Expand Down
Expand Up @@ -91,6 +91,7 @@ public interface EventStream
/**
* Make provision for supplying selected events to the receiver. The
* {@literal receiver} argument must not block when receiving an event.
* @param context ancillary information for EventStream implementations.
* @param receiver the recipient of a selection-id and JSON event data.
* The selection-id must correspond to some
* {@link SelectedEventStream#getId()} value and the JSON event data must
Expand All @@ -100,6 +101,7 @@ public interface EventStream
* @return the result of processing the subscription request
*/
@NotNull
SelectionResult select(String channelId, @NotNull BiConsumer<String,JsonNode> receiver,
SelectionResult select(@NotNull SelectionContext context,
@NotNull BiConsumer<String,JsonNode> receiver,
@NotNull JsonNode selector);
}
@@ -0,0 +1,32 @@
/*
* dCache - http://www.dcache.org/
*
* Copyright (C) 2020 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.dcache.restful.events.spi;

/**
* Optional, ancillary information supplied when a client is selecting events
* from an EventStream. Unlike other (core) information, the information here
* is available but not all plugins will need it.
*/
public interface SelectionContext
{
/**
* The ID of the channel within which the selection is being made.
*/
String channelId();
}
Expand Up @@ -74,6 +74,7 @@
import org.dcache.events.SystemEvent;
import org.dcache.restful.events.spi.EventStream;
import org.dcache.restful.events.spi.SelectedEventStream;
import org.dcache.restful.events.spi.SelectionContext;
import org.dcache.restful.events.spi.SelectionResult;
import org.dcache.restful.util.RequestUser;
import org.dcache.vehicles.FileAttributes;
Expand Down Expand Up @@ -424,13 +425,14 @@ private void sendEventToSelections(InotifyEvent event, Collection<InotifySelecti
}

@Override
public SelectionResult select(String channelId, BiConsumer<String,JsonNode> receiver,
public SelectionResult select(SelectionContext context, BiConsumer<String,JsonNode> receiver,
JsonNode serialisedSelector)
{
try {
InotifySelector selector = mapper.readerFor(InotifySelector.class)
.readValue(serialisedSelector);
return selector.validationError().orElseGet(() -> select(channelId, receiver, selector));
return selector.validationError().orElseGet(() ->
select(context.channelId(), receiver, selector));
} catch (JsonMappingException e) {
int index = e.getMessage().indexOf('\n');
String msg = index == -1 ? e.getMessage() : e.getMessage().substring(0, index);
Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.function.BiConsumer;

import org.dcache.restful.events.spi.EventStream;
import org.dcache.restful.events.spi.SelectionContext;

import static org.dcache.restful.util.transfers.Json.readFromJar;

Expand Down Expand Up @@ -104,8 +105,8 @@ public ObjectNode eventSchema()
}

@Override
public SelectionResult select(String channelId, BiConsumer<String,JsonNode> receiver,
JsonNode serialisedSelector)
public SelectionResult select(SelectionContext context,
BiConsumer<String,JsonNode> receiver, JsonNode serialisedSelector)
{
Selector selector = deserialise(serialisedSelector);
SelectionResult result = selector.validationError();
Expand Down

0 comments on commit 28d7d6b

Please sign in to comment.