Skip to content

Commit

Permalink
commits some fixes but still and issue with "wire entry set"
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Mar 12, 2015
1 parent 9cc5b85 commit baf9013
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 27 deletions.
Expand Up @@ -19,7 +19,7 @@
package net.openhft.chronicle.map;

import net.openhft.chronicle.hash.ChronicleHashStatelessClientBuilder;
import net.openhft.chronicle.map.ChronicleMap;import net.openhft.chronicle.map.ClientWiredStatelessChronicleMap;import net.openhft.chronicle.map.ClientWiredStatelessClientTcpConnectionHub;import net.openhft.chronicle.map.MapBuilder;import net.openhft.lang.MemoryUnit;
import net.openhft.lang.MemoryUnit;

import java.io.IOException;
import java.lang.Class;import java.lang.IllegalStateException;import java.lang.Override;import java.lang.String;import java.net.InetSocketAddress;
Expand All @@ -29,26 +29,26 @@
/**
* @author Rob Austin.
*/
public final class ClientWiredChronicleMapStatelessClientBuilder<K, V> implements
ChronicleHashStatelessClientBuilder<ClientWiredChronicleMapStatelessClientBuilder<K, V>,
public final class ClientWiredChronicleMapStatelessBuilder<K, V> implements
ChronicleHashStatelessClientBuilder<ClientWiredChronicleMapStatelessBuilder<K, V>,
ChronicleMap<K, V>>,
MapBuilder<ClientWiredChronicleMapStatelessClientBuilder<K, V>> {
MapBuilder<ClientWiredChronicleMapStatelessBuilder<K, V>> {

ClientWiredStatelessClientTcpConnectionHub hub;
ClientWiredStatelessTcpConnectionHub hub;
private Class keyClass;
private Class valueClass;
private byte localIdentifier;
private short channelID;
private boolean doHandShaking;

public ClientWiredChronicleMapStatelessClientBuilder(InetSocketAddress remoteAddress, Class keyClass, Class valueClass, short channelID) {
public ClientWiredChronicleMapStatelessBuilder(InetSocketAddress remoteAddress, Class keyClass, Class valueClass, short channelID) {
this.keyClass = keyClass;
this.valueClass = valueClass;
this.remoteAddress = remoteAddress;
this.channelID = channelID;
}

public ClientWiredChronicleMapStatelessClientBuilder(ClientWiredStatelessClientTcpConnectionHub hub, Class keyClass, Class valueClass, short channelID) {
public ClientWiredChronicleMapStatelessBuilder(ClientWiredStatelessTcpConnectionHub hub, Class keyClass, Class valueClass, short channelID) {
this.keyClass = keyClass;
this.valueClass = valueClass;
this.hub = hub;
Expand Down Expand Up @@ -79,7 +79,7 @@ InetSocketAddress remoteAddress() {
}

@Override
public ClientWiredChronicleMapStatelessClientBuilder<K, V> putReturnsNull(boolean putReturnsNull) {
public ClientWiredChronicleMapStatelessBuilder<K, V> putReturnsNull(boolean putReturnsNull) {
this.putReturnsNull = putReturnsNull;
return this;
}
Expand All @@ -89,7 +89,7 @@ boolean putReturnsNull() {
}

@Override
public ClientWiredChronicleMapStatelessClientBuilder<K, V> removeReturnsNull(boolean removeReturnsNull) {
public ClientWiredChronicleMapStatelessBuilder<K, V> removeReturnsNull(boolean removeReturnsNull) {
this.removeReturnsNull = removeReturnsNull;
return this;
}
Expand All @@ -99,7 +99,7 @@ boolean removeReturnsNull() {
}

@Override
public ClientWiredChronicleMapStatelessClientBuilder<K, V> timeout(long timeout, TimeUnit units) {
public ClientWiredChronicleMapStatelessBuilder<K, V> timeout(long timeout, TimeUnit units) {
this.timeoutMs = units.toMillis(timeout);
return this;
}
Expand All @@ -109,7 +109,7 @@ long timeoutMs() {
}

@Override
public ClientWiredChronicleMapStatelessClientBuilder<K, V> name(String name) {
public ClientWiredChronicleMapStatelessBuilder<K, V> name(String name) {
this.name = name;
return this;
}
Expand All @@ -119,7 +119,7 @@ String name() {
}

@Override
public ClientWiredChronicleMapStatelessClientBuilder<K, V> tcpBufferSize(int tcpBufferSize) {
public ClientWiredChronicleMapStatelessBuilder<K, V> tcpBufferSize(int tcpBufferSize) {
this.tcpBufferSize = tcpBufferSize;
return this;
}
Expand All @@ -133,7 +133,7 @@ public ChronicleMap<K, V> create() throws IOException {

// todo clean this up
if (hub == null)
hub = new ClientWiredStatelessClientTcpConnectionHub(this, localIdentifier, doHandShaking);
hub = new ClientWiredStatelessTcpConnectionHub(this, localIdentifier, doHandShaking);

if (!used.getAndSet(true)) {
return new ClientWiredStatelessChronicleMap<K, V>(this, keyClass, valueClass, channelID);
Expand All @@ -155,7 +155,7 @@ public short channelID() {
}


public ClientWiredStatelessClientTcpConnectionHub hub() {
public ClientWiredStatelessTcpConnectionHub hub() {
return hub;
}

Expand Down
Expand Up @@ -44,7 +44,7 @@ class ClientWiredStatelessChronicleMap<K, V> implements ChronicleMap<K, V>, Clon

private static final Logger LOG = LoggerFactory.getLogger(ClientWiredStatelessChronicleMap.class);

private final ClientWiredStatelessClientTcpConnectionHub hub;
private final ClientWiredStatelessTcpConnectionHub hub;

protected Class<K> kClass;
protected Class<V> vClass;
Expand All @@ -53,7 +53,7 @@ class ClientWiredStatelessChronicleMap<K, V> implements ChronicleMap<K, V>, Clon
private boolean removeReturnsNull;
private short channelID;

public ClientWiredStatelessChronicleMap(@NotNull final ClientWiredChronicleMapStatelessClientBuilder config,
public ClientWiredStatelessChronicleMap(@NotNull final ClientWiredChronicleMapStatelessBuilder config,
@NotNull final Class kClass,
@NotNull final Class vClass,
short channelID) {
Expand Down Expand Up @@ -199,7 +199,38 @@ public int hashCode() {

@NotNull
public String toString() {
return hub.proxyReturnString(TO_STRING.toString(), channelID);
final Map<K, V> result = toMap();
return result.toString();
}

private Map<K, V> toMap() {
final long startTime = System.currentTimeMillis();
final Map<K, V> result = new HashMap<K, V>();

// send
final long transactionId = proxySend("ENTRY_SET", startTime);
assert !hub.outBytesLock().isHeldByCurrentThread();
final long timeoutTime = startTime + hub.timeoutMs;

for (; ; ) {

// receive
hub.inBytesLock().lock();
try {
final Wire wireIn = hub.proxyReply(timeoutTime, transactionId);

if (wireIn.read(Fields.HAS_NEXT).bool()) {
result.put(
readK(Fields.RESULT_KEY, wireIn, null),
readV(Fields.RESULT_VALUE, wireIn, null));
} else
break;

} finally {
hub.inBytesLock().unlock();
}
}
return result;
}

@NotNull
Expand Down
Expand Up @@ -45,7 +45,7 @@
/**
* Created by Rob Austin
*/
public class ClientWiredStatelessClientTcpConnectionHub {
public class ClientWiredStatelessTcpConnectionHub {

private static final Logger LOG = LoggerFactory.getLogger(StatelessChronicleMap.class);

Expand All @@ -59,7 +59,7 @@ public class ClientWiredStatelessClientTcpConnectionHub {

@NotNull
private final AtomicLong transactionID = new AtomicLong(0);
private final ClientWiredChronicleMapStatelessClientBuilder config;
private final ClientWiredChronicleMapStatelessBuilder config;
@Nullable
protected CloseablesManager closeables;

Expand All @@ -84,7 +84,7 @@ public class ClientWiredStatelessClientTcpConnectionHub {
private boolean doHandShaking;


public ClientWiredStatelessClientTcpConnectionHub(ClientWiredChronicleMapStatelessClientBuilder config, byte localIdentifier, boolean doHandShaking) {
public ClientWiredStatelessTcpConnectionHub(ClientWiredChronicleMapStatelessBuilder config, byte localIdentifier, boolean doHandShaking) {
this.localIdentifier = localIdentifier;
this.doHandShaking = doHandShaking;
this.remoteAddress = config.remoteAddress();
Expand Down
41 changes: 34 additions & 7 deletions src/main/java/net/openhft/chronicle/map/MapWireHandler.java
Expand Up @@ -125,7 +125,7 @@ public void process(Wire in, Wire out) throws StreamCorruptedException {
}

private void writeChunked(long transactionId,
@NotNull final Function<Map, Iterator<byte[]>> function,
@NotNull final Function<BytesChronicleMap, Iterator> function,
@NotNull final Consumer<Iterator<byte[]>> c) {

final BytesChronicleMap m = bytesMap(channelId);
Expand Down Expand Up @@ -155,6 +155,37 @@ private void writeChunked(long transactionId,

}

private void writeEntryChunked(long transactionId,
@NotNull final Function<BytesChronicleMap, Iterator> function,
@NotNull final Consumer<Iterator<Map.Entry<byte[], byte[]>>> c) {

final BytesChronicleMap m = bytesMap(channelId);
final Iterator<Map.Entry<byte[], byte[]>> iterator = function.apply(m);

// this allows us to write more data than the buffer will allow
out = () -> {

// each chunk has its own transaction-id
outWire.write(TRANSACTION_ID).int64(transactionId);

write(map -> {

boolean hasNext = iterator.hasNext();
outWire.write(HAS_NEXT).bool(hasNext);

if (hasNext)
c.accept(iterator);
else
// setting out to NULL denotes that there are no more chunks
out = null;
});

};

out.run();

}

@SuppressWarnings("UnusedReturnValue")
void onEvent() {

Expand Down Expand Up @@ -182,12 +213,12 @@ void onEvent() {
}

if ("VALUES".contentEquals(methodName)) {
writeChunked(transactionId, map -> map.values().iterator(), writeElement);
writeChunked(transactionId, map -> map.delegate.values().iterator(), writeElement);
return;
}

if ("ENTRY_SET".contentEquals(methodName)) {
writeChunked(transactionId, map -> map.entrySet().iterator(), writeEntry);
writeEntryChunked(transactionId, m -> m.delegate.entrySet().iterator(), writeEntry);
return;
}

Expand Down Expand Up @@ -303,10 +334,6 @@ void onEvent() {
return;
}

if ("TO_STRING".contentEquals(methodName)) {
write(b -> outWire.write(RESULT).text(b.toString()));
return;
}

if ("APPLICATION_VERSION".contentEquals(methodName)) {
write(b -> outWire.write(RESULT).text(applicationVersion()));
Expand Down

0 comments on commit baf9013

Please sign in to comment.