Skip to content

Commit

Permalink
Implement mechanism for providing unique ids for JSON-RPC clients (#5775
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Dmytro Kulieshov committed Jul 26, 2017
1 parent b12bf55 commit 953894f
Show file tree
Hide file tree
Showing 18 changed files with 428 additions and 138 deletions.
Expand Up @@ -15,20 +15,27 @@
import org.eclipse.che.api.core.websocket.impl.GuiceInjectorEndpointConfigurator;
import org.eclipse.che.api.core.websocket.impl.MessagesReSender;
import org.eclipse.che.api.core.websocket.impl.WebSocketSessionRegistry;
import org.eclipse.che.api.core.websocket.impl.WebsocketIdService;

import javax.inject.Inject;
import javax.websocket.server.ServerEndpoint;

/**
* Implementation of {@link BasicWebSocketEndpoint} for Che packaging.
* Add only mapping "/websocket/{endpoint-id}".
* Add only mapping "/websocket".
*/
@ServerEndpoint(value = "/websocket/{endpoint-id}", configurator = GuiceInjectorEndpointConfigurator.class)
@ServerEndpoint(value = "/websocket", configurator = GuiceInjectorEndpointConfigurator.class)
public class CheWebSocketEndpoint extends BasicWebSocketEndpoint {
@Inject
public CheWebSocketEndpoint(WebSocketSessionRegistry registry,
MessagesReSender reSender,
WebSocketMessageReceiver receiver) {
super(registry, reSender, receiver);
WebSocketMessageReceiver receiver,
WebsocketIdService websocketIdService) {
super(registry, reSender, receiver, websocketIdService);
}

@Override
protected String getEndpointId() {
return "master-websocket-endpoint";
}
}
Expand Up @@ -20,63 +20,76 @@
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;

/**
* Duplex WEB SOCKET endpoint, handles messages, errors, session open/close events.
*
* @author Dmitry Kuleshov
*/
public class BasicWebSocketEndpoint {
abstract public class BasicWebSocketEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(BasicWebSocketEndpoint.class);

private final WebSocketSessionRegistry registry;
private final MessagesReSender reSender;
private final WebSocketMessageReceiver receiver;
private final WebsocketIdService identificationService;


public BasicWebSocketEndpoint(WebSocketSessionRegistry registry,
MessagesReSender reSender,
WebSocketMessageReceiver receiver) {
WebSocketMessageReceiver receiver,
WebsocketIdService identificationService) {

this.registry = registry;
this.reSender = reSender;
this.receiver = receiver;
this.identificationService = identificationService;
}

@OnOpen
public void onOpen(Session session, @PathParam("endpoint-id") String endpointId) {
public void onOpen(Session session) {
String combinedEndpointId = getCombinedEndpointId(session);

LOG.debug("Web socket session opened");
LOG.debug("Endpoint: {}", endpointId);
LOG.debug("Endpoint: {}", combinedEndpointId);

session.setMaxIdleTimeout(0);

registry.add(endpointId, session);
reSender.resend(endpointId);
registry.add(combinedEndpointId, session);
reSender.resend(combinedEndpointId);
}

@OnMessage
public void onMessage(String message, @PathParam("endpoint-id") String endpointId) {
public void onMessage(String message, Session session) {
String combinedEndpointId = getCombinedEndpointId(session);

LOG.debug("Receiving a web socket message.");
LOG.debug("Endpoint: {}", endpointId);
LOG.debug("Endpoint: {}", combinedEndpointId);
LOG.debug("Message: {}", message);

receiver.receive(endpointId, message);
receiver.receive(combinedEndpointId, message);
}

@OnClose
public void onClose(CloseReason closeReason, @PathParam("endpoint-id") String endpointId) {
public void onClose(CloseReason closeReason, Session session) {
String combinedEndpointId = getCombinedEndpointId(session);

LOG.debug("Web socket session closed");
LOG.debug("Endpoint: {}", endpointId);
LOG.debug("Endpoint: {}", combinedEndpointId);
LOG.debug("Close reason: {}:{}", closeReason.getReasonPhrase(), closeReason.getCloseCode());

registry.remove(endpointId);
registry.remove(combinedEndpointId);
}

@OnError
public void onError(Throwable t, @PathParam("endpoint-id") String endpointId) {
public void onError(Throwable t) {
LOG.debug("Web socket session error");
LOG.debug("Endpoint: {}", endpointId);
LOG.debug("Error: {}", t);
}

private String getCombinedEndpointId(Session session) {
return registry.get(session).orElseGet(() -> identificationService.getCombinedId(getEndpointId()));
}

protected abstract String getEndpointId();
}
Expand Up @@ -14,7 +14,7 @@

import javax.inject.Singleton;
import javax.websocket.Session;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -40,17 +40,37 @@ public void add(String endpointId, Session session) {
sessionsMap.put(endpointId, session);
}

public void remove(String endpointId) {
public Optional<Session> remove(String endpointId) {
LOG.debug("Cancelling registration for session with endpoint {}", endpointId);

sessionsMap.remove(endpointId);
return Optional.ofNullable(sessionsMap.remove(endpointId));
}

public Optional<Session> remove(Session session) {
return get(session).flatMap(id -> Optional.ofNullable(sessionsMap.remove(id)));
}

public Optional<Session> get(String endpointId) {
return Optional.ofNullable(sessionsMap.get(endpointId));
}

public Set<Session> getByPartialMatch(String partialEndpointId) {
return sessionsMap.entrySet()
.stream()
.filter(it -> it.getKey().contains(partialEndpointId))
.map(Map.Entry::getValue)
.collect(toSet());
}

public Optional<String> get(Session session) {
return sessionsMap.entrySet()
.stream()
.filter(entry -> entry.getValue().equals(session))
.map(Map.Entry::getKey)
.findAny();
}

public Set<Session> getSessions() {
return sessionsMap.values().stream().collect(toSet());
return new HashSet<>(sessionsMap.values());
}
}
@@ -0,0 +1,84 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.core.websocket.impl;

import org.eclipse.che.api.core.jsonrpc.commons.JsonRpcException;
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.websocket.Session;
import java.util.Random;

/**
* Identification service provide means to set and get unique identifiers for
* websocket clients. There are several identifier elements to distinguish:
* clientId, endpointId, combinedId. Client id is the identifier of a client
* that is passed over websocket to the client and back. EndpointId is called
* to identify a websocket endpoint client connects through. CombinedId is a
* combination of client and endpoint identifiers separated by a sequence of
* special charaters, it is used internally.
*/
@Singleton
public class WebsocketIdService {
private static final String ERROR_MESSAGE = "No session is associated with provided endpoint id";
private static final String SEPARATOR = "<-:->";
private static final Random GENERATOR = new Random();

private final WebSocketSessionRegistry registry;

@Inject
public WebsocketIdService(WebSocketSessionRegistry registry) {
this.registry = registry;
}

private static String clientId() {
return String.valueOf(GENERATOR.nextInt(Integer.MAX_VALUE));
}

@Inject
private void configureHandler(RequestHandlerConfigurator requestHandlerConfigurator) {
requestHandlerConfigurator.newConfiguration()
.methodName("websocketIdService/getId")
.noParams()
.resultAsString()
.withFunction(this::extractClientId);

requestHandlerConfigurator.newConfiguration()
.methodName("websocketIdService/setId")
.paramsAsString()
.noResult()
.withBiConsumer((oldCombinedId, newClientId) -> {
String endpointId = extractEndpointId(oldCombinedId);
String newCombinedId = getCombinedId(endpointId, newClientId);
Session session = registry.remove(oldCombinedId)
.orElseThrow(() -> new JsonRpcException(-27000, ERROR_MESSAGE));

registry.add(newCombinedId, session);
});
}

public String getCombinedId(String endpointId) {
return clientId() + SEPARATOR + endpointId;
}

public String getCombinedId(String endpointId, String clientId) {
return clientId + SEPARATOR + endpointId;
}

public String extractClientId(String combinedId) {
return combinedId.split(SEPARATOR)[0];
}

public String extractEndpointId(String combinedId) {
return combinedId.split(SEPARATOR)[1];
}
}

This file was deleted.

Expand Up @@ -25,6 +25,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Represents current context of the IDE application.
Expand Down Expand Up @@ -219,7 +220,13 @@ public interface AppContext {
*
* @return identifier
*/
String getAppId();
Optional<String> getApplicationWebsocketId();

/**
* Sets web application identifier. Most obvious use - to distinguish web applications
* on server side (e.g. connected via websocket)
*/
void setApplicationWebsocketId(String id);

/**
* Returns context properties, key-value storage that allows to store
Expand Down

0 comments on commit 953894f

Please sign in to comment.