Skip to content

Commit

Permalink
#218 Implementing lock based proxy manager for managed hazelcast
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-bukhtoyarov committed Jan 28, 2022
1 parent 642510d commit abe78de
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public <T> CompletableFuture<CommandResult<T>> executeAsync(K key, Request<T> re
throw new UnsupportedOperationException();
}

@Override
protected CompletableFuture<Void> removeAsync(Object key) {
return null;
}

protected abstract LockBasedTransaction allocateTransaction(K key);

protected abstract void releaseTransaction(LockBasedTransaction transaction);
Expand All @@ -70,13 +75,8 @@ private <T> CommandResult<T> execute(Request<T> request, LockBasedTransaction tr
transaction.begin();
try {
try {
byte[] persistedDataOnBeginOfTransaction;
LockResult lockResult = transaction.lock();
if (lockResult == LockResult.DATA_EXISTS_AND_LOCKED) {
persistedDataOnBeginOfTransaction = transaction.getData();
} else if (command.isInitializationCommand()) {
persistedDataOnBeginOfTransaction = null;
} else {
byte[] persistedDataOnBeginOfTransaction = transaction.lockAndGet();
if (persistedDataOnBeginOfTransaction == null && !command.isInitializationCommand()) {
return CommandResult.bucketNotFound();
}
GenericEntry entry = new GenericEntry(persistedDataOnBeginOfTransaction, request.getBackwardCompatibilityVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
* The typical flow is following:
* <ol>
* <li>begin - {@link #begin()}</li>
* <li>lock - {@link #lock()}</li>
* <li>getData - {@link #getData()}</li>
* <li>lock - {@link #lockAndGet()}</li>
* <li>update - {@link #update(byte[])}</li>
* <li>unlock - {@link #unlock()}</li>
* <li>commit - {@link #commit()}</li>
* </ol>
*/
Expand All @@ -50,25 +50,18 @@ public interface LockBasedTransaction {
void commit();

/**
* Locks data by the key associated with this transaction.
* There is strong guarantee that {@link #unlock()} will be called if {@link #lock()} returns successfully.
* Locks data by the key associated with this transaction and returns data that is associated with the key.
* There is strong guarantee that {@link #unlock()} will be called if {@link #lockAndGet()} returns successfully.
*
* @return lock result
* @return Returns the data by the key associated with this transaction, or null data associated with key does not exist
*/
LockResult lock();
byte[] lockAndGet();

/**
* Unlocks data by the key associated with this transaction.
*/
void unlock();

/**
* Returns the data by the key associated with this transaction.
*
* @return persisted state of bucket
*/
byte[] getData();

/**
* Creates the data by the key associated with this transaction.
*
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractLockBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockBasedTransaction;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockResult;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class LockBasedProxyManagerMock<K> extends AbstractLockBasedProxyManager<K> {

Expand All @@ -34,17 +32,12 @@ public LockBasedProxyManagerMock(ClientSideConfig clientSideConfig) {
super(clientSideConfig);
}

@Override
protected CompletableFuture<Void> removeAsync(K key) {
stateMap.remove(key);
return CompletableFuture.completedFuture(null);
}

@Override
protected LockBasedTransaction allocateTransaction(K key) {
byte[] backup = stateMap.get(key);

return new LockBasedTransaction() {

@Override
public void begin() {
// do nothing
Expand Down Expand Up @@ -77,23 +70,15 @@ public void commit() {
}

@Override
public LockResult lock() {
return backup == null? LockResult.DATA_NOT_EXISTS_AND_LOCKED : LockResult.DATA_EXISTS_AND_LOCKED;
public byte[] lockAndGet() {
return backup;
}

@Override
public void unlock() {
// do nothing
}

@Override
public byte[] getData() {
if (backup == null) {
throw new IllegalStateException();
}
return backup;
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractLockBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockBasedTransaction;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockResult;

import javax.sql.DataSource;
import java.sql.*;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class AdvisoryLockBasedPostgreSQLProxyManager extends AbstractLockBasedProxyManager<Long> {

Expand Down Expand Up @@ -77,11 +75,6 @@ protected void releaseTransaction(LockBasedTransaction transaction) {
}
}

@Override
protected CompletableFuture<Void> removeAsync(Long key) {
throw new UnsupportedOperationException();
}

@Override
public void removeProxy(Long key) {
// TODO implement removal
Expand All @@ -93,8 +86,6 @@ private class PostgreAdvisoryLockBasedTransaction implements LockBasedTransactio
private final long key;
private final Connection connection;

private byte[] bucketStateBeforeTransaction;

private PostgreAdvisoryLockBasedTransaction(long key, Connection connection) {
this.key = key;
this.connection = connection;
Expand All @@ -114,7 +105,7 @@ public void begin() {
}

@Override
public LockResult lock() {
public byte[] lockAndGet() {
try {
// acquire pessimistic lock
String lockSQL = "SELECT pg_advisory_xact_lock(?)";
Expand All @@ -129,10 +120,9 @@ public LockResult lock() {
selectStatement.setLong(1, key);
try (ResultSet rs = selectStatement.executeQuery()) {
if (rs.next()) {
bucketStateBeforeTransaction = rs.getBytes("state");
return LockResult.DATA_EXISTS_AND_LOCKED;
return rs.getBytes("state");
} else {
return LockResult.DATA_NOT_EXISTS_AND_LOCKED;
return null;
}
}
}
Expand Down Expand Up @@ -212,11 +202,6 @@ public void unlock() {
// do nothing, because advisory lock will be auto unlocked when transaction finishes
}

@Override
public byte[] getData() {
return bucketStateBeforeTransaction;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractLockBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockBasedTransaction;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockResult;

import javax.sql.DataSource;
import java.sql.*;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class SelectForUpdateBasedPostgreSQLProxyManager extends AbstractLockBasedProxyManager<Long> {

Expand Down Expand Up @@ -77,11 +75,6 @@ protected void releaseTransaction(LockBasedTransaction transaction) {
}
}

@Override
protected CompletableFuture<Void> removeAsync(Long key) {
throw new UnsupportedOperationException();
}

@Override
public void removeProxy(Long key) {
// TODO implement removal
Expand All @@ -93,8 +86,6 @@ private class PostgreLockBasedTransaction implements LockBasedTransaction {
private final long key;
private final Connection connection;

private byte[] bucketStateBeforeTransaction;

private PostgreLockBasedTransaction(long key, Connection connection) {
this.key = key;
this.connection = connection;
Expand All @@ -114,19 +105,19 @@ public void begin() {
}

@Override
public LockResult lock() {
public byte[] lockAndGet() {
try {
String selectForUpdateSQL = "SELECT state FROM buckets WHERE id = ? FOR UPDATE";
try (PreparedStatement selectStatement = connection.prepareStatement(selectForUpdateSQL)) {
selectStatement.setLong(1, key);
try (ResultSet rs = selectStatement.executeQuery()) {
if (rs.next()) {
bucketStateBeforeTransaction = rs.getBytes("state");
byte[] bucketStateBeforeTransaction = rs.getBytes("state");
if (bucketStateBeforeTransaction != null) {
return LockResult.DATA_EXISTS_AND_LOCKED;
return bucketStateBeforeTransaction;
} else {
// we detected fake data that inserted by previous transaction
return LockResult.DATA_NOT_EXISTS_AND_LOCKED;
return null;
}
}
}
Expand All @@ -145,16 +136,16 @@ public LockResult lock() {
}
}

// it is need to execute select for update again in order to obtain the lock
// it needs to execute select for update again in order to obtain the lock
try (PreparedStatement selectStatement = connection.prepareStatement(selectForUpdateSQL)) {
selectStatement.setLong(1, key);
try (ResultSet rs = selectStatement.executeQuery()) {
if (!rs.next()) {
// query does not see the record which inserted on step above
throw new IllegalStateException("Something unexpected happen, it needs to read PostgreSQL manual");
}
// we need to return epmty Optional because bucket is not initialized yet
return LockResult.DATA_NOT_EXISTS_AND_LOCKED;
// we need to return empty Optional because bucket is not initialized yet
return null;
}
}
} catch (SQLException e) {
Expand Down Expand Up @@ -221,11 +212,6 @@ public void unlock() {
// do nothing, because locked rows will be auto unlocked when transaction finishes
}

@Override
public byte[] getData() {
return bucketStateBeforeTransaction;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.github.bucket4j.grid.hazelcast;

import com.hazelcast.core.IMap;
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractLockBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockBasedTransaction;

import java.util.Objects;

public class HazelcastLockBasedProxyManager<K> extends AbstractLockBasedProxyManager<K> {

private final IMap<K, byte[]> map;

public HazelcastLockBasedProxyManager(IMap<K, byte[]> map) {
this(map, ClientSideConfig.getDefault());
}

public HazelcastLockBasedProxyManager(IMap<K, byte[]> map, ClientSideConfig clientSideConfig) {
super(clientSideConfig);
this.map = Objects.requireNonNull(map);
}

@Override
public void removeProxy(K key) {
map.remove(key);
}

@Override
public boolean isAsyncModeSupported() {
// Because Hazelcast IMap does not provide "lockAsync" API.
return false;
}

@Override
protected LockBasedTransaction allocateTransaction(K key) {
return new LockBasedTransaction() {

@Override
public void begin() {
// do nothing
}

@Override
public void rollback() {
// do nothing
}

@Override
public void commit() {
// do nothing
}

@Override
public byte[] lockAndGet() {
map.lock(key);
return map.get(key);
}

@Override
public void unlock() {
map.unlock(key);
}

@Override
public void create(byte[] data) {
map.put(key, data);
}

@Override
public void update(byte[] data) {
map.put(key, data);
}
};
}

@Override
protected void releaseTransaction(LockBasedTransaction transaction) {
// do nothing
}

}

0 comments on commit abe78de

Please sign in to comment.