forked from infinispan/infinispan
/
ContinuousQueryImpl.java
116 lines (100 loc) · 4.58 KB
/
ContinuousQueryImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package org.infinispan.client.hotrod.event;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.filter.Filters;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.protostream.ProtobufUtil;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.remote.client.ContinuousQueryResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * A container of continuous query listeners for a cache.
 * <p>
 * Each registered {@link ContinuousQueryListener} is wrapped in a private client listener adapter that is attached to
 * the {@link RemoteCache}; the adapter decodes the incoming events and forwards them to the wrapped listener.
 * <p>
 * This class is not thread-safe: the registered listeners are tracked in a plain {@link ArrayList} without any
 * synchronization.
 *
 * @author anistor@redhat.com
 * @since 8.2
 * @private
 */
public final class ContinuousQueryImpl<K, V> implements ContinuousQuery<K, V> {

   /** The remote cache the client listeners are attached to. */
   private final RemoteCache<K, V> cache;

   /** The ProtoStream context used for decoding event payloads, keys and values. */
   private final SerializationContext serializationContext;

   /** The client listener adapters currently registered through this object, in registration order. */
   private final List<ClientEntryListener<K, ?>> listeners = new ArrayList<>();

   /**
    * Creates a continuous query container bound to the given cache.
    *
    * @param cache the remote cache to attach listeners to
    * @throws IllegalArgumentException if {@code cache} is {@code null}
    */
   public ContinuousQueryImpl(RemoteCache<K, V> cache) {
      if (cache == null) {
         throw new IllegalArgumentException("cache parameter cannot be null");
      }
      this.cache = cache;
      serializationContext = ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager());
   }

   /**
    * Registers a continuous query listener that uses a query DSL based filter. The listener will receive notifications
    * when a cache entry joins or leaves the matching set defined by the query.
    *
    * @param query    the query to be used for determining the matching set
    * @param listener the continuous query listener instance
    * @param <C>      the type of the value (or projection) handed to the listener
    * @throws IllegalArgumentException if {@code listener} is {@code null}
    */
   public <C> void addContinuousQueryListener(Query query, ContinuousQueryListener<K, C> listener) {
      // Fail fast: a null listener would otherwise be registered and only blow up with a
      // NullPointerException when the first event is dispatched to it.
      if (listener == null) {
         throw new IllegalArgumentException("listener parameter cannot be null");
      }
      ClientEntryListener<K, C> eventListener = new ClientEntryListener<>(serializationContext, listener);
      Object[] factoryParams = Filters.makeFactoryParams(query);
      // Only filter factory parameters are supplied; the converter factory parameters are left null.
      cache.addClientListener(eventListener, factoryParams, null);
      // Track the adapter only after it was successfully attached to the cache.
      listeners.add(eventListener);
   }

   /**
    * Unregisters a previously added continuous query listener. The listener is matched by identity and only the first
    * matching registration is removed. Nothing happens if the listener is not registered.
    *
    * @param listener the continuous query listener instance to remove
    */
   public void removeContinuousQueryListener(ContinuousQueryListener<K, ?> listener) {
      for (Iterator<ClientEntryListener<K, ?>> it = listeners.iterator(); it.hasNext(); ) {
         ClientEntryListener<K, ?> l = it.next();
         if (l.listener == listener) {
            cache.removeClientListener(l);
            it.remove();
            break;
         }
      }
   }

   /**
    * Returns the continuous query listeners currently registered through this object.
    *
    * @return a new list holding the registered listeners, in registration order; changes to the returned list do not
    * affect the registrations
    */
   public List<ContinuousQueryListener<K, ?>> getListeners() {
      List<ContinuousQueryListener<K, ?>> queryListeners = new ArrayList<>(listeners.size());
      for (ClientEntryListener<K, ?> l : listeners) {
         queryListeners.add(l.listener);
      }
      return queryListeners;
   }

   /**
    * Unregisters all continuous query listeners that were registered through this object.
    */
   public void removeAllListeners() {
      // Drop each adapter from the list as soon as it is detached from the cache, so that the list
      // still reflects what remains registered if one of the removals throws.
      for (Iterator<ClientEntryListener<K, ?>> it = listeners.iterator(); it.hasNext(); ) {
         cache.removeClientListener(it.next());
         it.remove();
      }
   }

   /**
    * The client listener attached to the remote cache on behalf of a single {@link ContinuousQueryListener}. It decodes
    * each custom event into a {@link ContinuousQueryResult} and forwards it to the wrapped listener.
    *
    * @param <K> the type of the cache key
    * @param <C> the type of the value (or projection) handed to the wrapped listener
    */
   @ClientListener(filterFactoryName = Filters.CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
         converterFactoryName = Filters.CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
         useRawData = true, includeCurrentState = true)
   private static final class ClientEntryListener<K, C> {

      private final SerializationContext serializationContext;

      private final ContinuousQueryListener<K, C> listener;

      public ClientEntryListener(SerializationContext serializationContext, ContinuousQueryListener<K, C> listener) {
         this.serializationContext = serializationContext;
         this.listener = listener;
      }

      /**
       * Handles created, modified, removed and expired events alike (despite the method name): the payload itself
       * tells whether the entry is joining or leaving the matching set.
       *
       * @param event the custom event whose data is a serialized {@link ContinuousQueryResult}
       * @throws IOException if the event payload, the key or the value cannot be decoded
       */
      @ClientCacheEntryCreated
      @ClientCacheEntryModified
      @ClientCacheEntryRemoved
      @ClientCacheEntryExpired
      public void handleClientCacheEntryCreatedEvent(ClientCacheEntryCustomEvent<byte[]> event) throws IOException {
         ContinuousQueryResult cqr = ProtobufUtil.fromByteArray(serializationContext, event.getEventData(), ContinuousQueryResult.class);
         Object rawKey = ProtobufUtil.fromWrappedByteArray(serializationContext, cqr.getKey());
         // The decoded object's static type is unknown here; the cast cannot be checked at runtime.
         @SuppressWarnings("unchecked")
         K key = (K) rawKey;

         if (cqr.isJoining()) {
            // The value is only needed for joining entries, so it is decoded only on this branch.
            // When no value is present the projection is delivered instead.
            Object rawValue;
            if (cqr.getValue() != null) {
               rawValue = ProtobufUtil.fromWrappedByteArray(serializationContext, cqr.getValue());
            } else {
               rawValue = cqr.getProjection();
            }
            @SuppressWarnings("unchecked")
            C value = (C) rawValue;
            listener.resultJoining(key, value);
         } else {
            listener.resultLeaving(key);
         }
      }
   }
}