Skip to content

Commit 7d33aaa

Browse files
committed
Update gradle wrapper and attempt to rework subscriptions
1 parent 8bfff43 commit 7d33aaa

File tree

9 files changed

+108
-65
lines changed

9 files changed

+108
-65
lines changed

gradle/wrapper/gradle-wrapper.jar

117 Bytes
Binary file not shown.
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
#Mon Apr 24 13:12:01 EDT 2017
21
distributionBase=GRADLE_USER_HOME
32
distributionPath=wrapper/dists
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-all.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-3.5-bin.zip

gradlew

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
3333
# Use the maximum available, or set MAX_FD != -1 to use that value.
3434
MAX_FD="maximum"
3535

36-
warn ( ) {
36+
warn () {
3737
echo "$*"
3838
}
3939

40-
die ( ) {
40+
die () {
4141
echo
4242
echo "$*"
4343
echo
@@ -155,7 +155,7 @@ if $cygwin ; then
155155
fi
156156

157157
# Escape application args
158-
save ( ) {
158+
save () {
159159
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
160160
echo " "
161161
}

src/main/java/graphql/servlet/GraphQLObjectMapper.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,15 @@ public Map<String, Object> createResultFromExecutionResult(ExecutionResult execu
129129
}
130130

131131
public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult executionResult) {
132+
return convertSanitizedExecutionResult(executionResult, true);
133+
}
134+
135+
public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult executionResult, boolean includeData) {
132136
final Map<String, Object> result = new LinkedHashMap<>();
133-
result.put("data", executionResult.getData());
137+
138+
if(includeData) {
139+
result.put("data", executionResult.getData());
140+
}
134141

135142
if (areErrorsPresent(executionResult)) {
136143
result.put("errors", executionResult.getErrors());

src/main/java/graphql/servlet/GraphQLWebsocketServlet.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import graphql.servlet.internal.SubscriptionProtocolFactory;
77
import graphql.servlet.internal.SubscriptionProtocolHandler;
88
import graphql.servlet.internal.WsSessionSubscriptions;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
911

1012
import javax.websocket.CloseReason;
1113
import javax.websocket.Endpoint;
@@ -23,15 +25,15 @@
2325
import java.util.stream.Collectors;
2426
import java.util.stream.Stream;
2527

26-
import static graphql.servlet.AbstractGraphQLHttpServlet.log;
27-
2828
/**
2929
* Must be used with {@link #modifyHandshake(ServerEndpointConfig, HandshakeRequest, HandshakeResponse)}
3030
*
3131
* @author Andrew Potter
3232
*/
3333
public class GraphQLWebsocketServlet extends Endpoint {
3434

35+
private static final Logger log = LoggerFactory.getLogger(GraphQLWebsocketServlet.class);
36+
3537
private static final String HANDSHAKE_REQUEST_KEY = HandshakeRequest.class.getName();
3638
private static final String PROTOCOL_HANDLER_REQUEST_KEY = SubscriptionProtocolHandler.class.getName();
3739
private static final CloseReason ERROR_CLOSE_REASON = new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Internal Server Error");
@@ -47,15 +49,9 @@ public class GraphQLWebsocketServlet extends Endpoint {
4749
}
4850

4951
private final Map<Session, WsSessionSubscriptions> sessionSubscriptionCache = new HashMap<>();
50-
private final GraphQLQueryInvoker queryInvoker;
51-
private final GraphQLInvocationInputFactory invocationInputFactory;
52-
private final GraphQLObjectMapper graphQLObjectMapper;
5352
private final SubscriptionHandlerInput subscriptionHandlerInput;
5453

5554
public GraphQLWebsocketServlet(GraphQLQueryInvoker queryInvoker, GraphQLInvocationInputFactory invocationInputFactory, GraphQLObjectMapper graphQLObjectMapper) {
56-
this.queryInvoker = queryInvoker;
57-
this.invocationInputFactory = invocationInputFactory;
58-
this.graphQLObjectMapper = graphQLObjectMapper;
5955
this.subscriptionHandlerInput = new SubscriptionHandlerInput(invocationInputFactory, queryInvoker, graphQLObjectMapper);
6056
}
6157

@@ -73,7 +69,7 @@ public void onOpen(Session session, EndpointConfig endpointConfig) {
7369
@Override
7470
public void onMessage(String text) {
7571
try {
76-
subscriptionProtocolHandler.onMessage(request, session, text);
72+
subscriptionProtocolHandler.onMessage(request, session, subscriptions, text);
7773
} catch (Throwable t) {
7874
log.error("Error executing websocket query for session: {}", session.getId(), t);
7975
closeUnexpectedly(session, t);

src/main/java/graphql/servlet/internal/ApolloSubscriptionProtocolHandler.java

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.fasterxml.jackson.annotation.JsonInclude;
55
import com.fasterxml.jackson.annotation.JsonValue;
66
import graphql.ExecutionResult;
7-
import graphql.servlet.GraphQLObjectMapper;
87
import org.reactivestreams.Publisher;
98
import org.reactivestreams.Subscriber;
109
import org.reactivestreams.Subscription;
@@ -32,7 +31,7 @@ public ApolloSubscriptionProtocolHandler(SubscriptionHandlerInput subscriptionHa
3231
}
3332

3433
@Override
35-
public void onMessage(HandshakeRequest request, Session session, String text) {
34+
public void onMessage(HandshakeRequest request, Session session, WsSessionSubscriptions subscriptions, String text) {
3635
OperationMessage message;
3736
try {
3837
message = input.getGraphQLObjectMapper().getJacksonMapper().readValue(text, OperationMessage.class);
@@ -45,12 +44,13 @@ public void onMessage(HandshakeRequest request, Session session, String text) {
4544
switch(message.getType()) {
4645
case GQL_CONNECTION_INIT:
4746
sendMessage(session, OperationMessage.Type.GQL_CONNECTION_ACK, message.getId());
48-
// sendMessage(session, OperationMessage.Type.GQL_CONNECTION_KEEP_ALIVE, message.getId());
47+
sendMessage(session, OperationMessage.Type.GQL_CONNECTION_KEEP_ALIVE, message.getId());
4948
break;
5049

5150
case GQL_START:
5251
handleSubscriptionStart(
5352
session,
53+
subscriptions,
5454
message.id,
5555
input.getQueryInvoker().query(input.getInvocationInputFactory().create(
5656
input.getGraphQLObjectMapper().getJacksonMapper().convertValue(message.payload, GraphQLRequest.class)
@@ -61,45 +61,15 @@ public void onMessage(HandshakeRequest request, Session session, String text) {
6161
}
6262

6363
@SuppressWarnings("unchecked")
64-
private void handleSubscriptionStart(Session session, String id, ExecutionResult executionResult) {
64+
private void handleSubscriptionStart(Session session, WsSessionSubscriptions subscriptions, String id, ExecutionResult executionResult) {
6565
executionResult = input.getGraphQLObjectMapper().sanitizeErrors(executionResult);
66-
OperationMessage.Type type = input.getGraphQLObjectMapper().areErrorsPresent(executionResult) ? OperationMessage.Type.GQL_ERROR : OperationMessage.Type.GQL_DATA;
67-
68-
Object data = executionResult.getData();
69-
if(data instanceof Publisher) {
70-
if(type == OperationMessage.Type.GQL_DATA) {
71-
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
72-
73-
((Publisher<ExecutionResult>) data).subscribe(new Subscriber<ExecutionResult>() {
74-
@Override
75-
public void onSubscribe(Subscription subscription) {
76-
subscriptionReference.set(subscription);
77-
subscriptionReference.get().request(1);
78-
}
79-
80-
@Override
81-
public void onNext(ExecutionResult executionResult) {
82-
subscriptionReference.get().request(1);
83-
Map<String, Object> result = new HashMap<>();
84-
result.put("data", executionResult.getData());
85-
sendMessage(session, OperationMessage.Type.GQL_DATA, id, result);
86-
}
87-
88-
@Override
89-
public void onError(Throwable throwable) {
90-
log.error("Subscription error", throwable);
91-
sendMessage(session, OperationMessage.Type.GQL_ERROR, id);
92-
}
93-
94-
@Override
95-
public void onComplete() {
96-
sendMessage(session, OperationMessage.Type.GQL_COMPLETE, id);
97-
}
98-
});
99-
}
66+
67+
if(input.getGraphQLObjectMapper().areErrorsPresent(executionResult)) {
68+
sendMessage(session, OperationMessage.Type.GQL_ERROR, id, input.getGraphQLObjectMapper().convertSanitizedExecutionResult(executionResult, false));
69+
return;
10070
}
10171

102-
sendMessage(session, type, id, input.getGraphQLObjectMapper().convertSanitizedExecutionResult(executionResult));
72+
subscribe(executionResult, subscriptions, id);
10373
}
10474

10575
private void sendMessage(Session session, OperationMessage.Type type, String id) {

src/main/java/graphql/servlet/internal/FallbackSubscriptionProtocolHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public FallbackSubscriptionProtocolHandler(SubscriptionHandlerInput subscription
1515
}
1616

1717
@Override
18-
public void onMessage(HandshakeRequest request, Session session, String text) throws Exception {
18+
public void onMessage(HandshakeRequest request, Session session, WsSessionSubscriptions subscriptions, String text) throws Exception {
1919
session.getBasicRemote().sendText(input.getGraphQLObjectMapper().serializeResultAsJson(
2020
input.getQueryInvoker().query(input.getInvocationInputFactory().create(input.getGraphQLObjectMapper().readGraphQLRequest(text), request))
2121
));
Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,68 @@
11
package graphql.servlet.internal;
22

3+
import graphql.ExecutionResult;
4+
import org.reactivestreams.Publisher;
5+
import org.reactivestreams.Subscriber;
6+
import org.reactivestreams.Subscription;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
310
import javax.websocket.Session;
411
import javax.websocket.server.HandshakeRequest;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
import java.util.concurrent.atomic.AtomicReference;
515

616
/**
717
* @author Andrew Potter
818
*/
9-
public interface SubscriptionProtocolHandler {
10-
void onMessage(HandshakeRequest request, Session session, String text) throws Exception;
19+
public abstract class SubscriptionProtocolHandler {
20+
21+
private static final Logger log = LoggerFactory.getLogger(SubscriptionProtocolHandler.class);
22+
23+
abstract void onMessage(HandshakeRequest request, Session session, WsSessionSubscriptions subscriptions, String text) throws Exception;
24+
25+
protected void subscribe(ExecutionResult executionResult, WsSessionSubscriptions subscriptions, String id) {
26+
final Object data = executionResult.getData();
27+
28+
if(data instanceof Publisher) {
29+
@SuppressWarnings("unchecked")
30+
final Publisher<ExecutionResult> publisher = (Publisher<ExecutionResult>) data;
31+
final AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
32+
33+
publisher.subscribe(new Subscriber<ExecutionResult>() {
34+
@Override
35+
public void onSubscribe(Subscription subscription) {
36+
subscriptionReference.set(subscription);
37+
subscriptionReference.get().request(1);
38+
39+
subscriptions.add(id, subscriptionReference.get());
40+
}
41+
42+
@Override
43+
public void onNext(ExecutionResult executionResult) {
44+
subscriptionReference.get().request(1);
45+
Map<String, Object> result = new HashMap<>();
46+
result.put("data", executionResult.getData());
47+
sendMessage(session, ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_DATA, id, result);
48+
}
49+
50+
@Override
51+
public void onError(Throwable throwable) {
52+
log.error("Subscription error", throwable);
53+
subscriptions.cancel(id);
54+
sendMessage(session, ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_ERROR, id);
55+
}
56+
57+
@Override
58+
public void onComplete() {
59+
subscriptions.cancel(id);
60+
sendMessage(session, ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_COMPLETE, id);
61+
}
62+
});
63+
}
64+
}
65+
}
66+
67+
public static
1168
}

src/main/java/graphql/servlet/internal/WsSessionSubscriptions.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import org.reactivestreams.Subscription;
44

5-
import java.util.ArrayList;
6-
import java.util.List;
5+
import java.util.HashMap;
6+
import java.util.Map;
77

88
/**
99
* @author Andrew Potter
@@ -12,29 +12,43 @@ public class WsSessionSubscriptions {
1212
private final Object lock = new Object();
1313

1414
private boolean closed = false;
15-
private List<Subscription> subscriptions = new ArrayList<>();
15+
private Map<String, Subscription> subscriptions = new HashMap<>();
1616

1717
public void add(Subscription subscription) {
18+
add(getImplicitId(subscription), subscription);
19+
}
20+
21+
public void add(String id, Subscription subscription) {
1822
synchronized (lock) {
1923
if(closed) {
2024
throw new IllegalStateException("Websocket was already closed!");
2125
}
22-
subscriptions.add(subscription);
26+
subscriptions.put(id, subscription);
2327
}
2428
}
2529

2630
public void cancel(Subscription subscription) {
31+
cancel(getImplicitId(subscription));
32+
}
33+
34+
public void cancel(String id) {
2735
synchronized (lock) {
28-
subscriptions.remove(subscription);
29-
subscription.cancel();
36+
Subscription subscription = subscriptions.remove(id);
37+
if(subscription != null) {
38+
subscription.cancel();
39+
}
3040
}
3141
}
3242

3343
public void close() {
3444
synchronized (lock) {
3545
closed = true;
36-
subscriptions.forEach(Subscription::cancel);
37-
subscriptions = new ArrayList<>();
46+
subscriptions.forEach((k, v) -> v.cancel());
47+
subscriptions = new HashMap<>();
3848
}
3949
}
50+
51+
private String getImplicitId(Subscription subscription) {
52+
return String.valueOf(subscription.hashCode());
53+
}
4054
}

0 commit comments

Comments
 (0)