Permalink
Browse files

Merged updates from the base library project

  • Loading branch information...
1 parent 306f924 commit 839240819c05d30f912e379be1e870cf305390b4 Peter Kieltyka committed Aug 22, 2008
View
@@ -5,9 +5,6 @@
build/*
bin/*
src/.settings/*
-src/org/amqp/AMQP.as
-src/org/amqp/headers/*
-src/org/amqp/methods/*
src/bin/*
test/.flexProperties
test/.settings/
View
@@ -2,7 +2,7 @@ library.name = amqp
library.author = Ben Hood
#flexsdk.dir = /opt/flex3
-flexsdk.dir = /Applications/Adobe\ Flex\ Builder\ 3/sdks/3.0.0/
+flexsdk.dir = /Applications/Adobe\ Flex\ Builder\ 3/sdks/3.1.0/
asdoc = ${flexsdk.dir}/lib/asdoc.jar
compc = ${flexsdk.dir}/lib/compc.jar
View
@@ -30,6 +30,11 @@ package org.amqp
public class Connection
{
+ private static const CLOSED:int = 0;
+ private static const CONNECTING:int = 1;
+ private static const CONNECTED:int = 2;
+
+ private var currentState:int = CLOSED;
private var shuttingDown:Boolean = false;
private var delegate:IODelegate;
private var session0:Session;
@@ -65,19 +70,25 @@ package org.amqp
}
public function start():void {
- delegate.open(connectionState);
+ if (currentState < CONNECTING) {
+ currentState = CONNECTING;
+ delegate.open(connectionState);
+ }
}
public function onSocketConnect(event:Event):void {
+ currentState = CONNECTED;
var header:ByteArray = AMQP.generateHeader();
delegate.writeBytes(header, 0, header.length);
}
- public function onSocketClose(event:Event):void {
+ public function onSocketClose(event:Event):void {
+ currentState = CLOSED;
handleForcedShutdown();
}
public function onSocketError(event:IOErrorEvent):void {
+ currentState = CLOSED;
trace(event.text);
}
@@ -31,11 +31,10 @@ package org.amqp.impl
import org.amqp.ProtocolEvent;
import org.amqp.SynchronousCommandClient;
import org.amqp.error.IllegalStateError;
- import org.amqp.headers.AccessProperties;
import org.amqp.headers.BasicProperties;
import org.amqp.headers.ChannelProperties;
import org.amqp.headers.ConnectionProperties;
- import org.amqp.methods.access.RequestOk;
+ import org.amqp.headers.ExchangeProperties;
import org.amqp.methods.basic.Consume;
import org.amqp.methods.basic.ConsumeOk;
import org.amqp.methods.basic.Deliver;
@@ -47,10 +46,8 @@ package org.amqp.impl
private static const STATE_CLOSED:int = 0;
private static const STATE_CONNECTION:int = new ConnectionProperties().getClassId();
private static const STATE_CHANNEL:int = new ChannelProperties().getClassId();
- private static const STATE_ACCESS:int = new AccessProperties().getClassId();
- private static const STATE_OPEN:int = new AccessProperties().getClassId() + 1;
-
- protected var ticket:int;
+ private static const STATE_OPEN:int = STATE_CHANNEL + 1;
+
protected var state:int = STATE_CONNECTION;
protected var QUEUE_SIZE:int = 100;
protected var commandQueue:PriorityQueue = new PriorityQueue(QUEUE_SIZE);
@@ -59,7 +56,6 @@ package org.amqp.impl
public function SessionStateHandler(){
addEventListener(new OpenOk(), onOpenOk);
- addEventListener(new RequestOk(), onRequestOk);
addEventListener(new CloseOk(), onCloseOk);
addEventListener(new Deliver(), onDeliver);
}
@@ -95,15 +91,6 @@ package org.amqp.impl
}
break;
}
- case STATE_CHANNEL: {
- if (cmd.method.getClassId() > STATE_ACCESS) {
- enqueueCommand(cmd);
- }
- else {
- sendCommand(cmd);
- }
- break;
- }
default: {
flushQueue();
sendCommand(cmd);
@@ -134,22 +121,14 @@ package org.amqp.impl
}
public function onOpenOk(event:ProtocolEvent):void {
- transition(STATE_CHANNEL);
- flushQueue(STATE_ACCESS);
- }
-
- public function onRequestOk(event:ProtocolEvent):void {
- transition(STATE_ACCESS);
- var accessRequestOk:RequestOk = event.command.method as RequestOk;
- ticket = accessRequestOk.ticket;
- flushQueue();
+ transition(STATE_OPEN);
+ flushQueue(-1);
}
/**
* This frees up any resources associated with this session.
**/
public function onCloseOk(event:ProtocolEvent):void {
- ticket = -1;
transition(STATE_CONNECTION);
}
@@ -181,40 +160,15 @@ package org.amqp.impl
}
private function sendCommand(cmd:Command):void {
- var method:Method = cmd.method;
- if (method.hasOwnProperty("_ticket")) {
- method.ticket = ticket;
- session.sendCommand(cmd);
- }
- else {
- session.sendCommand(cmd);
- }
+ session.sendCommand(cmd);
}
/**
* Cheap hack of an FSM
**/
private function transition(newState:int):void {
switch (state) {
- case STATE_CLOSED: { stateError(newState); }
- case STATE_CONNECTION: {
- if (newState == STATE_ACCESS) {
- stateError(newState);
- }
- else {
- state = newState;
- }
- break;
- }
- case STATE_ACCESS: {
- if (newState == STATE_CHANNEL) {
- stateError(newState);
- }
- else {
- state = newState;
- }
- break;
- }
+ case STATE_CLOSED: { stateError(newState); }
default: state = newState;
}
}
@@ -17,13 +17,16 @@
**/
package org.amqp.methods
{
- import flash.utils.IDataOutput;
- import flash.utils.ByteArray;
import com.ericfeminella.utils.Map;
- import org.amqp.LongString;
+
+ import flash.utils.ByteArray;
+ import flash.utils.IDataOutput;
+
import org.amqp.FrameHelper;
- import org.amqp.util.IOUtils;
+ import org.amqp.LongString;
import org.amqp.error.IllegalArgumentError;
+ import org.amqp.util.IOUtils;
+ import org.amqp.util.LongStringHelper;
public class MethodArgumentWriter
{
@@ -138,7 +141,7 @@ package org.amqp.methods
if(value is String) {
writeOctet(83); // 'S'
- writeShortstr(value as String);
+ writeString(value as String);
}
else if(value is LongString) {
writeOctet(83); // 'S'
@@ -0,0 +1,24 @@
+/**
+ * ---------------------------------------------------------------------------
+ * Copyright (C) 2008 0x6e6562
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ---------------------------------------------------------------------------
+ **/
+package org.amqp.patterns
+{
+ public interface Dispatcher
+ {
+ function dispatch(o:*, callback:Function):void;
+ }
+}
@@ -1,3 +1,20 @@
+/**
+ * ---------------------------------------------------------------------------
+ * Copyright (C) 2008 0x6e6562
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ---------------------------------------------------------------------------
+ **/
package org.amqp.patterns
{
import flash.utils.IDataInput;
@@ -25,7 +25,6 @@ package org.amqp.patterns.impl
import org.amqp.headers.BasicProperties;
import org.amqp.impl.SessionStateHandler;
import org.amqp.methods.access.Request;
- import org.amqp.methods.access.RequestOk;
import org.amqp.methods.basic.Publish;
import org.amqp.methods.channel.Open;
import org.amqp.methods.connection.OpenOk;
@@ -54,17 +53,9 @@ package org.amqp.patterns.impl
protected function openChannel(event:ProtocolEvent):void {
sessionHandler = connection.sessionManager.create();
-
var open:Open = new Open();
- var accessRequest:Request = new Request();
- accessRequest.realm = realm;
- accessRequest.passive = true;
- accessRequest.active = true;
- accessRequest.read = true;
- accessRequest.write = true;
- sessionHandler.dispatch(new Command(open));
- sessionHandler.dispatch(new Command(accessRequest));
- sessionHandler.addEventListener(new RequestOk(), onRequestOk);
+ sessionHandler.dispatch(new Command(open));
+ sessionHandler.addEventListener(new org.amqp.methods.channel.OpenOk(), onChannelOpenOk);
}
protected function publish(x:String, routing_key:String, data:ByteArray, properties:BasicProperties = null):void {
@@ -111,7 +102,7 @@ package org.amqp.patterns.impl
/**
* This should be overriden by specializing classes
**/
- protected function onRequestOk(event:ProtocolEvent):void {}
+ protected function onChannelOpenOk(event:ProtocolEvent):void {}
/**
* This should be overriden by specializing classes
@@ -17,7 +17,6 @@
**/
package org.amqp.patterns.impl
{
- import de.polygonal.ds.ArrayedQueue;
import flash.events.EventDispatcher;
import flash.utils.ByteArray;
@@ -29,51 +28,39 @@ package org.amqp.patterns.impl
import org.amqp.methods.basic.Consume;
import org.amqp.methods.basic.Deliver;
import org.amqp.patterns.CorrelatedMessageEvent;
+ import org.amqp.patterns.Dispatcher;
import org.amqp.patterns.RpcClient;
import org.amqp.util.Guid;
import org.amqp.util.Properties;
- public class RpcClientImpl extends AbstractDelegate implements RpcClient, BasicConsumer
+ public class RpcClientImpl extends AbstractDelegate implements RpcClient, BasicConsumer, Dispatcher
{
public var routingKey:String;
public var replyQueue:String;
public var consumerTag:String;
- private var sendBuffer:ArrayedQueue = new ArrayedQueue(100);
-
private var dispatcher:EventDispatcher = new EventDispatcher();
+ private var sendBuffer:SendBuffer;
+
public function RpcClientImpl(c:Connection) {
- super(c);
+ super(c);
+ sendBuffer = new SendBuffer(this);
}
public function send(o:*,callback:Function):void {
if (null != o) {
if (null == consumerTag) {
- buffer(o,callback);
+ sendBuffer.buffer(o,callback);
}
else {
dispatch(o,callback);
}
}
- }
-
- private function buffer(o:*,callback:Function):void {
- var o:Object = {payload:o,handler:callback};
- sendBuffer.enqueue(o);
- }
-
- private function drainBuffer():void {
- while(!sendBuffer.isEmpty()) {
- var o:Object = sendBuffer.dequeue();
- var data:* = o.payload;
- var callback:Function = o.handler;
- dispatch(data,callback);
- }
}
- private function dispatch(o:*,callback:Function):void {
+ public function dispatch(o:*,callback:Function):void {
var correlationId:String = Guid.next();
var data:ByteArray = new ByteArray();
serializer.serialize(o,data);
@@ -84,7 +71,7 @@ package org.amqp.patterns.impl
dispatcher.addEventListener(correlationId,callback);
}
- override protected function onRequestOk(event:ProtocolEvent):void {
+ override protected function onChannelOpenOk(event:ProtocolEvent):void {
declareExchange(exchange,exchangeType);
setupReplyQueue();
}
@@ -99,7 +86,7 @@ package org.amqp.patterns.impl
public function onConsumeOk(tag:String):void {
consumerTag = tag;
- drainBuffer();
+ sendBuffer.drain();
}
public function onCancelOk(tag:String):void {}
@@ -44,7 +44,7 @@ package org.amqp.patterns.impl
super(c);
}
- override protected function onRequestOk(event:ProtocolEvent):void {
+ override protected function onChannelOpenOk(event:ProtocolEvent):void {
declareExchange(exchange,exchangeType);
declareQueue("");
sessionHandler.addEventListener(new DeclareOk(),onQueueDeclareOk);
Oops, something went wrong.

0 comments on commit 8392408

Please sign in to comment.