Skip to content

Commit

Permalink
gg-11017 : Lock-free implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ilantukh committed Apr 25, 2016
1 parent f1dbca5 commit 205f6d7
Showing 1 changed file with 31 additions and 26 deletions.
Expand Up @@ -2,14 +2,19 @@


import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.Page;
import org.jsr166.LongAdder8;


public class IgniteCacheDatabasePartitionManager { public class IgniteCacheDatabasePartitionManager {


Expand All @@ -33,7 +38,7 @@ public IgniteCacheDatabasePartitionManager(int partCount, Page page) {


Partition partition = new Partition(i); Partition partition = new Partition(i);


partition.lastApplied = cntr; partition.lastApplied.set(cntr);


parts.set(i, partition); parts.set(i, partition);
} }
Expand Down Expand Up @@ -95,51 +100,51 @@ private class Partition {


private final int id; private final int id;


private NavigableSet<Long> updates = new TreeSet<>(); private ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>();


private long lastApplied = 0; private AtomicLong lastApplied = new AtomicLong(0);


public Partition(int id) { public Partition(int id) {
this.id = id; this.id = id;
} }


public synchronized long getLastAppliedUpdate() { public long getLastAppliedUpdate() {
return lastApplied; return lastApplied.get();
} }


public synchronized long getLastReceivedUpdate() { public long getLastReceivedUpdate() {
if (updates.isEmpty()) if (updates.isEmpty())
return lastApplied; return lastApplied.get();


return updates.last(); return updates.lastKey();
} }


public synchronized void onUpdateReceived(long cntr) { public void onUpdateReceived(long cntr) {
if (cntr <= lastApplied) boolean changed = updates.putIfAbsent(cntr, true) == null;
return;

if (cntr > lastApplied + 1) {
updates.add(cntr);


if (!changed)
return; return;
}


int delta = 1; while (true) {
Map.Entry<Long, Boolean> entry = updates.firstEntry();


Iterator<Long> iterator = updates.iterator(); if (entry == null)
return;


while (iterator.hasNext()) { long first = entry.getKey();
long next = iterator.next();


if (next != lastApplied + delta + 1) long cntr0 = lastApplied.get();
break;


delta++; if (first <= cntr0)

updates.remove(first);
iterator.remove(); else if (first == cntr0 + 1)
if (lastApplied.compareAndSet(cntr0, first))
updates.remove(first);
else
break;
else
break;
} }

lastApplied += delta;
} }


} }
Expand Down

0 comments on commit 205f6d7

Please sign in to comment.