Skip to content

Commit

Permalink
#410 simplify some remote java commands in order to its easier re-imp…
Browse files Browse the repository at this point in the history
…lementation in lua
  • Loading branch information
Vladimir Buhtoyarov committed Oct 14, 2023
1 parent 8fb0ff8 commit 5283bb7
Show file tree
Hide file tree
Showing 21 changed files with 156 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,9 @@ public <T> CommandResult<T> execute(K key, Request<T> request) {
cache.asMap().compute(key, (K k, RemoteBucketState previousState) -> {
Long clientSideTime = request.getClientSideTime();
long timeNanos = clientSideTime != null ? clientSideTime : System.currentTimeMillis() * 1_000_000;
RemoteBucketState[] stateHolder = new RemoteBucketState[] {
previousState == null ? null : previousState.copy()
};
MutableBucketEntry entry = new MutableBucketEntry() {
@Override
public boolean exists() {
return stateHolder[0] != null;
}
@Override
public void set(RemoteBucketState state) {
stateHolder[0] = state;
}
@Override
public RemoteBucketState get() {
return stateHolder[0];
}
};
resultHolder[0] = request.getCommand().execute(entry, timeNanos);
return stateHolder[0];
MutableBucketEntry entryWrapper = new MutableBucketEntry(previousState == null ? null : previousState.copy());
resultHolder[0] = request.getCommand().execute(entryWrapper, timeNanos);
return entryWrapper.exists() ? entryWrapper.get() : null;
});

return resultHolder[0];
Expand Down Expand Up @@ -147,4 +131,4 @@ private static long getCurrentTime(ClientSideConfig clientSideConfig) {
return clock.isPresent() ? clock.get().currentTimeNanos() : System.currentTimeMillis() * 1_000_000;
}

}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.github.bucket4j.TimeMeter;
import io.github.bucket4j.distributed.proxy.AbstractProxyManager;
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.GenericEntry;
import io.github.bucket4j.distributed.remote.CommandResult;
import io.github.bucket4j.distributed.remote.MutableBucketEntry;
import io.github.bucket4j.distributed.remote.RemoteCommand;
import io.github.bucket4j.distributed.remote.Request;

Expand Down Expand Up @@ -69,14 +69,14 @@ public <T> CompletableFuture<CommandResult<T>> executeAsync(K key, Request<T> re
private <T> CommandResult<T> execute(Request<T> request, CompareAndSwapOperation operation) {
RemoteCommand<T> command = request.getCommand();
byte[] originalStateBytes = operation.getStateData().orElse(null);
GenericEntry entry = new GenericEntry(originalStateBytes, request.getBackwardCompatibilityVersion());
MutableBucketEntry entry = new MutableBucketEntry(originalStateBytes);
CommandResult<T> result = command.execute(entry, getClientSideTime());
if (!entry.isModified()) {
if (!entry.isStateModified()) {
return result;
}

byte[] newStateBytes = entry.getModifiedStateBytes();
if (operation.compareAndSwap(originalStateBytes, newStateBytes, entry.getModifiedState())) {
byte[] newStateBytes = entry.getStateBytes(request.getBackwardCompatibilityVersion());
if (operation.compareAndSwap(originalStateBytes, newStateBytes, entry.get())) {
return result;
} else {
return null;
Expand All @@ -97,14 +97,14 @@ private <T> CompletableFuture<CommandResult<T>> executeAsync(Request<T> request,
.thenApply((Optional<byte[]> originalStateBytes) -> originalStateBytes.orElse(null))
.thenCompose((byte[] originalStateBytes) -> {
RemoteCommand<T> command = request.getCommand();
GenericEntry entry = new GenericEntry(originalStateBytes, request.getBackwardCompatibilityVersion());
MutableBucketEntry entry = new MutableBucketEntry(originalStateBytes);
CommandResult<T> result = command.execute(entry, getClientSideTime());
if (!entry.isModified()) {
if (!entry.isStateModified()) {
return CompletableFuture.completedFuture(result);
}

byte[] newStateBytes = entry.getModifiedStateBytes();
return operation.compareAndSwap(originalStateBytes, newStateBytes, entry.getModifiedState()).thenApply((casWasSuccessful) -> casWasSuccessful? result : null);
byte[] newStateBytes = entry.getStateBytes(request.getBackwardCompatibilityVersion());
return operation.compareAndSwap(originalStateBytes, newStateBytes, entry.get()).thenApply((casWasSuccessful) -> casWasSuccessful? result : null);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import io.github.bucket4j.TimeMeter;
import io.github.bucket4j.distributed.proxy.AbstractProxyManager;
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.GenericEntry;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockAndGetResult;
import io.github.bucket4j.distributed.remote.CommandResult;
import io.github.bucket4j.distributed.remote.MutableBucketEntry;
import io.github.bucket4j.distributed.remote.RemoteCommand;
import io.github.bucket4j.distributed.remote.Request;

Expand Down Expand Up @@ -90,14 +89,14 @@ private <T> CommandResult<T> execute(Request<T> request, LockBasedTransaction tr
}

try {
GenericEntry entry = new GenericEntry(persistedDataOnBeginOfTransaction, request.getBackwardCompatibilityVersion());
MutableBucketEntry entry = new MutableBucketEntry(persistedDataOnBeginOfTransaction);
CommandResult<T> result = command.execute(entry, super.getClientSideTime());
if (entry.isModified()) {
byte[] bytes = entry.getModifiedStateBytes();
if (entry.isStateModified()) {
byte[] bytes = entry.getStateBytes(request.getBackwardCompatibilityVersion());
if (persistedDataOnBeginOfTransaction == null) {
transaction.create(bytes, entry.getModifiedState());
transaction.create(bytes, entry.get());
} else {
transaction.update(bytes, entry.getModifiedState());
transaction.update(bytes, entry.get());
}
}
transaction.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import io.github.bucket4j.TimeMeter;
import io.github.bucket4j.distributed.proxy.AbstractProxyManager;
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.generic.GenericEntry;
import io.github.bucket4j.distributed.remote.CommandResult;
import io.github.bucket4j.distributed.remote.MutableBucketEntry;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import io.github.bucket4j.distributed.remote.RemoteCommand;
import io.github.bucket4j.distributed.remote.Request;

import java.util.concurrent.CompletableFuture;


/**
* The base class for proxy managers that built on top of idea that underlining storage provide Compare-And-Swap functionality.
*
Expand Down Expand Up @@ -116,11 +118,12 @@ private <T> CommandResult<T> execute(Request<T> request, SelectForUpdateBasedTra
}

try {
GenericEntry entry = new GenericEntry(persistedDataOnBeginOfTransaction, request.getBackwardCompatibilityVersion());
MutableBucketEntry entry = new MutableBucketEntry(persistedDataOnBeginOfTransaction);
CommandResult<T> result = command.execute(entry, super.getClientSideTime());
if (entry.isModified()) {
byte[] bytes = entry.getModifiedStateBytes();
transaction.update(bytes, entry.getModifiedState());
if (entry.isStateModified()) {
RemoteBucketState modifiedState = entry.get();
byte[] bytes = entry.getStateBytes(request.getBackwardCompatibilityVersion());
transaction.update(bytes, modifiedState);
}
transaction.commit();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import io.github.bucket4j.distributed.proxy.optimization.*;
import io.github.bucket4j.distributed.remote.CommandResult;
import io.github.bucket4j.distributed.remote.MultiResult;
import io.github.bucket4j.distributed.remote.MutableBucketEntry;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import io.github.bucket4j.distributed.remote.RemoteCommand;
import io.github.bucket4j.distributed.remote.BucketEntryWrapper;
import io.github.bucket4j.distributed.remote.commands.ConsumeIgnoringRateLimitsCommand;
import io.github.bucket4j.distributed.remote.commands.CreateSnapshotCommand;
import io.github.bucket4j.distributed.remote.commands.MultiCommand;
Expand Down Expand Up @@ -111,7 +111,7 @@ private <T> CommandResult<T> tryConsumeLocally(RemoteCommand<T> command) {
}

// execute local command
BucketEntryWrapper entry = new BucketEntryWrapper(state.copy());
MutableBucketEntry entry = new MutableBucketEntry(state.copy());
CommandResult<T> result = command.execute(entry, currentTimeNanos);
long locallyConsumedTokens = command.getConsumedTokens(result.getData());
if (locallyConsumedTokens == Long.MAX_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private <T> CommandResult<T> tryConsumeLocally(RemoteCommand<T> command) {
}

// execute local command
BucketEntryWrapper entry = new BucketEntryWrapper(state.copy());
MutableBucketEntry entry = new MutableBucketEntry(state.copy());
CommandResult<T> result = command.execute(entry, currentTimeNanos);
long locallyConsumedTokens = command.getConsumedTokens(result.getData());
if (locallyConsumedTokens == Long.MAX_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private <T> CommandResult<T> tryConsumeLocally(RemoteCommand<T> command) {
MultiCommand multiCommand = new MultiCommand(commands);

// execute local command
BucketEntryWrapper entry = new BucketEntryWrapper(state.copy());
MutableBucketEntry entry = new MutableBucketEntry(state.copy());
MultiResult multiResult = multiCommand.execute(entry, currentTimeNanos).getData();

if (multiCommand.getConsumedTokens(multiResult) == Long.MAX_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import io.github.bucket4j.distributed.proxy.AsyncCommandExecutor;
import io.github.bucket4j.distributed.proxy.CommandExecutor;
import io.github.bucket4j.distributed.proxy.optimization.OptimizationListener;
import io.github.bucket4j.distributed.remote.BucketEntryWrapper;
import io.github.bucket4j.distributed.remote.CommandResult;
import io.github.bucket4j.distributed.remote.MultiResult;
import io.github.bucket4j.distributed.remote.MutableBucketEntry;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import io.github.bucket4j.distributed.remote.RemoteCommand;
import io.github.bucket4j.distributed.remote.commands.CreateSnapshotCommand;
Expand Down Expand Up @@ -108,7 +108,7 @@ private <T> CommandResult<T> tryExecuteLocally(RemoteCommand<T> command) {
}

// execute local command
BucketEntryWrapper entry = new BucketEntryWrapper(state.copy());
MutableBucketEntry entry = new MutableBucketEntry(state.copy());
CommandResult<T> result = command.execute(entry, currentTimeNanos);
long locallyConsumedTokens = command.getConsumedTokens(result.getData());
if (locallyConsumedTokens == Long.MAX_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import static io.github.bucket4j.distributed.serialization.InternalSerializationHelper.*;

public abstract class AbstractBinaryTransaction implements MutableBucketEntry {
public abstract class AbstractBinaryTransaction {

private final byte[] requestBytes;
private Version backwardCompatibilityVersion;
Expand All @@ -48,9 +48,22 @@ public byte[] execute() {
backwardCompatibilityVersion = request.getBackwardCompatibilityVersion();

try {
RemoteBucketState currentState = null;
if (exists()) {
byte[] stateBytes = getRawState();
currentState = deserializeState(stateBytes);
}
MutableBucketEntry entryWrapper = new MutableBucketEntry(currentState);

long time = request.getClientSideTime() != null? request.getClientSideTime(): System.currentTimeMillis() * 1_000_000;
RemoteCommand<?> command = request.getCommand();
CommandResult<?> result = command.execute(this, time);
CommandResult<?> result = command.execute(entryWrapper, time);

if (entryWrapper.isStateModified()) {
RemoteBucketState newState = entryWrapper.get();
setRawState(serializeState(newState, backwardCompatibilityVersion));
}

return serializeResult(result, request.getBackwardCompatibilityVersion());
} catch (UnsupportedTypeException e) {
return serializeResult(CommandResult.unsupportedType(e.getTypeId()), backwardCompatibilityVersion);
Expand All @@ -61,22 +74,11 @@ public byte[] execute() {
}
}

@Override
public void set(RemoteBucketState state) {
byte[] stateBytes = serializeState(state, backwardCompatibilityVersion);
setRawState(stateBytes);
}

@Override
public RemoteBucketState get() {
byte[] stateBytes = getRawState();
return deserializeState(stateBytes);
}

protected abstract byte[] getRawState();

protected abstract void setRawState(byte[] stateBytes);

public abstract boolean exists();


}

This file was deleted.

Loading

0 comments on commit 5283bb7

Please sign in to comment.