Skip to content

Commit 00a5d5d

Browse files
committed
Tie up loose ends with subscription handling
1 parent dbd4cfb commit 00a5d5d

File tree

3 files changed

+56
-5
lines changed

3 files changed

+56
-5
lines changed

src/main/java/graphql/servlet/internal/ApolloSubscriptionProtocolHandler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

10+
import javax.websocket.CloseReason;
1011
import javax.websocket.Session;
1112
import javax.websocket.server.HandshakeRequest;
1213
import java.io.IOException;
1314
import java.util.HashMap;
1415
import java.util.Map;
1516

1617
import static graphql.servlet.internal.ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_COMPLETE;
18+
import static graphql.servlet.internal.ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_CONNECTION_TERMINATE;
1719
import static graphql.servlet.internal.ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_DATA;
1820
import static graphql.servlet.internal.ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_ERROR;
1921

2022
/**
23+
* https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
24+
*
2125
* @author Andrew Potter
2226
*/
2327
public class ApolloSubscriptionProtocolHandler extends SubscriptionProtocolHandler {
@@ -57,6 +61,21 @@ public void onMessage(HandshakeRequest request, Session session, WsSessionSubscr
5761
))
5862
);
5963
break;
64+
65+
case GQL_STOP:
66+
unsubscribe(subscriptions, message.id);
67+
break;
68+
69+
case GQL_CONNECTION_TERMINATE:
70+
try {
71+
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "client requested " + GQL_CONNECTION_TERMINATE.getType()));
72+
} catch (IOException e) {
73+
log.error("Unable to close websocket session!", e);
74+
}
75+
break;
76+
77+
default:
78+
throw new IllegalArgumentException("Unknown message type: " + message.getType());
6079
}
6180
}
6281

src/main/java/graphql/servlet/internal/FallbackSubscriptionProtocolHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import javax.websocket.Session;
44
import javax.websocket.server.HandshakeRequest;
55
import java.io.IOException;
6+
import java.util.UUID;
67

78
/**
89
* @author Andrew Potter
@@ -17,8 +18,16 @@ public FallbackSubscriptionProtocolHandler(SubscriptionHandlerInput subscription
1718

1819
@Override
1920
public void onMessage(HandshakeRequest request, Session session, WsSessionSubscriptions subscriptions, String text) throws Exception {
20-
subscribe(session, input.getQueryInvoker().query(input.getInvocationInputFactory().create(
21-
input.getGraphQLObjectMapper().readGraphQLRequest(text))), subscriptions, session.getId());
21+
subscribe(
22+
session,
23+
input.getQueryInvoker().query(
24+
input.getInvocationInputFactory().create(
25+
input.getGraphQLObjectMapper().readGraphQLRequest(text)
26+
)
27+
),
28+
subscriptions,
29+
UUID.randomUUID().toString()
30+
);
2231
}
2332

2433
@Override

src/main/java/graphql/servlet/internal/SubscriptionProtocolHandler.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ protected void subscribe(Session session, ExecutionResult executionResult, WsSes
3333

3434
if (data instanceof Publisher) {
3535
@SuppressWarnings("unchecked") final Publisher<ExecutionResult> publisher = (Publisher<ExecutionResult>) data;
36-
final AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
36+
final AtomicSubscriptionReference subscriptionReference = new AtomicSubscriptionReference();
3737

3838
publisher.subscribe(new Subscriber<ExecutionResult>() {
3939
@Override
@@ -57,16 +57,39 @@ public void onError(Throwable throwable) {
5757
log.error("Subscription error", throwable);
5858
subscriptions.cancel(id);
5959
sendErrorMessage(session, id);
60-
// sendMessage(session, ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_ERROR, id);
6160
}
6261

6362
@Override
6463
public void onComplete() {
6564
subscriptions.cancel(id);
6665
sendCompleteMessage(session, id);
67-
// sendMessage(session, ApolloSubscriptionProtocolHandler.OperationMessage.Type.GQL_COMPLETE, id);
6866
}
6967
});
7068
}
7169
}
70+
71+
protected void unsubscribe(WsSessionSubscriptions subscriptions, String id) {
72+
subscriptions.cancel(id);
73+
}
74+
75+
static class AtomicSubscriptionReference {
76+
private final AtomicReference<Subscription> reference = new AtomicReference<>(null);
77+
78+
public void set(Subscription subscription) {
79+
if(reference.get() != null) {
80+
throw new IllegalStateException("Cannot overwrite subscription!");
81+
}
82+
83+
reference.set(subscription);
84+
}
85+
86+
public Subscription get() {
87+
Subscription subscription = reference.get();
88+
if(subscription == null) {
89+
throw new IllegalStateException("Subscription has not been initialized yet!");
90+
}
91+
92+
return subscription;
93+
}
94+
}
7295
}

0 commit comments

Comments
 (0)