Skip to content

Commit

Permalink
Moved testFloodReplicatedMapWithDeletedEntries() test to a separate c…
Browse files Browse the repository at this point in the history
…lass from TcpReplicationSoakTest because they require different old deleted entries cleanup timeout configurations
  • Loading branch information
leventov committed Oct 3, 2015
1 parent 71599fd commit 7460074
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 41 deletions.
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package net.openhft.chronicle.map;

import com.google.common.collect.Maps;
import net.openhft.chronicle.hash.replication.SingleChronicleHashReplication;
import net.openhft.chronicle.hash.replication.TcpTransportAndNetworkConfig;
import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.model.Byteable;
import net.openhft.lang.model.DataValueClasses;
import net.openhft.lang.values.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class OldDeletedEntriesCleanupTest {

private ChronicleMap<Integer, CharSequence> map1;
private ChronicleMap<Integer, CharSequence> map2;
static int s_port = 8093;

@Before
public void setup() throws IOException {
IntValue value = DataValueClasses.newDirectReference(IntValue.class);
((Byteable) value).bytes(new ByteBufferBytes(ByteBuffer.allocateDirect(4)), 0);

final InetSocketAddress endpoint = new InetSocketAddress("localhost", s_port + 1);

{
final TcpTransportAndNetworkConfig tcpConfig1 = TcpTransportAndNetworkConfig.of(s_port,
endpoint).autoReconnectedUponDroppedConnection(true)
.heartBeatInterval(1, TimeUnit.SECONDS)
.tcpBufferSize(1024 * 64);


map1 = ChronicleMapBuilder.of(Integer.class, CharSequence.class)
.entries(Builder.SIZE + Builder.SIZE)
.removedEntryCleanupTimeout(1, TimeUnit.MILLISECONDS)
.actualSegments(1)
.averageValue("test" + 1000)
.replication(SingleChronicleHashReplication.builder()
.tcpTransportAndNetwork(tcpConfig1)
.name("map1")
.createWithId((byte) 1))
.instance()
.name("map1")
.create();
}
{
final TcpTransportAndNetworkConfig tcpConfig2 = TcpTransportAndNetworkConfig.of
(s_port + 1).autoReconnectedUponDroppedConnection(true)
.heartBeatInterval(1, TimeUnit.SECONDS)
.tcpBufferSize(1024 * 64);

map2 = ChronicleMapBuilder.of(Integer.class, CharSequence.class)
.entries(Builder.SIZE + Builder.SIZE)
.removedEntryCleanupTimeout(1, TimeUnit.MILLISECONDS)
.averageValue("test" + 1000)
.replication(SingleChronicleHashReplication.builder()
.tcpTransportAndNetwork(tcpConfig2)
.name("map2")
.createWithId((byte) 2))
.instance()
.name("map2")
.create();

}
s_port += 2;
}


@After
public void tearDown() throws InterruptedException {

for (final Closeable closeable : new Closeable[]{map1, map2}) {
try {
closeable.close();
} catch (Exception e) {
e.printStackTrace();
}
}

System.gc();
}

Set<Thread> threads;

@Before
public void sampleThreads() {
threads = Thread.getAllStackTraces().keySet();
}

@After
public void checkThreadsShutdown() {
ChannelReplicationTest.checkThreadsShutdown(threads);
}


@Test
public void testFloodReplicatedMapWithDeletedEntries() throws InterruptedException {
try {
Random r = ThreadLocalRandom.current();
NavigableSet<Integer> put = new TreeSet<>();
for (int i = 0; i < Builder.SIZE * 10; i++) {
if (r.nextBoolean() || put.isEmpty()) {
int key = r.nextInt();
map1.put(key, "test");
put.add(key);
} else {
Integer key = put.pollFirst();
map1.remove(key);
}
// Thread.sleep(0, 1000);
}
System.out.println("\nwaiting till equal");

waitTillEqual(15000);

if (!map1.equals(map2))
Assert.assertEquals(Maps.difference(map1, map2).toString(), map1, map2);
} finally {
map1.close();
map2.close();
}
}

private void waitTillEqual(final int timeOutMs) throws InterruptedException {

Map map1UnChanged = new HashMap();
Map map2UnChanged = new HashMap();

int numberOfTimesTheSame = 0;
long startTime = System.currentTimeMillis();
for (int t = 0; t < timeOutMs + 100; t++) {
if (map1.equals(map2)) {
if (map1.equals(map1UnChanged) && map2.equals(map2UnChanged)) {
numberOfTimesTheSame++;
} else {
numberOfTimesTheSame = 0;
map1UnChanged = new HashMap(map1);
map2UnChanged = new HashMap(map2);
}
Thread.sleep(1);
if (numberOfTimesTheSame == 10) {
System.out.println("same");
break;
}

}
Thread.sleep(1);
if (System.currentTimeMillis() - startTime > timeOutMs)
break;
}
}
}
80 changes: 39 additions & 41 deletions src/test/java/net/openhft/chronicle/map/TcpReplicationSoakTest.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@


package net.openhft.chronicle.map; package net.openhft.chronicle.map;


import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import net.openhft.chronicle.hash.replication.ReplicableEntry;
import net.openhft.chronicle.hash.replication.SingleChronicleHashReplication; import net.openhft.chronicle.hash.replication.SingleChronicleHashReplication;
import net.openhft.chronicle.hash.replication.TcpTransportAndNetworkConfig; import net.openhft.chronicle.hash.replication.TcpTransportAndNetworkConfig;
import net.openhft.lang.io.ByteBufferBytes; import net.openhft.lang.io.ByteBufferBytes;
Expand All @@ -38,7 +40,6 @@
*/ */
public class TcpReplicationSoakTest { public class TcpReplicationSoakTest {



private ChronicleMap<Integer, CharSequence> map1; private ChronicleMap<Integer, CharSequence> map1;
private ChronicleMap<Integer, CharSequence> map2; private ChronicleMap<Integer, CharSequence> map2;
static int s_port = 8093; static int s_port = 8093;
Expand All @@ -50,23 +51,25 @@ public void setup() throws IOException {


final InetSocketAddress endpoint = new InetSocketAddress("localhost", s_port + 1); final InetSocketAddress endpoint = new InetSocketAddress("localhost", s_port + 1);


ChronicleMapBuilder<Integer, CharSequence> builder = ChronicleMapBuilder
.of(Integer.class, CharSequence.class)
.entries(Builder.SIZE + Builder.SIZE)
.actualSegments(1)
.averageValue("test" + 1000);

{ {
final TcpTransportAndNetworkConfig tcpConfig1 = TcpTransportAndNetworkConfig.of(s_port, final TcpTransportAndNetworkConfig tcpConfig1 = TcpTransportAndNetworkConfig.of(s_port,
endpoint).autoReconnectedUponDroppedConnection(true) endpoint).autoReconnectedUponDroppedConnection(true)
.heartBeatInterval(1, TimeUnit.SECONDS) .heartBeatInterval(1, TimeUnit.SECONDS)
.tcpBufferSize(1024 * 64); .tcpBufferSize(1024 * 64);




map1 = ChronicleMapBuilder.of(Integer.class, CharSequence.class) map1 = builder
.entries(Builder.SIZE + Builder.SIZE) .instance()
.removedEntryCleanupTimeout(1, TimeUnit.MILLISECONDS) .replicated(SingleChronicleHashReplication.builder()
.actualSegments(1)
.averageValue("test" + 1000)
.replication(SingleChronicleHashReplication.builder()
.tcpTransportAndNetwork(tcpConfig1) .tcpTransportAndNetwork(tcpConfig1)
.name("map1") .name("map1")
.createWithId((byte) 1)) .createWithId((byte) 1))
.instance()
.name("map1") .name("map1")
.create(); .create();
} }
Expand All @@ -76,18 +79,14 @@ public void setup() throws IOException {
.heartBeatInterval(1, TimeUnit.SECONDS) .heartBeatInterval(1, TimeUnit.SECONDS)
.tcpBufferSize(1024 * 64); .tcpBufferSize(1024 * 64);


map2 = ChronicleMapBuilder.of(Integer.class, CharSequence.class) map2 = builder
.entries(Builder.SIZE + Builder.SIZE) .instance()
.removedEntryCleanupTimeout(1, TimeUnit.MILLISECONDS) .replicated(SingleChronicleHashReplication.builder()
.averageValue("test" + 1000)
.replication(SingleChronicleHashReplication.builder()
.tcpTransportAndNetwork(tcpConfig2) .tcpTransportAndNetwork(tcpConfig2)
.name("map2") .name("map2")
.createWithId((byte) 2)) .createWithId((byte) 2))
.instance()
.name("map2") .name("map2")
.create(); .create();

} }
s_port += 2; s_port += 2;
} }
Expand Down Expand Up @@ -144,39 +143,38 @@ public void testSoakTestWithRandomData() throws IOException, InterruptedExceptio


waitTillEqual(15000); waitTillEqual(15000);


Assert.assertEquals(map1, map2); if (map1.equals(map2)) {
System.out.println("same");
return;
}
MapDifference<Integer, CharSequence> difference = Maps.difference(map1, map2);
Map<Integer, CharSequence> onlyOnMap1 = difference.entriesOnlyOnLeft();
System.out.println("only on map1:");
for (Integer k : onlyOnMap1.keySet()) {
printState(map1, k);
}
Map<Integer, CharSequence> onlyOnMap2 = difference.entriesOnlyOnRight();
System.out.println("only on map2:");
for (Integer k : onlyOnMap2.keySet()) {
printState(map2, k);
}
if (!map1.equals(map2)) {
Assert.assertEquals(difference.toString(), map1, map2);
}
} finally { } finally {
map1.close(); map1.close();
map2.close(); map2.close();
} }


} }


@Test private void printState(ChronicleMap<Integer, CharSequence> m, Integer k) {
public void testFloodReplicatedMapWithDeletedEntries() throws InterruptedException { try (ExternalMapQueryContext<Integer, CharSequence, ?> q = m.queryContext(k)) {
try { MapEntry<Integer, CharSequence> entry = q.entry();
Random r = ThreadLocalRandom.current(); ReplicableEntry re = (ReplicableEntry) entry;
NavigableSet<Integer> put = new TreeSet<>(); System.out.println(k + "=" + entry.value() + ", isChanged=" +
for (int i = 0; i < Builder.SIZE * 10; i++) { re.isChanged() + " oTs: " + re.originTimestamp() +
if (r.nextBoolean() || put.isEmpty()) { " oId: " + re.originIdentifier());
int key = r.nextInt();
map1.put(key, "test");
put.add(key);
} else {
Integer key = put.pollFirst();
map1.remove(key);
}
// Thread.sleep(0, 1000);
}
System.out.println("\nwaiting till equal");

waitTillEqual(15000);

if (!map1.equals(map2))
Assert.assertEquals(Maps.difference(map1, map2).toString(), map1, map2);
} finally {
map1.close();
map2.close();
} }
} }


Expand Down

0 comments on commit 7460074

Please sign in to comment.