Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Cleaned up some stuff to not need another thread pool. Unfortunately …

…certain requests now have to be serialized but it is better me thinks.

Also fixed up compilation errors due to merge.
  • Loading branch information...
commit b461f08949be9e73c4f20c47793285daea52b18e 1 parent bcedd3d
William Burns authored
View
2  .gitignore
@@ -2,6 +2,8 @@
*.iws
*.ipr
*.iml
+.project
+.classpath
.idea/
.idea
classes/
View
4 src/org/jgroups/demos/ExecutionServiceDemo.java
@@ -94,14 +94,14 @@ protected ByteBufferStreamable(ByteBuffer buffer) {
}
@Override
- public void writeTo(DataOutputStream out) throws IOException {
+ public void writeTo(DataOutput out) throws IOException {
int size = buffer.limit() - buffer.position();
out.writeInt(size);
out.write(buffer.array(), buffer.position(), size);
}
@Override
- public void readFrom(DataInputStream in) throws IOException,
+ public void readFrom(DataInput in) throws IOException,
IllegalAccessException, InstantiationException {
buffer = ByteBuffer.allocate(in.readInt());
in.readFully(buffer.array());
View
37 src/org/jgroups/protocols/CENTRAL_EXECUTOR.java
@@ -175,45 +175,26 @@ protected void copyQueueTo(List<Address> new_joiners) {
// @see org.jgroups.protocols.Executing#sendToCoordinator(org.jgroups.protocols.Executing.Type, long, org.jgroups.Address)
@Override
protected void sendToCoordinator(Type type, final long requestId, final Address value) {
- Runnable runnable = null;
if (is_coord) {
+ if(log.isTraceEnabled())
+ log.trace("[redirect] <--> [" + local_addr + "] "
+ + type.name() + " [" + value
+ + (requestId != -1 ? " request id: " + requestId : "")
+ + "]");
switch(type) {
case RUN_REQUEST:
- runnable = new Runnable() {
- @Override
- public void run() {
- handleTaskRequest(requestId, value);
- }
- };
+ handleTaskRequest(requestId, value);
break;
case CONSUMER_READY:
- runnable = new Runnable() {
- @Override
- public void run() {
- handleConsumerReadyRequest(requestId, value);
- }
- };
+ handleConsumerReadyRequest(requestId, value);
break;
case CONSUMER_UNREADY:
- runnable = new Runnable() {
- @Override
- public void run() {
- // TODO: make it a handle method instead
- Owner consumer = new Owner(value, requestId);
- _consumersAvailable.remove(consumer);
- sendRemoveConsumerRequest(consumer);
- }
- };
+ handleConsumerUnreadyRequest(requestId, value);
break;
};
}
-
- if (runnable != null) {
- _executor.execute(runnable);
- }
- else {
+ else
sendRequest(coord, type, requestId, value);
- }
}
// @see org.jgroups.protocols.Executing#sendNewRunRequest(org.jgroups.protocols.Executing.Owner)
View
90 src/org/jgroups/protocols/Executing.java
@@ -1,6 +1,11 @@
package org.jgroups.protocols;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -18,8 +23,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
@@ -132,8 +135,6 @@
*/
protected Queue<Owner> _consumersAvailable = new ArrayDeque<Owner>();
- protected ExecutorService _executor = Executors.newFixedThreadPool(4);
-
protected static enum Type {
RUN_REQUEST, // request to coordinator from client to tell of a new task request
CONSUMER_READY, // request to coordinator from server to tell of a new consumer ready
@@ -293,20 +294,21 @@ else if (value instanceof Serializable ||
}
if (local_addr.equals(owner.getAddress())) {
+ if(log.isTraceEnabled())
+ log.trace("[redirect] <--> [" + local_addr + "] "
+ + type.name() + " [" + value
+ + (owner.requestId != -1 ? " request id: " +
+ owner.requestId : "")
+ + "]");
final Owner finalOwner = owner;
- _executor.execute(new Runnable() {
- @Override
- public void run() {
- if (type == Type.RESULT_SUCCESS) {
- handleValueResponse(local_addr,
- finalOwner.requestId, valueToSend);
- }
- else if (type == Type.RESULT_EXCEPTION){
- handleExceptionResponse(local_addr,
- finalOwner.requestId, (Throwable)valueToSend);
- }
- }
- });
+ if (type == Type.RESULT_SUCCESS) {
+ handleValueResponse(local_addr,
+ finalOwner.requestId, valueToSend);
+ }
+ else if (type == Type.RESULT_EXCEPTION){
+ handleExceptionResponse(local_addr,
+ finalOwner.requestId, (Throwable)valueToSend);
+ }
}
else {
sendRequest(owner.getAddress(), type, owner.requestId,
@@ -441,9 +443,7 @@ public Object up(Event evt) {
handleConsumerReadyRequest(req.request, (Address)req.object);
break;
case CONSUMER_UNREADY:
- Owner consumer = new Owner((Address)req.object, req.request);
- _consumersAvailable.remove(consumer);
- sendRemoveConsumerRequest(consumer);
+ handleConsumerUnreadyRequest(req.request, (Address)req.object);
break;
case CONSUMER_FOUND:
handleConsumerFoundResponse(req.request, (Address)req.object);
@@ -469,7 +469,7 @@ else if (objectToRun instanceof Callable) {
req.request);
break;
case RUN_REJECTED:
- // TODO: make this localfied
+ // We could make requests local for this, but is it really worth it
handleTaskRejectedResponse(msg.getSrc(), req.request);
break;
case RESULT_SUCCESS:
@@ -480,7 +480,7 @@ else if (objectToRun instanceof Callable) {
(Throwable)req.object);
break;
case INTERRUPT_RUN:
- // TODO: make this localfied
+ // We could make requests local for this, but is it really worth it
handleInterruptRequest(msg.getSrc(), req.request);
break;
case CREATE_CONSUMER_READY:
@@ -582,19 +582,8 @@ protected void handleTaskRequest(long requestId, Address address) {
}
if (consumer != null) {
- if (local_addr.equals(source.getAddress())) {
- _executor.execute(new Runnable() {
- @Override
- public void run() {
- handleConsumerFoundResponse(consumer.getRequestId(),
- consumer.getAddress());
- }
- });
- }
- else {
- sendRequest(source.getAddress(), Type.CONSUMER_FOUND,
- consumer.getRequestId(), consumer.getAddress());
- }
+ sendRequest(source.getAddress(), Type.CONSUMER_FOUND,
+ consumer.getRequestId(), consumer.getAddress());
sendRemoveConsumerRequest(consumer);
}
else {
@@ -619,24 +608,20 @@ protected void handleConsumerReadyRequest(long requestId, Address address) {
}
if (requestor != null) {
- if (local_addr.equals(requestor.getAddress())) {
- _executor.execute(new Runnable() {
- @Override
- public void run() {
- handleConsumerFoundResponse(source.getRequestId(), source.getAddress());
- }
- });
- }
- else {
- sendRequest(requestor.getAddress(), Type.CONSUMER_FOUND,
- source.getRequestId(), source.getAddress());
- }
+ sendRequest(requestor.getAddress(), Type.CONSUMER_FOUND,
+ source.getRequestId(), source.getAddress());
sendRemoveRunRequest(requestor);
}
else {
sendNewConsumerRequest(source);
}
}
+
+ protected void handleConsumerUnreadyRequest(long requestId, Address address) {
+ Owner consumer = new Owner(address, requestId);
+ _consumersAvailable.remove(consumer);
+ sendRemoveConsumerRequest(consumer);
+ }
protected void handleConsumerFoundResponse(long request, Address address) {
final Runnable runnable = _awaitingConsumer.poll();
@@ -656,12 +641,7 @@ protected void handleConsumerFoundResponse(long request, Address address) {
_awaitingReturn.put(owner, runnable);
// If local we pass along without serializing
if (local_addr.equals(owner.getAddress())) {
- _executor.submit(new Runnable () {
- @Override
- public void run() {
- handleTaskSubmittedRequest(runnable, local_addr, requestId);
- }
- });
+ handleTaskSubmittedRequest(runnable, local_addr, requestId);
}
else {
if (runnable instanceof DistributedFuture) {
@@ -695,7 +675,7 @@ protected void handleTaskSubmittedRequest(Runnable runnable, Address source,
* in case if the _tasks.take() call isn't registered quick
* enough after sending the Type.CONSUMER_READY message
*/
- received = _tasks.offer(runnable, 100, TimeUnit.MILLISECONDS);
+ received = _tasks.offer(runnable, 1000, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();

0 comments on commit b461f08

Please sign in to comment.
Something went wrong with that request. Please try again.