Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge pull request #19 from esteban-aliverti/master

Async DialogueHelper
  • Loading branch information...
commit d729375efcd419491ec51dcc7ac377741507c5d0 2 parents 72a348a + 837b3af
Esteban authored August 13, 2012
254  drools-mas-generic-client/src/main/java/org/drools/mas/helpers/DialogueHelper.java
@@ -4,18 +4,23 @@
4 4
 import java.net.MalformedURLException;
5 5
 import java.net.URL;
6 6
 import java.net.URLConnection;
  7
+import java.util.ArrayList;
7 8
 import java.util.LinkedHashMap;
8 9
 import java.util.List;
9 10
 import java.util.UUID;
  11
+import java.util.concurrent.ExecutorService;
  12
+import java.util.concurrent.Executors;
10 13
 import java.util.logging.Level;
11 14
 import java.util.logging.Logger;
12 15
 import javax.xml.namespace.QName;
13 16
 import javax.xml.ws.BindingProvider;
14  
-import org.drools.mas.*;
  17
+import org.drools.mas.ACLMessage;
  18
+import org.drools.mas.Act;
  19
+import org.drools.mas.Encodings;
15 20
 import org.drools.mas.body.acts.AbstractMessageBody;
16 21
 import org.drools.mas.body.acts.Inform;
17  
-import org.drools.mas.body.content.Action;
18 22
 import org.drools.mas.body.acts.InformIf;
  23
+import org.drools.mas.body.content.Action;
19 24
 import org.drools.mas.util.ACLMessageFactory;
20 25
 import org.drools.mas.util.MessageContentEncoder;
21 26
 import org.drools.mas.util.MessageContentFactory;
@@ -24,15 +29,45 @@
24 29
 public class DialogueHelper {
25 30
 
26 31
     public static int WSDL_RETRIEVAL_TIMEOUT = 2000;
  32
+    public static int EXECUTOR_SERVICE_THREAD_NUMBER = 5;
  33
+    
27 34
     private int connectionTimeout = 0;
28 35
     private int receiveTimeout = 0;
29 36
     
30 37
     boolean multiReturnValue = false;
31  
-    private AbstractMessageBody returnBody;
32 38
     private Encodings encode = Encodings.XML;
33 39
     private URL endpointURL;
34 40
     private QName qname;
35 41
     
  42
+    private ExecutorService executorService = Executors.newFixedThreadPool(EXECUTOR_SERVICE_THREAD_NUMBER);
  43
+    
  44
+    protected static interface DialogueHelperCommand{
  45
+        void execute();
  46
+    }
  47
+    
  48
+    private DialogueHelperCallback defaultDialogueHelperCallback = new DialogueHelperCallbackImpl(){
  49
+
  50
+        @Override
  51
+        public void onSuccess(List<ACLMessage> messages) {
  52
+        }
  53
+
  54
+        @Override
  55
+        public void onError(Throwable t) {
  56
+            Logger.getLogger(DialogueHelper.class.getName()).log(Level.SEVERE, "Agent invocation failed", t);
  57
+        }
  58
+        
  59
+
  60
+        @Override
  61
+        public long getTimeoutForResponses() {
  62
+            return 0;
  63
+        }
  64
+
  65
+        @Override
  66
+        public long getMinimumWaitTimeForResponses() {
  67
+            return 0;
  68
+        }
  69
+        
  70
+    };
36 71
     
37 72
     public DialogueHelper(String url) {
38 73
         this(url, 0);
@@ -79,15 +114,62 @@ private void checkEndpointAvailability(int wSDLRetrievalTimeout) {
79 114
         }
80 115
     }
81 116
 
  117
+    
  118
+
  119
+    public String invokeRequest(String sender, String receiver, String methodName, LinkedHashMap<String, Object> args, DialogueHelperCallback callback) throws UnsupportedOperationException {
  120
+        return this.doRequest(sender, receiver, methodName, args, callback);
  121
+    }
  122
+    
  123
+    public String invokeRequest(String methodName, LinkedHashMap<String, Object> args, DialogueHelperCallback callback) throws UnsupportedOperationException {
  124
+        return invokeRequest(UUID.randomUUID().toString(), "", methodName, args, callback);
  125
+    }
  126
+    
  127
+    /**
  128
+     * 
  129
+     * @param sender
  130
+     * @param methodName
  131
+     * @param args
  132
+     * @return
  133
+     * @throws UnsupportedOperationException
  134
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  135
+     * be notified about exceptions in the service invocation.
  136
+     */
  137
+    @Deprecated
82 138
     public String invokeRequest(String sender, String methodName, LinkedHashMap<String, Object> args) throws UnsupportedOperationException {
83 139
         return invokeRequest(sender, "", methodName, args);
84 140
     }
85 141
 
  142
+    /**
  143
+     * 
  144
+     * @param methodName
  145
+     * @param args
  146
+     * @return
  147
+     * @throws UnsupportedOperationException
  148
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  149
+     * be notified about exceptions in the service invocation.
  150
+     */
  151
+    @Deprecated
86 152
     public String invokeRequest(String methodName, LinkedHashMap<String, Object> args) throws UnsupportedOperationException {
87 153
         return invokeRequest(UUID.randomUUID().toString(), "", methodName, args);
88 154
     }
89  
-
  155
+    
  156
+    /**
  157
+     * 
  158
+     * @param sender
  159
+     * @param receiver
  160
+     * @param methodName
  161
+     * @param args
  162
+     * @return
  163
+     * @throws UnsupportedOperationException
  164
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  165
+     * be notified about exceptions in the service invocation.
  166
+     */
  167
+    @Deprecated
90 168
     public String invokeRequest(String sender, String receiver, String methodName, LinkedHashMap<String, Object> args) throws UnsupportedOperationException {
  169
+        return this.doRequest(sender, receiver, methodName, args, null);
  170
+    }
  171
+    
  172
+    protected String doRequest(String sender, String receiver, String methodName, LinkedHashMap<String, Object> args, DialogueHelperCallback callback) throws UnsupportedOperationException {
91 173
         multiReturnValue = false;
92 174
         for (Object o : args.values()) {
93 175
             if (o == Variable.v) {
@@ -95,73 +177,123 @@ public String invokeRequest(String sender, String receiver, String methodName, L
95 177
                 break;
96 178
             }
97 179
         }
98  
-        AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
99  
-
100 180
         ACLMessageFactory factory = new ACLMessageFactory(encode);
101 181
 
102 182
         Action action = MessageContentFactory.newActionContent(methodName, args);
103 183
         ACLMessage req = factory.newRequestMessage(sender, receiver, action);
104 184
 
105  
-        asyncServicePort.tell(req);
106  
-//        List<ACLMessage> answers = asyncServicePort.getResponses(req.getId());
107  
-
108  
-//        if ( answers.size() == 0 ) { return req.getId(); }
109  
-
110  
-//        ACLMessage answer = answers.get(0);
111  
-//        if ( ! Act.AGREE.equals( answer.getPerformative() ) ) {
112  
-//            throw new UnsupportedOperationException(" Request " + methodName + " was not agreed with args " + args );
113  
-//        }
114  
-//
115  
-//        if ( ! multiReturnValue ) {
116  
-//            returnBody = answers.size() == 2 ? ( (Inform) answers.get( 1 ).getBody() ) : null;
117  
-//        } else {
118  
-//            returnBody = answers.size() == 2 ? ( (InformRef) answers.get( 1 ).getBody() ) : null;
119  
-//        }
  185
+        this.tell(req, callback, callback != null);
  186
+
120 187
         return req.getId();
121 188
     }
122 189
 
  190
+    public String invokeQueryIf(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
  191
+        return this.doQueryIf(sender, receiver, proposition, callback);
  192
+    }
  193
+    
  194
+    /**
  195
+     * 
  196
+     * @param sender
  197
+     * @param receiver
  198
+     * @param proposition
  199
+     * @return
  200
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  201
+     * be notified about exceptions in the service invocation.
  202
+     */
  203
+    @Deprecated
123 204
     public String invokeQueryIf(String sender, String receiver, Object proposition) {
124  
-        AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
  205
+        return this.doQueryIf(sender, receiver, proposition, null);
  206
+    }
  207
+    
  208
+    protected String doQueryIf(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
125 209
 
126 210
         ACLMessageFactory factory = new ACLMessageFactory(Encodings.XML);
127  
-
128 211
         ACLMessage qryif = factory.newQueryIfMessage(sender, receiver, proposition);
129  
-        asyncServicePort.tell(qryif);
130  
-        List<ACLMessage> answers = asyncServicePort.getResponses(qryif.getId());
131  
-
132  
-        if (answers.size() == 0) {
133  
-            return qryif.getId();
134  
-        }
135 212
 
136  
-        returnBody = ((InformIf) answers.get(0).getBody());
  213
+        this.tell(qryif, callback, callback != null);
  214
+        
137 215
         return qryif.getId();
138 216
 
139 217
     }
140 218
 
  219
+    public String invokeInform(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
  220
+        return this.doInform(sender, receiver, proposition, callback);
  221
+    }
  222
+
  223
+    /**
  224
+     * 
  225
+     * @param sender
  226
+     * @param receiver
  227
+     * @param proposition
  228
+     * @return
  229
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  230
+     * be notified about exceptions in the service invocation.
  231
+     */
  232
+    @Deprecated
141 233
     public String invokeInform(String sender, String receiver, Object proposition) {
142  
-        AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
  234
+        return this.doInform(sender, receiver, proposition, null);
  235
+    }
  236
+
  237
+    protected String doInform(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
143 238
         ACLMessageFactory factory = new ACLMessageFactory(encode);
144 239
         ACLMessage newInformMessage = factory.newInformMessage(sender, receiver, proposition);
145  
-        asyncServicePort.tell(newInformMessage);
146 240
 
  241
+        this.tell(newInformMessage, callback, callback != null);
  242
+        
147 243
         return newInformMessage.getId();
148 244
     }
149 245
 
  246
+    public String invokeConfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
  247
+        return this.doConfirm(sender, receiver, proposition, callback);
  248
+    }
  249
+
  250
+    /**
  251
+     * 
  252
+     * @param sender
  253
+     * @param receiver
  254
+     * @param proposition
  255
+     * @return
  256
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  257
+     * be notified about exceptions in the service invocation.
  258
+     */
  259
+    @Deprecated
150 260
     public String invokeConfirm(String sender, String receiver, Object proposition) {
151  
-        AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
  261
+        return this.doConfirm(sender, receiver, proposition, null);
  262
+    }
  263
+    
  264
+    protected String doConfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
152 265
         ACLMessageFactory factory = new ACLMessageFactory(encode);
153 266
         ACLMessage newConfirmMessage = factory.newConfirmMessage(sender, receiver, proposition);
154  
-        asyncServicePort.tell(newConfirmMessage);
155 267
 
  268
+        this.tell(newConfirmMessage, callback, callback != null);
  269
+        
156 270
         return newConfirmMessage.getId();
157 271
     }
158 272
 
  273
+    public String invokeDisconfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
  274
+        return this.doDisconfirm(sender, receiver, proposition, callback);
  275
+    }
  276
+
  277
+    /**
  278
+     * 
  279
+     * @param sender
  280
+     * @param receiver
  281
+     * @param proposition
  282
+     * @return
  283
+     * @deprecated Without using a DialogueHelperCallback there is no way to 
  284
+     * be notified about exceptions in the service invocation.
  285
+     */
  286
+    @Deprecated
159 287
     public String invokeDisconfirm(String sender, String receiver, Object proposition) {
160  
-        AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
  288
+        return this.doDisconfirm(sender, receiver, proposition, null);
  289
+    }
  290
+    
  291
+    protected String doDisconfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
161 292
         ACLMessageFactory factory = new ACLMessageFactory(encode);
162 293
         ACLMessage newDisconfirmMessage = factory.newDisconfirmMessage(sender, receiver, proposition);
163  
-        asyncServicePort.tell(newDisconfirmMessage);
164 294
 
  295
+        this.tell(newDisconfirmMessage, callback, callback != null);
  296
+        
165 297
         return newDisconfirmMessage.getId();
166 298
     }
167 299
 
@@ -243,6 +375,54 @@ private AsyncDroolsAgentService getAsyncDroolsAgentService(){
243 375
         
244 376
     }
245 377
     
  378
+    protected void tell(final ACLMessage message, DialogueHelperCallback callback, final boolean waitForAnswers){
  379
+        
  380
+        final DialogueHelperCallback finalCallback = callback != null? callback : this.defaultDialogueHelperCallback;
  381
+        
  382
+        Runnable runnable = new Runnable() {
  383
+
  384
+            private AsyncDroolsAgentService asyncDroolsAgentService;
  385
+            
  386
+            public void run() {
  387
+                try{
  388
+                    asyncDroolsAgentService = getAsyncDroolsAgentService();
  389
+                    asyncDroolsAgentService.tell(message);
  390
+                    
  391
+                    if (waitForAnswers){
  392
+                        List<ACLMessage> results = this.waitForAnswers(message.getId(), finalCallback.getExpectedResponsesNumber(), finalCallback.getMinimumWaitTimeForResponses(), finalCallback.getTimeoutForResponses());
  393
+                        finalCallback.onSuccess(results);
  394
+                    }
  395
+                }catch (Throwable t){
  396
+                    finalCallback.onError(t);
  397
+                }
  398
+            }
  399
+            
  400
+            private List<ACLMessage> waitForAnswers( String id, int expectedMessagesNumber, long minimumWaitTime, long timeout) {
  401
+                
  402
+                
  403
+                List<ACLMessage> answers = new ArrayList<ACLMessage>();
  404
+                long waitTime = minimumWaitTime;
  405
+                do {
  406
+                    try {
  407
+                        Logger.getLogger(DialogueHelper.class.getName()).log(Level.FINER, "Answer for {0} is not ready, wait... ", id);
  408
+                        Thread.sleep( waitTime );
  409
+                    } catch ( InterruptedException ex ) {
  410
+                        Logger.getLogger(DialogueHelper.class.getName()).log(Level.WARNING, "Thread could not be put to sleep", ex);
  411
+                    }
  412
+                    List<ACLMessage> incomingAnswers = asyncDroolsAgentService.getResponses(id);
  413
+                    answers.addAll( incomingAnswers );
  414
+
  415
+                    waitTime *= 2;
  416
+                } while ( answers.size() != expectedMessagesNumber && waitTime < timeout );
  417
+                return answers;
  418
+
  419
+            }
  420
+            
  421
+        };
  422
+        
  423
+        this.executorService.submit(runnable);
  424
+    }
  425
+    
246 426
     public void setConnectionTimeout(int connectionTimeout) {
247 427
         this.connectionTimeout = connectionTimeout;
248 428
     }
@@ -250,4 +430,4 @@ public void setConnectionTimeout(int connectionTimeout) {
250 430
     public void setReceiveTimeout(int receiveTimeout) {
251 431
         this.receiveTimeout = receiveTimeout;
252 432
     }
253  
-}
  433
+}
25  drools-mas-generic-client/src/main/java/org/drools/mas/helpers/DialogueHelperCallback.java
... ...
@@ -0,0 +1,25 @@
  1
+/*
  2
+ * To change this template, choose Tools | Templates
  3
+ * and open the template in the editor.
  4
+ */
  5
+package org.drools.mas.helpers;
  6
+
  7
+import java.util.List;
  8
+import org.drools.mas.ACLMessage;
  9
+
  10
+/**
  11
+ *
  12
+ * @author esteban
  13
+ */
  14
+public interface DialogueHelperCallback {
  15
+    
  16
+    void onSuccess(List<ACLMessage> messages);
  17
+
  18
+    void onError(Throwable t);
  19
+    
  20
+    int getExpectedResponsesNumber();
  21
+    
  22
+    long getTimeoutForResponses();
  23
+    
  24
+    long getMinimumWaitTimeForResponses();
  25
+}
50  drools-mas-generic-client/src/main/java/org/drools/mas/helpers/DialogueHelperCallbackImpl.java
... ...
@@ -0,0 +1,50 @@
  1
+/*
  2
+ * To change this template, choose Tools | Templates
  3
+ * and open the template in the editor.
  4
+ */
  5
+package org.drools.mas.helpers;
  6
+
  7
+import java.util.List;
  8
+import org.drools.mas.ACLMessage;
  9
+
  10
+
  11
+public class DialogueHelperCallbackImpl implements DialogueHelperCallback {
  12
+
  13
+    int expectedResponsesNumber = 1;
  14
+    
  15
+    long timeoutForResponses = 500;
  16
+    
  17
+    long minimumWaitTimeForResponses = 50;
  18
+    
  19
+    public void onSuccess(List<ACLMessage> messages) {
  20
+    }
  21
+
  22
+    public void onError(Throwable t) {
  23
+    }
  24
+
  25
+    public int getExpectedResponsesNumber() {
  26
+        return expectedResponsesNumber;
  27
+    }
  28
+
  29
+    public void setExpectedResponsesNumber(int expectedResponsesNumber) {
  30
+        this.expectedResponsesNumber = expectedResponsesNumber;
  31
+    }
  32
+
  33
+    public long getTimeoutForResponses() {
  34
+        return timeoutForResponses;
  35
+    }
  36
+
  37
+    public void setTimeoutForResponses(long timeoutForResponses) {
  38
+        this.timeoutForResponses = timeoutForResponses;
  39
+    }
  40
+
  41
+    public long getMinimumWaitTimeForResponses() {
  42
+        return minimumWaitTimeForResponses;
  43
+    }
  44
+
  45
+    public void setMinimumWaitTimeForResponses(long minimumWaitTimeForResponses) {
  46
+        this.minimumWaitTimeForResponses = minimumWaitTimeForResponses;
  47
+    }
  48
+
  49
+    
  50
+}

0 notes on commit d729375

Please sign in to comment.
Something went wrong with that request. Please try again.