Skip to content

Commit

Permalink
added support for entrySet() to textWire
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Apr 29, 2015
1 parent e4e8efe commit a6bd9e1
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@

import net.openhft.chronicle.engine.client.ClientWiredStatelessTcpConnectionHub;
import net.openhft.chronicle.engine.client.ParameterizeWireKey;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireKey;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.*;

import static net.openhft.chronicle.engine.client.ClientWiredStatelessTcpConnectionHub.CoreFields.reply;
import static net.openhft.chronicle.map.ClientWiredStatelessChronicleEntrySet.EntrySetEventId.*;
import static net.openhft.chronicle.map.ClientWiredStatelessChronicleEntrySet.Params.key;
import static net.openhft.chronicle.map.ClientWiredStatelessChronicleEntrySet.Params.*;


class ClientWiredStatelessChronicleEntrySet<K, V> extends MapStatelessClient<K, V, ClientWiredStatelessChronicleEntrySet.EntrySetEventId>
Expand All @@ -22,9 +19,10 @@ class ClientWiredStatelessChronicleEntrySet<K, V> extends MapStatelessClient<K,
public ClientWiredStatelessChronicleEntrySet(@NotNull final String channelName,
@NotNull final ClientWiredStatelessTcpConnectionHub hub,
final long cid,
@NotNull final Class<V> vClass) {
@NotNull final Class<V> vClass,
@NotNull final Class<K> kClass) {

super(channelName, hub, "entrySet", cid, vClass);
super(channelName, hub, "entrySet", cid, kClass, vClass);
}


Expand All @@ -46,7 +44,47 @@ public boolean contains(Object o) {
@NotNull
@Override
public Iterator<Map.Entry<K, V>> iterator() {
return null;
final int numberOfSegments = proxyReturnUint16(numberOfSegements);

// todo itterate the segments
return segmentIterator(1);

}


/**
* gets the iterator for a given segment
*
* @param segment the maps segment number
* @return and iterator for the {@code segment}
*/
@NotNull
Iterator<Map.Entry<K, V>> segmentIterator(int segment) {

final Map<K, V> e = new HashMap<K, V>();

proxyReturnWireConsumerInOut(iterator,

valueOut -> valueOut.uint16(segment),

wireIn -> {

final ValueIn read = wireIn.read(reply);

while (read.hasNextSequenceItem()) {

read.sequence(s -> s.marshallable(r -> {
final K k = r.read(key).object(kClass);
final V v = r.read(value).object(vClass);
e.put(k, v);
}));
}

return e;
});


return e.entrySet().iterator();
}

@NotNull
Expand Down Expand Up @@ -104,14 +142,17 @@ protected boolean eventReturnsNull(@NotNull EntrySetEventId methodName) {


enum Params implements WireKey {
key
key,
value,
segment
}

enum EntrySetEventId implements ParameterizeWireKey {
size,
isEmpty,
remove(key),
iterator;
numberOfSegements,
iterator(segment);

private final WireKey[] params;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -61,11 +58,11 @@ class ClientWiredStatelessChronicleMap<K, V> extends MapStatelessClient<K, V, Ev

public ClientWiredStatelessChronicleMap(
@NotNull final ClientWiredChronicleMapStatelessBuilder config,
@NotNull final Class kClass,
@NotNull final Class vClass,
@NotNull final Class<K> kClass,
@NotNull final Class<V> vClass,
@NotNull final String channelName,
@NotNull final ClientWiredStatelessTcpConnectionHub hub) {
super(channelName, hub, "MAP", 0, vClass);
super(channelName, hub, "MAP", 0, kClass, vClass);

this.putReturnsNull = config.putReturnsNull();
this.removeReturnsNull = config.removeReturnsNull();
Expand Down Expand Up @@ -206,7 +203,32 @@ public int hashCode() {

@NotNull
public String toString() {
return "todo";
final ClientWiredStatelessChronicleEntrySet<K, V> entrySet = entrySet();

final Iterator<Map.Entry<K, V>> entries = entrySet.segmentIterator(1);

if (!entries.hasNext())
return "{}";

StringBuilder sb = new StringBuilder();
sb.append('{');


while (entries.hasNext()) {

final Map.Entry<K, V> e = entries.next();
final K key = e.getKey();
final V value = e.getValue();
sb.append(key == this ? "(this Map)" : key);
sb.append('=');
sb.append(value == this ? "(this Map)" : value);
if (!entries.hasNext())
return sb.append('}').toString();
sb.append(',').append(' ');
}

return sb.toString();

}

@NotNull
Expand Down Expand Up @@ -324,7 +346,7 @@ public Collection<V> values() {
private final Map<Long, String> cidToCsp = new HashMap<>();

@NotNull
public Set<Map.Entry<K, V>> entrySet() {
public ClientWiredStatelessChronicleEntrySet entrySet() {


long cid = proxyReturnWireConsumer(entrySet, (WireIn wireIn) -> {
Expand All @@ -343,7 +365,7 @@ public Set<Map.Entry<K, V>> entrySet() {
return cidRef[0];
});

return new ClientWiredStatelessChronicleEntrySet<K, V>(channelName, hub, cid, vClass);
return new ClientWiredStatelessChronicleEntrySet<K, V>(channelName, hub, cid, vClass, kClass);
}


Expand Down
10 changes: 8 additions & 2 deletions src/main/java/net/openhft/chronicle/map/MapStatelessClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@
public abstract class MapStatelessClient<K, V, E extends ParameterizeWireKey>
extends AbstactStatelessClient<E> {

private Class<V> vClass;
Class<K> kClass;
Class<V> vClass;


/**
* @param channelName
* @param hub
* @param type the type of wire handler for example "MAP" or "QUEUE"
* @param cid used by proxies such as the entry-set
* @param kClass
*/
public MapStatelessClient(@NotNull String channelName,
@NotNull ClientWiredStatelessTcpConnectionHub hub,
@NotNull String type,
long cid, @NotNull final Class vClass) {
long cid,
@NotNull final Class<K> kClass,
@NotNull final Class<V> vClass) {
super(channelName, hub, type, cid);

this.vClass = vClass;
this.kClass = kClass;
}

@Nullable
Expand Down
39 changes: 32 additions & 7 deletions src/main/java/net/openhft/chronicle/map/MapWireHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import static net.openhft.chronicle.engine.client.ClientWiredStatelessTcpConnectionHub.CoreFields.csp;
import static net.openhft.chronicle.engine.client.ClientWiredStatelessTcpConnectionHub.CoreFields.reply;
import static net.openhft.chronicle.engine.client.StringUtils.endsWith;
import static net.openhft.chronicle.map.AbstactStatelessClient.toParameters;
import static net.openhft.chronicle.map.MapWireHandler.EventId.*;
import static net.openhft.chronicle.map.MapWireHandler.Params.*;
import static net.openhft.chronicle.wire.Wires.acquireStringBuilder;
Expand Down Expand Up @@ -131,7 +130,6 @@ public void accept(@NotNull final WireHandlers wireHandlers) {
this.publishLater = wireHandlers;
}


@Override
public void process(@NotNull final Wire in, @NotNull final Wire out) throws StreamCorruptedException {
try {
Expand Down Expand Up @@ -234,24 +232,51 @@ public void accept(WireIn wireIn) {
// old value
if (EntrySetEventId.iterator.contentEquals(eventName)) {
write(b -> {
final ValueOut valueOut = outWire.writeEventName(() -> "entry");
b.delegate.entrySet().forEach(e ->
valueOut.sequence(toParameters(put,
e.getKey(),
e.getValue())));
final ValueOut valueOut = outWire.writeEventName(CoreFields.reply);
b.delegate.entrySet().forEach(e -> {

final Consumer<ValueOut> writer = out -> {

out.marshallable(m -> {

final byte[] key = (byte[]) e.getKey();
final byte[] value = (byte[]) e.getValue();

m.write(Params.key);
outWire.bytes().write(key);

m.write(Params.value);
outWire.bytes().write(value);

});

};
valueOut.sequence(writer);
}

);
}

);
return;
}


if (EntrySetEventId.numberOfSegements.contentEquals(eventName)) {
write(b -> outWire.write(reply).int32(1));
return;
}




throw new IllegalStateException("unsupported event=" + eventName);
}


// -- THESE METHODS ARE ONLY MAP METHODS


if (keySet.contentEquals(eventName)) {
throw new UnsupportedOperationException("todo");
}
Expand Down

0 comments on commit a6bd9e1

Please sign in to comment.