From 205f6d79021a172805e0abd9d1dcf7e90b68c1e7 Mon Sep 17 00:00:00 2001 From: Ilya Lantukh Date: Mon, 25 Apr 2016 16:14:33 +0300 Subject: [PATCH] gg-11017 : Lock-free implementation. --- .../IgniteCacheDatabasePartitionManager.java | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabasePartitionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabasePartitionManager.java index ef31edbea4460..2deecdd37d8c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabasePartitionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabasePartitionManager.java @@ -2,14 +2,19 @@ import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.internal.pagemem.Page; +import org.jsr166.LongAdder8; public class IgniteCacheDatabasePartitionManager { @@ -33,7 +38,7 @@ public IgniteCacheDatabasePartitionManager(int partCount, Page page) { Partition partition = new Partition(i); - partition.lastApplied = cntr; + partition.lastApplied.set(cntr); parts.set(i, partition); } @@ -95,51 +100,51 @@ private class Partition { private final int id; - private NavigableSet updates = new TreeSet<>(); + private ConcurrentNavigableMap updates = new ConcurrentSkipListMap<>(); - private long lastApplied = 0; + private AtomicLong lastApplied = new AtomicLong(0); public Partition(int id) { this.id = id; } - public synchronized long getLastAppliedUpdate() { - return lastApplied; + public long getLastAppliedUpdate() { + return lastApplied.get(); } - public synchronized long getLastReceivedUpdate() { + public long getLastReceivedUpdate() { if (updates.isEmpty()) - return lastApplied; + return lastApplied.get(); - return updates.last(); + return updates.lastKey(); } - public synchronized void onUpdateReceived(long cntr) { - if (cntr <= lastApplied) - return; - - if (cntr > lastApplied + 1) { - updates.add(cntr); + public void onUpdateReceived(long cntr) { + boolean changed = updates.putIfAbsent(cntr, true) == null; + if (!changed) return; - } - int delta = 1; + while (true) { + Map.Entry entry = updates.firstEntry(); - Iterator iterator = updates.iterator(); + if (entry == null) + return; - while (iterator.hasNext()) { - long next = iterator.next(); + long first = entry.getKey(); - if (next != lastApplied + delta + 1) - break; + long cntr0 = lastApplied.get(); - delta++; - - iterator.remove(); + if (first <= cntr0) + updates.remove(first); + else if (first == cntr0 + 1) + if (lastApplied.compareAndSet(cntr0, first)) + updates.remove(first); + else + break; + else + break; } - - lastApplied += delta; } }