Skip to content

Commit

Permalink
Refactor Raft command requests to ensure consistency across requests …
Browse files Browse the repository at this point in the history
…by sharing a base type.
  • Loading branch information
kuujo committed Apr 25, 2015
1 parent b5a0314 commit acc5f13
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 821 deletions.
Expand Up @@ -403,10 +403,10 @@ public CompletableFuture<Buffer> read(Buffer key, Buffer entry, Consistency cons
.withEntry(entry) .withEntry(entry)
.withConsistency(consistency) .withConsistency(consistency)
.build(); .build();
runOnContext(request, state).whenComplete((response, error) -> { this.<ReadRequest, ReadResponse>runOnContext(request, state).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
if (response.status() == Response.Status.OK) { if (response.status() == Response.Status.OK) {
future.complete(response.asReadResponse().result()); future.complete(response.result());
} else { } else {
future.completeExceptionally(response.error().createException()); future.completeExceptionally(response.error().createException());
} }
Expand All @@ -428,10 +428,10 @@ public CompletableFuture<Buffer> write(Buffer key, Buffer entry, Consistency con
.withKey(key) .withKey(key)
.withEntry(entry) .withEntry(entry)
.build(); .build();
runOnContext(request, state).whenComplete((response, error) -> { this.<WriteRequest, WriteResponse>runOnContext(request, state).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
if (response.status() == Response.Status.OK) { if (response.status() == Response.Status.OK) {
future.complete(response.asWriteResponse().result()); future.complete(response.result());
} else { } else {
future.completeExceptionally(response.error().createException()); future.completeExceptionally(response.error().createException());
} }
Expand All @@ -452,10 +452,10 @@ public CompletableFuture<Buffer> delete(Buffer key, Buffer entry, Consistency co
DeleteRequest request = DeleteRequest.builder() DeleteRequest request = DeleteRequest.builder()
.withKey(key) .withKey(key)
.build(); .build();
runOnContext(request, state).whenComplete((response, error) -> { this.<DeleteRequest, DeleteResponse>runOnContext(request, state).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
if (response.status() == Response.Status.OK) { if (response.status() == Response.Status.OK) {
future.complete(response.asDeleteResponse().result()); future.complete(response.result());
} else { } else {
future.completeExceptionally(response.error().createException()); future.completeExceptionally(response.error().createException());
} }
Expand Down
Expand Up @@ -26,19 +26,19 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
abstract class AbstractRequest<T extends Request<T>> implements Request<T> { abstract class AbstractRequest<REQUEST extends Request<REQUEST>> implements Request<REQUEST> {
private final AtomicInteger references = new AtomicInteger(); private final AtomicInteger references = new AtomicInteger();
private ReferenceManager<T> referenceManager; private ReferenceManager<REQUEST> referenceManager;


protected AbstractRequest(ReferenceManager<T> referenceManager) { protected AbstractRequest(ReferenceManager<REQUEST> referenceManager) {
this.referenceManager = referenceManager; this.referenceManager = referenceManager;
} }


@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T acquire() { public REQUEST acquire() {
references.incrementAndGet(); references.incrementAndGet();
return (T) this; return (REQUEST) this;
} }


@Override @Override
Expand All @@ -55,43 +55,43 @@ public int references() {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void close() { public void close() {
referenceManager.release((T) this); referenceManager.release((REQUEST) this);
} }


/** /**
* Abstract request builder. * Abstract request builder.
* *
* @param <T> The builder type. * @param <BUILDER> The builder type.
* @param <U> The request type. * @param <REQUEST> The request type.
*/ */
protected static abstract class Builder<T extends Builder<T, U>, U extends AbstractRequest<U>> implements Request.Builder<T, U> { protected static abstract class Builder<BUILDER extends Builder<BUILDER, REQUEST>, REQUEST extends AbstractRequest<REQUEST>> implements Request.Builder<BUILDER, REQUEST> {
protected final ReferencePool<U> pool; protected final ReferencePool<REQUEST> pool;
protected U request; protected REQUEST request;


protected Builder(Function<ReferenceManager<U>, U> factory) { protected Builder(Function<ReferenceManager<REQUEST>, REQUEST> factory) {
this.pool = new ReferencePool<>(factory); this.pool = new ReferencePool<>(factory);
} }


/** /**
* Resets the builder, acquiring a new request from the internal reference pool. * Resets the builder, acquiring a new request from the internal reference pool.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T reset() { BUILDER reset() {
request = pool.acquire(); request = pool.acquire();
return (T) this; return (BUILDER) this;
} }


/** /**
* Resets the builder with the given request. * Resets the builder with the given request.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T reset(U request) { BUILDER reset(REQUEST request) {
this.request = request; this.request = request;
return (T) this; return (BUILDER) this;
} }


@Override @Override
public U build() { public REQUEST build() {
return request; return request;
} }
} }
Expand Down
Expand Up @@ -27,13 +27,13 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
abstract class AbstractResponse<T extends Response<T>> implements Response<T> { abstract class AbstractResponse<RESPONSE extends Response<RESPONSE>> implements Response<RESPONSE> {
private final AtomicInteger references = new AtomicInteger(); private final AtomicInteger references = new AtomicInteger();
private final ReferenceManager<T> referenceManager; private final ReferenceManager<RESPONSE> referenceManager;
protected Status status = Status.OK; protected Status status = Status.OK;
protected RaftError error; protected RaftError error;


protected AbstractResponse(ReferenceManager<T> referenceManager) { protected AbstractResponse(ReferenceManager<RESPONSE> referenceManager) {
this.referenceManager = referenceManager; this.referenceManager = referenceManager;
} }


Expand All @@ -49,9 +49,9 @@ public RaftError error() {


@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T acquire() { public RESPONSE acquire() {
references.incrementAndGet(); references.incrementAndGet();
return (T) this; return (RESPONSE) this;
} }


@Override @Override
Expand All @@ -68,7 +68,7 @@ public int references() {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void close() { public void close() {
referenceManager.release((T) this); referenceManager.release((RESPONSE) this);
} }


@Override @Override
Expand All @@ -79,55 +79,55 @@ public String toString() {
/** /**
* Abstract response builder. * Abstract response builder.
* *
* @param <T> The builder type. * @param <BUILDER> The builder type.
* @param <U> The response type. * @param <RESPONSE> The response type.
*/ */
protected static abstract class Builder<T extends Builder<T, U>, U extends AbstractResponse<U>> implements Response.Builder<T, U> { protected static abstract class Builder<BUILDER extends Builder<BUILDER, RESPONSE>, RESPONSE extends AbstractResponse<RESPONSE>> implements Response.Builder<BUILDER, RESPONSE> {
protected final ReferencePool<U> pool; protected final ReferencePool<RESPONSE> pool;
protected U response; protected RESPONSE response;


protected Builder(Function<ReferenceManager<U>, U> factory) { protected Builder(Function<ReferenceManager<RESPONSE>, RESPONSE> factory) {
this.pool = new ReferencePool<>(factory); this.pool = new ReferencePool<>(factory);
} }


/** /**
* Resets the builder, acquiring a new response from the internal reference pool. * Resets the builder, acquiring a new response from the internal reference pool.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T reset() { BUILDER reset() {
response = pool.acquire(); response = pool.acquire();
response.status = null; response.status = null;
response.error = null; response.error = null;
return (T) this; return (BUILDER) this;
} }


/** /**
* Resets the builder with the given response. * Resets the builder with the given response.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T reset(U response) { BUILDER reset(RESPONSE response) {
this.response = response; this.response = response;
return (T) this; return (BUILDER) this;
} }


@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T withStatus(Status status) { public BUILDER withStatus(Status status) {
if (status == null) if (status == null)
throw new NullPointerException("status cannot be null"); throw new NullPointerException("status cannot be null");
response.status = status; response.status = status;
return (T) this; return (BUILDER) this;
} }


@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T withError(RaftError error) { public BUILDER withError(RaftError error) {
response.error = error; response.error = error;
return (T) this; return (BUILDER) this;
} }


@Override @Override
public U build() { public RESPONSE build() {
if (response.status == null) if (response.status == null)
throw new NullPointerException("status cannot be null"); throw new NullPointerException("status cannot be null");
return response; return response;
Expand Down
@@ -0,0 +1,167 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.protocol.raft.rpc;

import net.kuujo.copycat.io.Buffer;
import net.kuujo.copycat.io.util.ReferenceManager;

import java.util.Objects;
import java.util.function.Function;

/**
* Protocol command request.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public abstract class CommandRequest<REQUEST extends CommandRequest<REQUEST>> extends AbstractRequest<REQUEST> {
protected Buffer key;
protected Buffer entry;

public CommandRequest(ReferenceManager<REQUEST> referenceManager) {
super(referenceManager);
}

/**
* Returns the command key.
*
* @return The command key.
*/
public Buffer key() {
return key;
}

/**
* Returns the command entry.
*
* @return The command entry.
*/
public Buffer entry() {
return entry;
}

@Override
public void readObject(Buffer buffer) {
int keySize = buffer.readInt();
if (keySize > -1) {
key = buffer.slice(keySize);
buffer.skip(keySize);
}
int entrySize = buffer.readInt();
entry = buffer.slice(entrySize);
}

@Override
public void writeObject(Buffer buffer) {
if (key != null) {
buffer.writeInt((int) key.limit()).write(key);
} else {
buffer.writeInt(-1);
}
buffer.writeInt((int) entry.limit()).write(entry);
}

@Override
public void close() {
if (key != null)
key.release();
if (entry != null)
entry.release();
super.close();
}

@Override
public int hashCode() {
return Objects.hash(entry);
}

@Override
public boolean equals(Object object) {
if (object instanceof CommandRequest) {
CommandRequest request = (CommandRequest) object;
return ((request.key == null && key == null) || (request.key != null && key != null && request.key.equals(key)))
&& request.entry.equals(entry);
}
return false;
}

@Override
public String toString() {
return String.format("%s[entry=%s]", getClass().getSimpleName(), entry.toString());
}

/**
* Write request builder.
*/
public static abstract class Builder<BUILDER extends Builder<BUILDER, REQUEST>, REQUEST extends CommandRequest<REQUEST>> extends AbstractRequest.Builder<BUILDER, REQUEST> {

protected Builder(Function<ReferenceManager<REQUEST>, REQUEST> factory) {
super(factory);
}

/**
* Sets the request key.
*
* @param key The request key.
* @return The request builder.
*/
@SuppressWarnings("unchecked")
public BUILDER withKey(Buffer key) {
request.key = key;
return (BUILDER) this;
}

/**
* Sets the request entry.
*
* @param entry The request entry.
* @return The request builder.
*/
@SuppressWarnings("unchecked")
public BUILDER withEntry(Buffer entry) {
if (entry == null)
throw new NullPointerException("entry cannot be null");
request.entry = entry;
return (BUILDER) this;
}

@Override
public REQUEST build() {
super.build();
if (request.key != null)
request.key.acquire();
if (request.entry != null)
request.entry.acquire();
return request;
}

@Override
public int hashCode() {
return Objects.hash(request);
}

@Override
public boolean equals(Object object) {
return object instanceof Builder && ((Builder) object).request.equals(request);
}

@Override
public String toString() {
return String.format("%s[request=%s]", getClass().getCanonicalName(), request);
}

}

}

0 comments on commit acc5f13

Please sign in to comment.