Skip to content

Commit e0fa40b

Browse files
committed
Refactor subscriptions
1 parent 3dbad11 commit e0fa40b

File tree

49 files changed

+1471
-952
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1471
-952
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ PROJECT_DEV_ID = apottere
1010
PROJECT_DEV_NAME = Andrew Potter
1111

1212
LIB_GRAPHQL_JAVA_VER = 13.0
13-
LIB_JACKSON_VER = 2.9.9
13+
LIB_JACKSON_VER = 2.10.0
1414

1515
SOURCE_COMPATIBILITY = 1.8
1616
TARGET_COMPATIBILITY = 1.8
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
import org.reactivestreams.Subscription;
5+
6+
public class AtomicSubscriptionSubscription {
7+
8+
private final AtomicReference<Subscription> reference = new AtomicReference<>(null);
9+
10+
public void set(Subscription subscription) {
11+
if (reference.get() != null) {
12+
throw new IllegalStateException("Cannot overwrite subscription!");
13+
}
14+
15+
reference.set(subscription);
16+
}
17+
18+
public Subscription get() {
19+
Subscription subscription = reference.get();
20+
if (subscription == null) {
21+
throw new IllegalStateException("Subscription has not been initialized yet!");
22+
}
23+
24+
return subscription;
25+
}
26+
27+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
import graphql.ExecutionResult;
4+
import graphql.execution.reactive.SingleSubscriberPublisher;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import lombok.Getter;
9+
import lombok.RequiredArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.reactivestreams.Publisher;
12+
import org.reactivestreams.Subscription;
13+
14+
@Slf4j
15+
@RequiredArgsConstructor
16+
public class DefaultSubscriptionSession implements SubscriptionSession {
17+
18+
@Getter
19+
private final GraphQLSubscriptionMapper mapper;
20+
private SingleSubscriberPublisher<String> publisher = new SingleSubscriberPublisher<>();
21+
private SessionSubscriptions subscriptions = new SessionSubscriptions();
22+
23+
@Override
24+
public void send(String message) {
25+
Objects.requireNonNull(message, "message is required");
26+
log.info("Offer message: {}", message);
27+
publisher.offer(message);
28+
}
29+
30+
@Override
31+
public void sendMessage(Object payload) {
32+
Objects.requireNonNull(payload, "payload is required");
33+
log.info("Send message: {}", payload);
34+
send(mapper.serialize(payload));
35+
}
36+
37+
@Override
38+
public void subscribe(String id, Publisher<ExecutionResult> dataPublisher) {
39+
dataPublisher.subscribe(new SessionSubscriber(this, id));
40+
}
41+
42+
@Override
43+
public void add(String id, Subscription subscription) {
44+
subscriptions.add(id, subscription);
45+
}
46+
47+
@Override
48+
public void unsubscribe(String id) {
49+
subscriptions.cancel(id);
50+
}
51+
52+
@Override
53+
public void sendDataMessage(String id, Object payload) {
54+
send(mapper.serialize(payload));
55+
}
56+
57+
@Override
58+
public void sendErrorMessage(String id) {
59+
60+
}
61+
62+
@Override
63+
public void sendCompleteMessage(String id) {
64+
65+
}
66+
67+
@Override
68+
public void close(String reason) {
69+
publisher.noMoreData();
70+
}
71+
72+
@Override
73+
public Map<String, Object> getUserProperties() {
74+
return new HashMap<>();
75+
}
76+
77+
@Override
78+
public boolean isOpen() {
79+
return true;
80+
}
81+
82+
@Override
83+
public String getId() {
84+
return null;
85+
}
86+
87+
@Override
88+
public SessionSubscriptions getSubscriptions() {
89+
return subscriptions;
90+
}
91+
92+
@Override
93+
public Object unwrap() {
94+
throw new UnsupportedOperationException();
95+
}
96+
97+
@Override
98+
public Publisher<String> getPublisher() {
99+
return publisher;
100+
}
101+
102+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
import graphql.kickstart.execution.GraphQLRequest;
4+
import graphql.kickstart.execution.input.GraphQLSingleInvocationInput;
5+
6+
public interface GraphQLSubscriptionInvocationInputFactory {
7+
8+
GraphQLSingleInvocationInput create(GraphQLRequest graphQLRequest, SubscriptionSession session);
9+
10+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import graphql.ExecutionResult;
5+
import graphql.kickstart.execution.GraphQLObjectMapper;
6+
import graphql.kickstart.execution.GraphQLRequest;
7+
import java.util.Map;
8+
import lombok.RequiredArgsConstructor;
9+
10+
@RequiredArgsConstructor
11+
public class GraphQLSubscriptionMapper {
12+
13+
private final GraphQLObjectMapper graphQLObjectMapper;
14+
15+
public GraphQLRequest readGraphQLRequest(Object payload) {
16+
return graphQLObjectMapper.getJacksonMapper().convertValue(payload, GraphQLRequest.class);
17+
}
18+
19+
public ExecutionResult sanitizeErrors(ExecutionResult executionResult) {
20+
return graphQLObjectMapper.sanitizeErrors(executionResult);
21+
}
22+
23+
public boolean areErrorsPresent(ExecutionResult executionResult) {
24+
return graphQLObjectMapper.areErrorsPresent(executionResult);
25+
}
26+
27+
public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult executionResult) {
28+
return graphQLObjectMapper.convertSanitizedExecutionResult(executionResult, false);
29+
}
30+
31+
public String serialize(Object payload) {
32+
try {
33+
return graphQLObjectMapper.getJacksonMapper().writeValueAsString(payload);
34+
} catch (JsonProcessingException e) {
35+
return e.getMessage();
36+
}
37+
}
38+
39+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
import graphql.ExecutionResult;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.reactivestreams.Subscriber;
9+
import org.reactivestreams.Subscription;
10+
11+
@Slf4j
12+
@RequiredArgsConstructor
13+
class SessionSubscriber implements Subscriber<ExecutionResult> {
14+
15+
private final SubscriptionSession session;
16+
private final String id;
17+
private AtomicSubscriptionSubscription subscriptionReference = new AtomicSubscriptionSubscription();
18+
19+
@Override
20+
public void onSubscribe(Subscription subscription) {
21+
log.info("Subscribe to execution result: {}", subscription);
22+
subscriptionReference.set(subscription);
23+
subscriptionReference.get().request(1);
24+
25+
session.add(id, subscriptionReference.get());
26+
}
27+
28+
@Override
29+
public void onNext(ExecutionResult executionResult) {
30+
log.info("Next execution result: {}", executionResult);
31+
Map<String, Object> result = new HashMap<>();
32+
result.put("data", executionResult.getData());
33+
session.sendDataMessage(id, result);
34+
subscriptionReference.get().request(1);
35+
}
36+
37+
@Override
38+
public void onError(Throwable throwable) {
39+
log.error("Subscription error", throwable);
40+
session.unsubscribe(id);
41+
session.sendErrorMessage(id);
42+
}
43+
44+
@Override
45+
public void onComplete() {
46+
session.unsubscribe(id);
47+
session.sendCompleteMessage(id);
48+
}
49+
50+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
import org.reactivestreams.Subscription;
6+
7+
/**
8+
* @author Andrew Potter
9+
*/
10+
public class SessionSubscriptions {
11+
12+
private final Object lock = new Object();
13+
14+
private boolean closed = false;
15+
private Map<String, Subscription> subscriptions = new ConcurrentHashMap<>();
16+
17+
public void add(Subscription subscription) {
18+
add(getImplicitId(subscription), subscription);
19+
}
20+
21+
public void add(String id, Subscription subscription) {
22+
synchronized (lock) {
23+
if (closed) {
24+
throw new IllegalStateException("Websocket was already closed!");
25+
}
26+
subscriptions.put(id, subscription);
27+
}
28+
}
29+
30+
public void cancel(Subscription subscription) {
31+
cancel(getImplicitId(subscription));
32+
}
33+
34+
public void cancel(String id) {
35+
Subscription subscription = subscriptions.remove(id);
36+
if (subscription != null) {
37+
subscription.cancel();
38+
}
39+
}
40+
41+
public void close() {
42+
synchronized (lock) {
43+
closed = true;
44+
subscriptions.forEach((k, v) -> v.cancel());
45+
subscriptions.clear();
46+
}
47+
}
48+
49+
private String getImplicitId(Subscription subscription) {
50+
return String.valueOf(subscription.hashCode());
51+
}
52+
53+
public int getSubscriptionCount() {
54+
return subscriptions.size();
55+
}
56+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package graphql.kickstart.execution.subscription;
1+
package graphql.kickstart.execution.subscriptions;
22

33
/**
44
* Marker interface
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package graphql.kickstart.execution.subscription;
1+
package graphql.kickstart.execution.subscriptions;
22

33
public class SubscriptionException extends Exception {
44

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package graphql.kickstart.execution.subscriptions;
2+
3+
public class SubscriptionHandler {
4+
5+
}

0 commit comments

Comments
 (0)