Skip to content

Commit

Permalink
applying review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraRoatis committed May 17, 2018
1 parent 42e02af commit 99d9b24
Showing 1 changed file with 55 additions and 45 deletions.
100 changes: 55 additions & 45 deletions modMcf/src/org/aion/mcf/trie/JournalPruneDataSource.java
Expand Up @@ -35,6 +35,7 @@
package org.aion.mcf.trie;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.aion.base.db.IByteArrayKeyValueDatabase;
Expand Down Expand Up @@ -83,18 +84,14 @@ public int getTotRefs() {
// block hash => updates
private LinkedHashMap<ByteArrayWrapper, Updates> blockUpdates = new LinkedHashMap<>();
private Updates currentUpdates = new Updates();
private boolean enabled = true;
private AtomicBoolean enabled = new AtomicBoolean(true);

public JournalPruneDataSource(IByteArrayKeyValueDatabase src) {
this.src = src;
}

public void setPruneEnabled(boolean e) {
lock.writeLock().lock();

enabled = e;

lock.writeLock().unlock();
public void setPruneEnabled(boolean _enabled) {
enabled.set(_enabled);
}

public void put(byte[] key, byte[] value) {
Expand All @@ -103,28 +100,32 @@ public void put(byte[] key, byte[] value) {
lock.writeLock().lock();

try {
ByteArrayWrapper keyW = new ByteArrayWrapper(key);

// Check to see the value exists.
if (value != null) {
if (enabled.get()) {
// pruning enabled
ByteArrayWrapper keyW = ByteArrayWrapper.wrap(key);

// If it exists and pruning is enabled.
if (enabled) {
// Check to see the value exists.
if (value != null) {
// If it exists and pruning is enabled.
currentUpdates.insertedKeys.add(keyW);
incRef(keyW);
}

// put to source database.
src.put(key, value);
// put to source database.
src.put(key, value);

} else {
checkOpen();
} else {
checkOpen();

// Value does not exist, so we delete from current updates
if (enabled) {
// Value does not exist, so we delete from current updates
currentUpdates.deletedKeys.add(keyW);
}
// delete is not sent to source db
} else {
// pruning disabled
if (value != null) {
src.put(key, value);
} else {
checkOpen();
}
}
} catch (Exception e) {
if (e instanceof RuntimeException) {
Expand All @@ -138,17 +139,17 @@ public void put(byte[] key, byte[] value) {
}

public void delete(byte[] key) {
if (!enabled.get()) {
return;
}
checkNotNull(key);

lock.writeLock().lock();

try {
checkOpen();

if (!enabled) {
return;
}
currentUpdates.deletedKeys.add(new ByteArrayWrapper(key));
currentUpdates.deletedKeys.add(ByteArrayWrapper.wrap(key));
// delete is delayed
} catch (Exception e) {
if (e instanceof RuntimeException) {
Expand All @@ -169,19 +170,23 @@ public void putBatch(Map<byte[], byte[]> inputMap) {

try {
Map<byte[], byte[]> insertsOnly = new HashMap<>();
for (Map.Entry<byte[], byte[]> entry : inputMap.entrySet()) {
ByteArrayWrapper keyW = new ByteArrayWrapper(entry.getKey());
if (entry.getValue() != null) {
if (enabled) {
if (enabled.get()) {
for (Map.Entry<byte[], byte[]> entry : inputMap.entrySet()) {
ByteArrayWrapper keyW = ByteArrayWrapper.wrap(entry.getKey());
if (entry.getValue() != null) {
currentUpdates.insertedKeys.add(keyW);
incRef(keyW);
}
insertsOnly.put(entry.getKey(), entry.getValue());
} else {
if (enabled) {
insertsOnly.put(entry.getKey(), entry.getValue());
} else {
currentUpdates.deletedKeys.add(keyW);
}
}
} else {
for (Map.Entry<byte[], byte[]> entry : inputMap.entrySet()) {
if (entry.getValue() != null) {
insertsOnly.put(entry.getKey(), entry.getValue());
}
}
}
src.putBatch(insertsOnly);
} catch (Exception e) {
Expand Down Expand Up @@ -214,13 +219,14 @@ private Ref decRef(ByteArrayWrapper keyW) {
}

public void storeBlockChanges(byte[] blockHash, long blockNumber) {
if (!enabled.get()) {
return;
}

lock.writeLock().lock();

try {
if (!enabled) {
return;
}
ByteArrayWrapper hash = new ByteArrayWrapper(blockHash);
ByteArrayWrapper hash = ByteArrayWrapper.wrap(blockHash);
currentUpdates.blockHeader = hash;
currentUpdates.blockNumber = blockNumber;
blockUpdates.put(hash, currentUpdates);
Expand All @@ -231,13 +237,14 @@ public void storeBlockChanges(byte[] blockHash, long blockNumber) {
}

public void prune(byte[] blockHash, long blockNumber) {
if (!enabled.get()) {
return;
}

lock.writeLock().lock();

try {
if (!enabled) {
return;
}
ByteArrayWrapper blockHashW = new ByteArrayWrapper(blockHash);
ByteArrayWrapper blockHashW = ByteArrayWrapper.wrap(blockHash);
Updates updates = blockUpdates.remove(blockHashW);
if (updates != null) {
for (ByteArrayWrapper insertedKey : updates.insertedKeys) {
Expand Down Expand Up @@ -313,6 +320,7 @@ public Optional<byte[]> get(byte[] key) {
try {
return src.get(key);
} catch (Exception e) {
LOG.error("Could not get key due to ", e);
throw e;
} finally {
lock.readLock().unlock();
Expand All @@ -324,6 +332,7 @@ public Set<byte[]> keys() {
try {
return src.keys();
} catch (Exception e) {
LOG.error("Could not get keys due to ", e);
throw e;
} finally {
lock.readLock().unlock();
Expand Down Expand Up @@ -353,18 +362,18 @@ public void commitBatch() {

@Override
public void deleteBatch(Collection<byte[]> keys) {
if (!enabled.get()) {
return;
}
checkNotNull(keys);

lock.writeLock().lock();

try {
checkOpen();

if (!enabled) {
return;
}
// deletes are delayed
keys.forEach(key -> currentUpdates.deletedKeys.add(new ByteArrayWrapper(key)));
keys.forEach(key -> currentUpdates.deletedKeys.add(ByteArrayWrapper.wrap(key)));
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw e;
Expand All @@ -389,6 +398,7 @@ public boolean isEmpty() {
return src.isEmpty();
}
} catch (Exception e) {
LOG.error("Could not check if empty due to ", e);
throw e;
} finally {
lock.readLock().unlock();
Expand Down

0 comments on commit 99d9b24

Please sign in to comment.