Permalink
Browse files

saving changes.

  • Loading branch information...
Neil
Neil committed Jul 30, 2009
1 parent 619c857 commit a2363d00143a14628d1e86a5cd5a12e7594b0e9d
@@ -42,7 +42,7 @@
public class WalrusDataMessenger {
private static Logger LOG = Logger.getLogger( WalrusDataMessenger.class );
- private static final int DATA_QUEUE_SIZE = 1;
+ private static final int DATA_QUEUE_SIZE = 2;
private ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>> queueMap;
private ConcurrentHashMap<String, WalrusMonitor> monitorMap;
@@ -1240,7 +1240,7 @@ public GetObjectResponseType getObject(GetObjectType request) throws EucalyptusC
String key = bucketName + "." + objectKey;
String randomKey = key + "." + Hashes.getRandom(10);
request.setRandomKey(randomKey);
- LinkedBlockingQueue<WalrusDataMessage> getQueue = null; //TODO: NEIL WalrusQueryDispatcher.getReadMessenger().getQueue(key, randomKey);
+ LinkedBlockingQueue<WalrusDataMessage> getQueue = WalrusRESTBinding.getReadMessenger().getQueue(key, randomKey);
ObjectReader reader = new ObjectReader(bucketName, objectName, objectInfo.getSize(), getQueue, deleteAfterGet, null, storageManager);
reader.start();
@@ -22,38 +22,76 @@
import edu.ucsb.eucalyptus.msgs.EucalyptusErrorMessageType;
import edu.ucsb.eucalyptus.msgs.EucalyptusMessage;
import edu.ucsb.eucalyptus.msgs.EventRecord;
+import edu.ucsb.eucalyptus.msgs.WalrusDataRequestType;
@ChannelPipelineCoverage("one")
public class ServiceSinkHandler implements ChannelDownstreamHandler, ChannelUpstreamHandler{
- private static Logger LOG = Logger.getLogger( ServiceSinkHandler.class );
-
- @Override
- public void handleDownstream( final ChannelHandlerContext ctx, final ChannelEvent e ) throws Exception {
- ctx.sendDownstream( e );
- }
-
- @Override
- public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e ) throws Exception {
- LOG.debug( this.getClass( ).getSimpleName( ) + "[incoming]: " + e );
- if ( e instanceof MessageEvent ) {
- final MessageEvent event = ( MessageEvent ) e;
- MappingHttpMessage message = ( MappingHttpMessage ) event.getMessage( );
- EucalyptusMessage msg = (EucalyptusMessage) message.getMessage( );
- LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_RECEIVED, msg.getClass().getSimpleName() ) );
- long startTime = System.currentTimeMillis();
-// Registry registry = MuleServer.getMuleContext( ).getRegistry( );
- /*Messaging.dispatch( "vm://RequestQueue", msg );
- EucalyptusMessage reply = null;
-
- reply = ReplyQueue.getReply( msg.getCorrelationId() );
- LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_SERVICED, ( System.currentTimeMillis() - startTime ) ) );
- if ( reply == null ) {
- reply = new EucalyptusErrorMessageType( this.getClass().getSimpleName(), msg, "Received a NULL reply" );
- }*/
- EucalyptusMessage reply = new PutObjectResponseType();
- MappingHttpResponse response = new MappingHttpResponse( message.getProtocolVersion( ) );
- response.setMessage( reply );
- Channels.write( ctx.getChannel( ), response );
- }
- }
+ private static Logger LOG = Logger.getLogger( ServiceSinkHandler.class );
+
+ @Override
+ public void handleDownstream( final ChannelHandlerContext ctx, final ChannelEvent e ) throws Exception {
+ LOG.warn("handle downstream");
+ ctx.sendDownstream( e );
+ }
+
+ @Override
+ public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e ) throws Exception {
+ LOG.debug( this.getClass( ).getSimpleName( ) + "[incoming]: " + e );
+ if ( e instanceof MessageEvent ) {
+ final MessageEvent event = ( MessageEvent ) e;
+ if(event.getMessage() instanceof MappingHttpMessage) {
+ MappingHttpMessage message = ( MappingHttpMessage ) event.getMessage( );
+ EucalyptusMessage msg = (EucalyptusMessage) message.getMessage( );
+ LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_RECEIVED, msg.getClass().getSimpleName() ) );
+ long startTime = System.currentTimeMillis();
+ if(msg instanceof WalrusDataRequestType) {
+ Dispatcher dispatch = new Dispatcher(ctx, msg, message);
+ dispatch.start();
+ } else {
+ Registry registry = MuleServer.getMuleContext( ).getRegistry( );
+ Messaging.dispatch( "vm://RequestQueue", msg );
+ EucalyptusMessage reply = null;
+
+ LOG.warn("Message dispatched");
+ reply = ReplyQueue.getReply( msg.getCorrelationId() );
+ LOG.warn("Reply received");
+ LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_SERVICED, ( System.currentTimeMillis() - startTime ) ) );
+ if ( reply == null ) {
+ reply = new EucalyptusErrorMessageType( this.getClass().getSimpleName(), msg, "Received a NULL reply" );
+ }
+ MappingHttpResponse response = new MappingHttpResponse( message.getProtocolVersion( ) );
+ response.setMessage( reply );
+ Channels.write( ctx.getChannel( ), response );
+ }
+ }
+ }
+ }
+
+ private class Dispatcher extends Thread {
+ private ChannelHandlerContext ctx;
+ private EucalyptusMessage msg;
+ private MappingHttpMessage message;
+
+ public Dispatcher(ChannelHandlerContext ctx, EucalyptusMessage msg, MappingHttpMessage message) {
+ this.ctx = ctx;
+ this.msg = msg;
+ this.message = message;
+ }
+
+ public void run() {
+ Registry registry = MuleServer.getMuleContext( ).getRegistry( );
+ Messaging.dispatch( "vm://RequestQueue", msg );
+
+ LOG.warn("Message dispatched");
+ EucalyptusMessage reply = ReplyQueue.getReply( msg.getCorrelationId() );
+ LOG.warn("Reply received");
+ //LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_SERVICED, ( System.currentTimeMillis() - startTime ) ) );
+ if ( reply == null ) {
+ reply = new EucalyptusErrorMessageType( this.getClass().getSimpleName(), msg, "Received a NULL reply" );
+ }
+ MappingHttpResponse response = new MappingHttpResponse( message.getProtocolVersion( ) );
+ response.setMessage( reply );
+ Channels.write( ctx.getChannel( ), response );
+ }
+ }
}
@@ -53,6 +53,8 @@
import edu.ucsb.eucalyptus.msgs.AccessControlPolicyType;
import edu.ucsb.eucalyptus.msgs.CanonicalUserType;
import edu.ucsb.eucalyptus.msgs.EucalyptusMessage;
+import edu.ucsb.eucalyptus.msgs.GetObjectResponseType;
+import edu.ucsb.eucalyptus.msgs.GetObjectType;
import edu.ucsb.eucalyptus.msgs.Grant;
import edu.ucsb.eucalyptus.msgs.Grantee;
import edu.ucsb.eucalyptus.msgs.Group;
@@ -68,6 +70,7 @@
import edu.ucsb.eucalyptus.util.WalrusDataMessenger;
import edu.ucsb.eucalyptus.util.WalrusDataMessage;
import edu.ucsb.eucalyptus.cloud.entities.UserInfo;
+import org.jboss.netty.channel.Channel;
public class WalrusRESTBinding extends RestfulMarshallingHandler {
private static Logger LOG = Logger.getLogger( WalrusRESTBinding.class );
@@ -79,34 +82,67 @@
private static WalrusDataMessenger getMessenger;
public static final int DATA_MESSAGE_SIZE = 102400;
private LinkedBlockingQueue<WalrusDataMessage> putQueue;
+ private GetObjectType getObjectType;
@Override
public void incomingMessage( ChannelHandlerContext ctx, MessageEvent event ) throws Exception {
+ LOG.warn("Incoming message");
if ( event.getMessage( ) instanceof MappingHttpRequest ) {
MappingHttpRequest httpRequest = ( MappingHttpRequest ) event.getMessage( );
namespace = "http://s3.amazonaws.com/doc/" + WalrusProperties.NAMESPACE_VERSION;
// TODO: get real user data here too
- httpRequest.setMessage( this.bind( "admin", true, httpRequest ) );
+ EucalyptusMessage msg = (EucalyptusMessage) this.bind( "admin", true, httpRequest );
+ httpRequest.setMessage( msg );
+ if(msg instanceof GetObjectType)
+ getObjectType = (GetObjectType) msg;
} else if(event.getMessage() instanceof HttpChunk) {
HttpChunk httpChunk = (HttpChunk) event.getMessage();
handleHttpChunk(httpChunk);
- //ctx.sendDownstream(event);
}
}
@Override
public void outgoingMessage( ChannelHandlerContext ctx, MessageEvent event ) throws Exception {
if ( event.getMessage( ) instanceof MappingHttpResponse ) {
MappingHttpResponse httpResponse = ( MappingHttpResponse ) event.getMessage( );
- Binding binding = BindingManager.getBinding( BindingManager.sanitizeNamespace( namespace ) );
- OMElement omMsg = binding.toOM( httpResponse.getMessage( ) );
- ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
- omMsg.serialize( byteOut );
- byte[] req = byteOut.toByteArray();
- ChannelBuffer buffer = ChannelBuffers.copiedBuffer( req );
- httpResponse.addHeader( HttpHeaders.Names.CONTENT_LENGTH, String.valueOf( buffer.readableBytes() ) );
- httpResponse.addHeader( HttpHeaders.Names.CONTENT_TYPE, "text/plain" );
- httpResponse.setContent( buffer );
+ EucalyptusMessage msg = (EucalyptusMessage) httpResponse.getMessage( );
+ if(msg instanceof GetObjectResponseType) {
+ GetObjectResponseType getObjectResponse = (GetObjectResponseType) msg;
+ Long size = getObjectResponse.getSize();
+ String etag = getObjectResponse.getEtag();
+ httpResponse.addHeader( HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(size) );
+ httpResponse.addHeader( HttpHeaders.Names.CONTENT_TYPE, "binary/octet-stream" );
+ Channel channel = event.getChannel();
+ channel.write(httpResponse);
+ WalrusDataMessenger messenger = getReadMessenger();
+ LinkedBlockingQueue<WalrusDataMessage> getQueue = messenger.getQueue(getObjectType.getKey(), getObjectType.getRandomKey());
+
+ WalrusDataMessage dataMessage;
+ try {
+ while ((dataMessage = getQueue.take())!=null) {
+ if(WalrusDataMessage.isStart(dataMessage)) {
+ //TODO: should read size and verify
+ } else if(WalrusDataMessage.isData(dataMessage)) {
+ byte[] data = dataMessage.getPayload();
+ channel.write(data);
+ } else {
+
+ }
+ }
+ } catch (InterruptedException ex) {
+ LOG.error(ex, ex);
+ }
+ } else {
+ Binding binding = BindingManager.getBinding( BindingManager.sanitizeNamespace( namespace ) );
+ OMElement omMsg = binding.toOM( httpResponse.getMessage( ) );
+ ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+ omMsg.serialize( byteOut );
+ byte[] req = byteOut.toByteArray();
+ ChannelBuffer buffer = ChannelBuffers.copiedBuffer( req );
+ httpResponse.addHeader( HttpHeaders.Names.CONTENT_LENGTH, String.valueOf( buffer.readableBytes() ) );
+ httpResponse.addHeader( HttpHeaders.Names.CONTENT_TYPE, "text/plain" );
+ httpResponse.setContent( buffer );
+ }
}
}
@@ -509,13 +545,14 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
operationParams.put("ContentDisposition", contentDisposition);
operationParams.put("ContentLength", (new Long(contentLength).toString()));
operationParams.put(WalrusProperties.Headers.RandomKey.toString(), randomKey);
- putQueue = getWriteMessenger().interruptAllAndGetQueue(key, randomKey);
- Writer writer = new Writer(httpRequest.getContent(), contentLength);
- writer.start();
+ putQueue = getWriteMessenger().interruptAllAndGetQueue(key, randomKey);
+ handleFirstChunk(httpRequest, contentLength);
+ //Writer writer = new Writer(httpRequest.getContent(), contentLength);
+ //writer.start();
}
} else if(verb.equals(WalrusProperties.HTTPVerb.GET.toString())) {
//TODO:handle streaming get
- /*messageContext.setProperty(WalrusProperties.STREAMING_HTTP_GET, Boolean.TRUE);
+ /*messageContext.setProperty(WalrusProperties.STREAMING_HTTP_GET, Boolean.TRUE);*/
if(!walrusInternalOperation) {
if(params.containsKey("torrent")) {
@@ -526,10 +563,9 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
operationParams.put("GetMetaData", Boolean.TRUE);
}
- Iterator<String> iterator = caseInsensitiveHeaders.keySet().iterator();
+ Set<String> headerNames = httpRequest.getHeaderNames();
boolean isExtendedGet = false;
- while(iterator.hasNext()) {
- String key = iterator.next();
+ for(String key : headerNames) {
for(WalrusProperties.ExtendedGetHeaders header: WalrusProperties.ExtendedGetHeaders.values()) {
if(key.replaceAll("-", "").equals(header.toString().toLowerCase())) {
String value = httpRequest.getHeader(key);
@@ -545,17 +581,17 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
operationParams.put("ReturnCompleteObjectOnConditionFailure", Boolean.FALSE);
}
} else {
- for(WalrusProperties.InfoOperations operation : WalrusProperties.InfoOperations.values()) {
+ /*for(WalrusProperties.InfoOperations operation : WalrusProperties.InfoOperations.values()) {
if(operation.toString().equals(operationName)) {
messageContext.removeProperty(WalrusProperties.STREAMING_HTTP_GET);
break;
}
- }
+ }*/
}
if(params.containsKey(WalrusProperties.GetOptionalParameters.IsCompressed.toString())) {
Boolean isCompressed = Boolean.parseBoolean(params.remove(WalrusProperties.GetOptionalParameters.IsCompressed.toString()));
operationParams.put("IsCompressed", isCompressed);
- }*/
+ }
} else if(verb.equals(WalrusProperties.HTTPVerb.HEAD.toString())) {
//messageContext.setProperty(WalrusProperties.STREAMING_HTTP_GET, Boolean.FALSE);
@@ -1045,57 +1081,80 @@ private String getMessageString(MappingHttpRequest httpRequest) {
}
private void handleHttpChunk(HttpChunk httpChunk) throws Exception {
- ChannelBuffer buffer = httpChunk.getContent();
- byte[] bytes = new byte[DATA_MESSAGE_SIZE];
+ ChannelBuffer buffer = httpChunk.getContent();
+ byte[] bytes = new byte[DATA_MESSAGE_SIZE];
- try {
- buffer.markReaderIndex( );
- byte[] read = new byte[buffer.readableBytes( )];
- buffer.readBytes( read );
- putQueue.put(WalrusDataMessage.DataMessage(read));
- if(httpChunk.isLast())
- putQueue.put(WalrusDataMessage.EOF());
+ try {
+ buffer.markReaderIndex( );
+ byte[] read = new byte[buffer.readableBytes( )];
+ buffer.readBytes( read );
+ putQueue.put(WalrusDataMessage.DataMessage(read));
+ if(httpChunk.isLast())
+ putQueue.put(WalrusDataMessage.EOF());
+
+ } catch (Exception ex) {
+ LOG.error(ex, ex);
+ }
- } catch (Exception ex) {
- LOG.error(ex, ex);
- }
+ }
+
+ private void handleFirstChunk(MappingHttpRequest httpRequest, long dataLength) {
+ ChannelBuffer buffer = httpRequest.getContent();
+ byte[] bytes = new byte[DATA_MESSAGE_SIZE];
+ try {
+ putQueue.put(WalrusDataMessage.StartOfData(dataLength));
+ buffer.markReaderIndex( );
+ byte[] read = new byte[buffer.readableBytes( )];
+ buffer.readBytes( read );
+ putQueue.put(WalrusDataMessage.DataMessage(read));
+ } catch (Exception ex) {
+ LOG.error(ex, ex);
+ }
}
+ public static synchronized WalrusDataMessenger getReadMessenger() {
+ if (getMessenger == null) {
+ getMessenger = new WalrusDataMessenger();
+ }
+ return getMessenger;
+ }
+
+
public static synchronized WalrusDataMessenger getWriteMessenger() {
if (putMessenger == null) {
putMessenger = new WalrusDataMessenger();
}
return putMessenger;
}
-
- class Writer extends Thread {
-
- private ChannelBuffer firstBuffer;
- private long dataLength;
- public Writer(ChannelBuffer firstBuffer, long dataLength) {
- this.firstBuffer = firstBuffer;
- this.dataLength = dataLength;
- }
- public void run() {
- byte[] bytes = new byte[DATA_MESSAGE_SIZE];
+ class Writer extends Thread {
- try {
- LOG.info("Starting upload");
- putQueue.put(WalrusDataMessage.StartOfData(dataLength));
+ private ChannelBuffer firstBuffer;
+ private long dataLength;
+ public Writer(ChannelBuffer firstBuffer, long dataLength) {
+ this.firstBuffer = firstBuffer;
+ this.dataLength = dataLength;
+ }
- firstBuffer.markReaderIndex( );
- byte[] read = new byte[firstBuffer.readableBytes( )];
- firstBuffer.readBytes( read );
- putQueue.put(WalrusDataMessage.DataMessage(read));
- //putQueue.put(WalrusDataMessage.EOF());
+ public void run() {
+ byte[] bytes = new byte[DATA_MESSAGE_SIZE];
- } catch (Exception ex) {
- LOG.error(ex, ex);
- }
- }
+ try {
+ LOG.info("Starting upload");
+ putQueue.put(WalrusDataMessage.StartOfData(dataLength));
- }
+ firstBuffer.markReaderIndex( );
+ byte[] read = new byte[firstBuffer.readableBytes( )];
+ firstBuffer.readBytes( read );
+ putQueue.put(WalrusDataMessage.DataMessage(read));
+ //putQueue.put(WalrusDataMessage.EOF());
+
+ } catch (Exception ex) {
+ LOG.error(ex, ex);
+ }
+ }
+
+ }
}

0 comments on commit a2363d0

Please sign in to comment.