Skip to content

Commit

Permalink
Remove AsyncMapStream, release resources with null data handler
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont committed Sep 13, 2017
1 parent 88297b6 commit c932fe9
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 79 deletions.
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/core/shareddata/AsyncMap.java
Expand Up @@ -191,11 +191,11 @@ public interface AsyncMap<K, V> {
* In this case, the invocation will result in an {@link OutOfMemoryError}. * In this case, the invocation will result in an {@link OutOfMemoryError}.
* <p> * <p>
* The stream will be automatically closed if it fails or ends. * The stream will be automatically closed if it fails or ends.
* Otherwise you must invoke {@link AsyncMapStream#close(Handler)} after usage to avoid leaking resources. * Otherwise you must set a null {@link ReadStream#handler(Handler) data handler} after usage to avoid leaking resources.
* *
* @return a stream of map keys * @return a stream of map keys
*/ */
AsyncMapStream<K> keyStream(); ReadStream<K> keyStream();




/** /**
Expand All @@ -206,11 +206,11 @@ public interface AsyncMap<K, V> {
* In this case, the invocation will result in an {@link OutOfMemoryError}. * In this case, the invocation will result in an {@link OutOfMemoryError}.
* <p> * <p>
* The stream will be automatically closed if it fails or ends. * The stream will be automatically closed if it fails or ends.
* Otherwise you must invoke {@link AsyncMapStream#close(Handler)} after usage to avoid leaking resources. * Otherwise you must set a null {@link ReadStream#handler(Handler) data handler} after usage to avoid leaking resources.
* *
* @return a stream of map values * @return a stream of map values
*/ */
AsyncMapStream<V> valueStream(); ReadStream<V> valueStream();




/** /**
Expand All @@ -221,10 +221,10 @@ public interface AsyncMap<K, V> {
* In this case, the invocation will result in an {@link OutOfMemoryError}. * In this case, the invocation will result in an {@link OutOfMemoryError}.
* <p> * <p>
* The stream will be automatically closed if it fails or ends. * The stream will be automatically closed if it fails or ends.
* Otherwise you must invoke {@link AsyncMapStream#close(Handler)} after usage to avoid leaking resources. * Otherwise you must set a null {@link ReadStream#handler(Handler) data handler} after usage to avoid leaking resources.
* *
* @return a stream of map entries * @return a stream of map entries
*/ */
@GenIgnore @GenIgnore
AsyncMapStream<Entry<K, V>> entryStream(); ReadStream<Entry<K, V>> entryStream();
} }
34 changes: 0 additions & 34 deletions src/main/java/io/vertx/core/shareddata/AsyncMapStream.java

This file was deleted.

Expand Up @@ -23,12 +23,12 @@
import io.vertx.core.impl.Arguments; import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.shareddata.AsyncMap; import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.AsyncMapStream;
import io.vertx.core.shareddata.Counter; import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.LocalMap; import io.vertx.core.shareddata.LocalMap;
import io.vertx.core.shareddata.Lock; import io.vertx.core.shareddata.Lock;
import io.vertx.core.shareddata.SharedData; import io.vertx.core.shareddata.SharedData;
import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.streams.ReadStream;


import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -237,17 +237,17 @@ public void entries(Handler<AsyncResult<Map<K, V>>> asyncResultHandler) {
} }


@Override @Override
public AsyncMapStream<K> keyStream() { public ReadStream<K> keyStream() {
return delegate.keyStream(); return delegate.keyStream();
} }


@Override @Override
public AsyncMapStream<V> valueStream() { public ReadStream<V> valueStream() {
return delegate.valueStream(); return delegate.valueStream();
} }


@Override @Override
public AsyncMapStream<Map.Entry<K, V>> entryStream() { public ReadStream<Map.Entry<K, V>> entryStream() {
return delegate.entryStream(); return delegate.entryStream();
} }
} }
Expand Down
23 changes: 11 additions & 12 deletions src/test/java/io/vertx/test/core/ClusterWideMapTest.java
Expand Up @@ -23,8 +23,8 @@
import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.AsyncMap; import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.AsyncMapStream;
import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.streams.ReadStream;
import io.vertx.test.fakecluster.FakeClusterManager; import io.vertx.test.fakecluster.FakeClusterManager;
import org.junit.Test; import org.junit.Test;


Expand Down Expand Up @@ -654,7 +654,7 @@ public void testKeyStream() {
Map<JsonObject, Buffer> map = genJsonToBuffer(100); Map<JsonObject, Buffer> map = genJsonToBuffer(100);
loadData(map, (vertx, asyncMap) -> { loadData(map, (vertx, asyncMap) -> {
List<JsonObject> keys = new ArrayList<>(); List<JsonObject> keys = new ArrayList<>();
AsyncMapStream<JsonObject> stream = asyncMap.keyStream(); ReadStream<JsonObject> stream = asyncMap.keyStream();
long pause = 500; long pause = 500;
Long start = System.nanoTime(); Long start = System.nanoTime();
stream.endHandler(end -> { stream.endHandler(end -> {
Expand Down Expand Up @@ -685,7 +685,7 @@ public void testValueStream() {
Map<JsonObject, Buffer> map = genJsonToBuffer(100); Map<JsonObject, Buffer> map = genJsonToBuffer(100);
loadData(map, (vertx, asyncMap) -> { loadData(map, (vertx, asyncMap) -> {
List<Buffer> values = new ArrayList<>(); List<Buffer> values = new ArrayList<>();
AsyncMapStream<Buffer> stream = asyncMap.valueStream(); ReadStream<Buffer> stream = asyncMap.valueStream();
AtomicInteger idx = new AtomicInteger(); AtomicInteger idx = new AtomicInteger();
long pause = 500; long pause = 500;
Long start = System.nanoTime(); Long start = System.nanoTime();
Expand Down Expand Up @@ -719,7 +719,7 @@ public void testEntryStream() {
Map<JsonObject, Buffer> map = genJsonToBuffer(100); Map<JsonObject, Buffer> map = genJsonToBuffer(100);
loadData(map, (vertx, asyncMap) -> { loadData(map, (vertx, asyncMap) -> {
List<Entry<JsonObject, Buffer>> entries = new ArrayList<>(); List<Entry<JsonObject, Buffer>> entries = new ArrayList<>();
AsyncMapStream<Entry<JsonObject, Buffer>> stream = asyncMap.entryStream(); ReadStream<Entry<JsonObject, Buffer>> stream = asyncMap.entryStream();
long pause = 500; long pause = 500;
Long start = System.nanoTime(); Long start = System.nanoTime();
stream.endHandler(end -> { stream.endHandler(end -> {
Expand Down Expand Up @@ -750,19 +750,18 @@ public void testClosedKeyStream() {
Map<JsonObject, Buffer> map = genJsonToBuffer(100); Map<JsonObject, Buffer> map = genJsonToBuffer(100);
loadData(map, (vertx, asyncMap) -> { loadData(map, (vertx, asyncMap) -> {
List<JsonObject> keys = new ArrayList<>(); List<JsonObject> keys = new ArrayList<>();
AsyncMapStream<JsonObject> stream = asyncMap.keyStream(); ReadStream<JsonObject> stream = asyncMap.keyStream();
stream.exceptionHandler(t -> { stream.exceptionHandler(t -> {
fail(t); fail(t);
}).handler(jsonObject -> { }).handler(jsonObject -> {
keys.add(jsonObject); keys.add(jsonObject);
if (jsonObject.getInteger("key") == 38) { if (jsonObject.getInteger("key") == 38) {
stream.close(onSuccess(v -> { stream.handler(null);
int emitted = keys.size(); int emitted = keys.size();
vertx.setTimer(500, tid -> { vertx.setTimer(500, tid -> {
assertTrue("Items emitted after close", emitted == keys.size()); assertTrue("Items emitted after close", emitted == keys.size());
testComplete(); testComplete();
}); });
}));
} }
}); });
}); });
Expand Down
Expand Up @@ -25,7 +25,6 @@
import io.vertx.core.impl.TaskQueue; import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.shareddata.AsyncMap; import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.AsyncMapStream;
import io.vertx.core.shareddata.Counter; import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock; import io.vertx.core.shareddata.Lock;
import io.vertx.core.shareddata.impl.AsynchronousCounter; import io.vertx.core.shareddata.impl.AsynchronousCounter;
Expand All @@ -34,6 +33,7 @@
import io.vertx.core.spi.cluster.ChoosableIterable; import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener; import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.core.streams.ReadStream;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
Expand Down Expand Up @@ -351,17 +351,17 @@ public void entries(Handler<AsyncResult<Map<K, V>>> asyncResultHandler) {
} }


@Override @Override
public AsyncMapStream<K> keyStream() { public ReadStream<K> keyStream() {
return new IterableStream<>(vertx.getOrCreateContext(), map.keySet()); return new IterableStream<>(vertx.getOrCreateContext(), map.keySet());
} }


@Override @Override
public AsyncMapStream<V> valueStream() { public ReadStream<V> valueStream() {
return new IterableStream<>(vertx.getOrCreateContext(), map.values()); return new IterableStream<>(vertx.getOrCreateContext(), map.values());
} }


@Override @Override
public AsyncMapStream<Entry<K, V>> entryStream() { public ReadStream<Entry<K, V>> entryStream() {
return new IterableStream<>(vertx.getOrCreateContext(), map.entrySet()); return new IterableStream<>(vertx.getOrCreateContext(), map.entrySet());
} }


Expand Down
27 changes: 8 additions & 19 deletions src/test/java/io/vertx/test/fakecluster/IterableStream.java
Expand Up @@ -16,11 +16,8 @@


package io.vertx.test.fakecluster; package io.vertx.test.fakecluster;


import io.vertx.core.AsyncResult;
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.shareddata.AsyncMapStream;


import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
Expand All @@ -29,7 +26,7 @@
/** /**
* @author Thomas Segismont * @author Thomas Segismont
*/ */
public class IterableStream<T> implements AsyncMapStream<T> { public class IterableStream<T> implements io.vertx.core.streams.ReadStream<T> {


private static final int BATCH_SIZE = 10; private static final int BATCH_SIZE = 10;


Expand Down Expand Up @@ -57,9 +54,13 @@ public synchronized IterableStream<T> exceptionHandler(Handler<Throwable> handle
@Override @Override
public synchronized IterableStream<T> handler(Handler<T> handler) { public synchronized IterableStream<T> handler(Handler<T> handler) {
checkClosed(); checkClosed();
this.dataHandler = handler; if (handler == null) {
if (dataHandler != null && !paused) { closed = true;
doRead(); } else {
dataHandler = handler;
if (!paused) {
doRead();
}
} }
return this; return this;
} }
Expand Down Expand Up @@ -137,16 +138,4 @@ public synchronized IterableStream<T> endHandler(Handler<Void> handler) {
endHandler = handler; endHandler = handler;
return this; return this;
} }

@Override
public void close(Handler<AsyncResult<Void>> completionHandler) {
context.runOnContext(v -> {
synchronized (this) {
closed = true;
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
}
});
}
} }

0 comments on commit c932fe9

Please sign in to comment.