Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fixed get.

  • Loading branch information...
commit c55fed97e74ea84adf0d6407805c2230d61419f2 1 parent e02f9de
Neil authored
View
2  clc/modules/core/src/main/java/edu/ucsb/eucalyptus/util/WalrusDataMessenger.java
@@ -42,7 +42,7 @@
public class WalrusDataMessenger {
private static Logger LOG = Logger.getLogger( WalrusDataMessenger.class );
- private static final int DATA_QUEUE_SIZE = 2;
+ private static final int DATA_QUEUE_SIZE = 3;
private ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>> queueMap;
private ConcurrentHashMap<String, WalrusMonitor> monitorMap;
View
4 clc/modules/msgs/src/main/java/GenerateJibxBinding.groovy
@@ -57,7 +57,6 @@ def typedCollection = {
binding("http://msgs.eucalyptus.ucsb.edu");
classList.each({
Class itsClass = Class.forName(it);
-
if ( itsClass.getSuperclass().getSimpleName().equals("Object") )
baseMapping(itsClass.getSimpleName(), itsClass.getName());
else if ( itsClass.getSuperclass().getSimpleName().equals("EucalyptusData") )
@@ -68,7 +67,8 @@ classList.each({
def fieldList = itsClass.getDeclaredFields().findAll({Modifier.isPrivate(it.getModifiers())})
fieldList.each({
Class itsType = it.getType();
- if ( itsType.getSuperclass().equals(edu.ucsb.eucalyptus.msgs.EucalyptusData) ) {
+ if (itsType.getSimpleName().endsWith("Channel")) {}
+ else if ( itsType.getSuperclass().equals(edu.ucsb.eucalyptus.msgs.EucalyptusData) ) {
typeBind(it.getName(), itsType.getName());
} else if ( it.getType().equals(java.util.ArrayList.class) ) {
if ( it.getGenericType() != null ) {
View
16 clc/modules/msgs/src/main/java/edu/ucsb/eucalyptus/msgs/Walrus.groovy
@@ -1,5 +1,6 @@
package edu.ucsb.eucalyptus.msgs
-
+
+import org.jboss.netty.handler.codec.http.HttpResponseStatus
import org.jboss.netty.channel.Channel;
/*
* Software License Agreement (BSD License)
@@ -294,6 +295,10 @@ public class WalrusDataResponseType extends WalrusResponseType {
Integer errorCode;
String contentType;
String contentDisposition;
+}
+
+public class WalrusDataGetResponseType extends WalrusDataResponseType {
+
}
public class PutObjectResponseType extends WalrusDataResponseType {
@@ -437,6 +442,7 @@ public class GetObjectType extends WalrusDataRequestType {
Boolean getMetaData;
Boolean getData;
Boolean inlineData;
+ Boolean deleteAfterGet;
Boolean getTorrent;
Channel channel;
@@ -447,9 +453,13 @@ public class GetObjectType extends WalrusDataRequestType {
super( bucketName, key );
this.getData = getData;
this.getMetaData = getMetaData;
+ this.inlineData = inlineData;
+ }
+
+ public Channel getChannel() {
return channel;
}
-}
+}
public class GetObjectResponseType extends WalrusDataGetResponseType {
Status status;
@@ -503,7 +513,7 @@ public class UpdateWalrusConfigurationResponseType extends WalrusResponseType {
}
public class GetDecryptedImageType extends WalrusDataRequestType {
-}
+}
public class GetDecryptedImageResponseType extends WalrusDataGetResponseType {
}
View
3,000 clc/modules/storage-manager/src/main/java/edu/ucsb/eucalyptus/cloud/ws/WalrusManager.java
1,499 additions, 1,501 deletions not shown
View
14 clc/modules/wsstack/src/main/java/com/eucalyptus/ws/handlers/ServiceSinkHandler.java
@@ -23,7 +23,7 @@
import edu.ucsb.eucalyptus.msgs.EucalyptusMessage;
import edu.ucsb.eucalyptus.msgs.EventRecord;
import edu.ucsb.eucalyptus.msgs.WalrusDataRequestType;
-import edu.ucsb.eucalyptus.msgs.GetObjectResponseType;
+import edu.ucsb.eucalyptus.msgs.WalrusDataGetResponseType;
@ChannelPipelineCoverage("one")
@@ -47,7 +47,7 @@ public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e ) throws E
LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_RECEIVED, msg.getClass().getSimpleName() ) );
long startTime = System.currentTimeMillis();
if(msg instanceof WalrusDataRequestType) {
- Dispatcher dispatch = new Dispatcher(ctx, msg, message);
+ Dispatcher dispatch = new Dispatcher(ctx, msg, message, startTime);
dispatch.start();
} else {
Registry registry = MuleServer.getMuleContext( ).getRegistry( );
@@ -73,11 +73,13 @@ public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e ) throws E
private ChannelHandlerContext ctx;
private EucalyptusMessage msg;
private MappingHttpMessage message;
-
- public Dispatcher(ChannelHandlerContext ctx, EucalyptusMessage msg, MappingHttpMessage message) {
+ private long startTime;
+
+ public Dispatcher(ChannelHandlerContext ctx, EucalyptusMessage msg, MappingHttpMessage message, long startTime) {
this.ctx = ctx;
this.msg = msg;
this.message = message;
+ this.startTime = startTime;
}
public void run() {
@@ -87,13 +89,13 @@ public void run() {
LOG.warn("Message dispatched");
EucalyptusMessage reply = ReplyQueue.getReply( msg.getCorrelationId() );
LOG.warn("Reply received");
- //LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_SERVICED, ( System.currentTimeMillis() - startTime ) ) );
+ LOG.info( EventRecord.create( this.getClass().getSimpleName(), msg.getUserId(), msg.getCorrelationId(), EventType.MSG_SERVICED, ( System.currentTimeMillis() - startTime ) ) );
if ( reply == null ) {
reply = new EucalyptusErrorMessageType( this.getClass().getSimpleName(), msg, "Received a NULL reply" );
}
MappingHttpResponse response = new MappingHttpResponse( message.getProtocolVersion( ) );
response.setMessage( reply );
- if(!(reply instanceof GetObjectResponseType))
+ if(!(reply instanceof WalrusDataGetResponseType))
Channels.write( ctx.getChannel( ), response );
}
}
View
112 clc/modules/wsstack/src/main/java/com/eucalyptus/ws/handlers/WalrusRESTBinding.java
@@ -1,9 +1,6 @@
package com.eucalyptus.ws.handlers;
-import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
@@ -15,33 +12,31 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.zip.GZIPInputStream;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.groovy.JsonSlurper;
import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+import org.apache.tools.ant.util.DateUtils;
+import org.apache.xml.dtm.ref.DTMNodeList;
+import org.bouncycastle.util.encoders.Base64;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jibx.runtime.JiBXException;
-import com.eucalyptus.ws.AuthenticationException;
import com.eucalyptus.ws.BindingException;
import com.eucalyptus.ws.MappingHttpRequest;
import com.eucalyptus.ws.MappingHttpResponse;
import com.eucalyptus.ws.binding.Binding;
import com.eucalyptus.ws.binding.BindingManager;
-import com.eucalyptus.ws.server.EucalyptusQueryPipeline.OperationParameter;
-import com.eucalyptus.ws.server.EucalyptusQueryPipeline.RequiredQueryParams;
-import com.eucalyptus.ws.util.Hashes;
-import com.eucalyptus.ws.util.HttpUtils;
import com.eucalyptus.ws.util.EucalyptusProperties;
+import com.eucalyptus.ws.util.Hashes;
import com.eucalyptus.ws.util.StorageProperties;
import com.eucalyptus.ws.util.WalrusProperties;
import com.eucalyptus.ws.util.XMLParser;
@@ -49,29 +44,20 @@
import edu.ucsb.eucalyptus.annotation.HttpEmbedded;
import edu.ucsb.eucalyptus.annotation.HttpParameterMapping;
+import edu.ucsb.eucalyptus.cloud.entities.UserInfo;
import edu.ucsb.eucalyptus.msgs.AccessControlListType;
import edu.ucsb.eucalyptus.msgs.AccessControlPolicyType;
import edu.ucsb.eucalyptus.msgs.CanonicalUserType;
import edu.ucsb.eucalyptus.msgs.EucalyptusErrorMessageType;
import edu.ucsb.eucalyptus.msgs.EucalyptusMessage;
-import edu.ucsb.eucalyptus.msgs.GetObjectResponseType;
import edu.ucsb.eucalyptus.msgs.GetObjectType;
import edu.ucsb.eucalyptus.msgs.Grant;
import edu.ucsb.eucalyptus.msgs.Grantee;
import edu.ucsb.eucalyptus.msgs.Group;
import edu.ucsb.eucalyptus.msgs.MetaDataEntry;
-import edu.ucsb.eucalyptus.util.BindingUtil;
-import groovy.lang.GroovyObject;
-import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.FileUpload;
-import org.apache.log4j.Logger;
-import org.apache.tools.ant.util.DateUtils;
-import org.apache.xml.dtm.ref.DTMNodeList;
-import org.bouncycastle.util.encoders.Base64;
-import edu.ucsb.eucalyptus.util.WalrusDataMessenger;
import edu.ucsb.eucalyptus.util.WalrusDataMessage;
-import edu.ucsb.eucalyptus.cloud.entities.UserInfo;
-import org.jboss.netty.channel.Channel;
+import edu.ucsb.eucalyptus.util.WalrusDataMessenger;
+import groovy.lang.GroovyObject;
public class WalrusRESTBinding extends RestfulMarshallingHandler {
private static Logger LOG = Logger.getLogger( WalrusRESTBinding.class );
@@ -83,7 +69,6 @@
private static WalrusDataMessenger getMessenger;
public static final int DATA_MESSAGE_SIZE = 102400;
private LinkedBlockingQueue<WalrusDataMessage> putQueue;
- private GetObjectType getObjectType;
public static Channel channel;
@Override
@@ -95,8 +80,8 @@ public void incomingMessage( ChannelHandlerContext ctx, MessageEvent event ) thr
EucalyptusMessage msg = (EucalyptusMessage) this.bind( "admin", true, httpRequest );
httpRequest.setMessage( msg );
if(msg instanceof GetObjectType) {
- getObjectType = (GetObjectType) msg;
- channel = ctx.getChannel();
+ GetObjectType getObject = (GetObjectType) msg;
+ getObject.setChannel(ctx.getChannel());
}
} else if(event.getMessage() instanceof HttpChunk) {
HttpChunk httpChunk = (HttpChunk) event.getMessage();
@@ -115,14 +100,16 @@ public void outgoingMessage( ChannelHandlerContext ctx, MessageEvent event ) thr
} else {
binding = BindingManager.getBinding( BindingManager.sanitizeNamespace( "http://msgs.eucalyptus.ucsb.edu" ) );
}
- OMElement omMsg = binding.toOM( httpResponse.getMessage( ) );
- ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
- omMsg.serialize( byteOut );
- byte[] req = byteOut.toByteArray();
- ChannelBuffer buffer = ChannelBuffers.copiedBuffer( req );
- httpResponse.addHeader( HttpHeaders.Names.CONTENT_LENGTH, String.valueOf( buffer.readableBytes() ) );
- httpResponse.addHeader( HttpHeaders.Names.CONTENT_TYPE, "application/xml" );
- httpResponse.setContent( buffer );
+ if(msg != null) {
+ OMElement omMsg = binding.toOM( httpResponse.getMessage( ) );
+ ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+ omMsg.serialize( byteOut );
+ byte[] req = byteOut.toByteArray();
+ ChannelBuffer buffer = ChannelBuffers.copiedBuffer( req );
+ httpResponse.addHeader( HttpHeaders.Names.CONTENT_LENGTH, String.valueOf( buffer.readableBytes() ) );
+ httpResponse.addHeader( HttpHeaders.Names.CONTENT_TYPE, "application/xml" );
+ httpResponse.setContent( buffer );
+ }
}
}
@@ -412,17 +399,12 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
}
httpRequest.addHeader(WalrusProperties.FormField.FormUploadPolicyData.toString(), policyData);
}
- //TODO: handle file put
- /*operationParams.put("Key", objectKey);
- key = target[0] + "." + objectKey;
+ String key = target[0] + "." + objectKey;
String randomKey = key + "." + Hashes.getRandom(10);
- LinkedBlockingQueue<WalrusDataMessage> putQueue = getWriteMessenger().interruptAllAndGetQueue(key, randomKey);
-
- Writer writer = new Writer(formDataIn, postContentLength, putQueue);
- writer.start();
-
- operationParams.put("ContentLength", (new Long(postContentLength).toString()));
- operationParams.put(WalrusProperties.Headers.RandomKey.toString(), randomKey);*/
+ operationParams.put("ContentLength", (new Long(contentLength).toString()));
+ operationParams.put(WalrusProperties.Headers.RandomKey.toString(), randomKey);
+ putQueue = getWriteMessenger().interruptAllAndGetQueue(key, randomKey);
+ handleFirstChunk(httpRequest, contentLength);
}
} else {
@@ -486,35 +468,6 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
} else {
//handle PUTs
- /*messageContext.setProperty(WalrusProperties.STREAMING_HTTP_PUT, Boolean.TRUE);
- InputStream in = (InputStream) messageContext.getProperty("TRANSPORT_IN");
- InputStream inStream;
- if((!walrusInternalOperation) || (!WalrusProperties.StorageOperations.StoreSnapshot.toString().equals(operationName))) {
- inStream = new BufferedInputStream(in);
- } else {
- try {
- inStream = new GZIPInputStream(in);
- } catch(Exception ex) {
- LOG.warn(ex, ex);
- throw new BindingException("cannot process input");
- }
- }
- String key = target[0] + "." + objectKey;
- String randomKey = key + "." + Hashes.getRandom(10);
-
- String contentType = httpRequest.getHeader(WalrusProperties.CONTENT_TYPE);
- if(contentType != null)
- operationParams.put("ContentType", contentType);
- String contentDisposition = httpRequest.getHeader("Content-Disposition");
- if(contentDisposition != null)
- operationParams.put("ContentDisposition", contentDisposition);
- operationParams.put("ContentLength", (new Long(contentLength).toString()));
- operationParams.put(WalrusProperties.Headers.RandomKey.toString(), randomKey);
-
- LinkedBlockingQueue<WalrusDataMessage> putQueue = getWriteMessenger().interruptAllAndGetQueue(key, randomKey);
-
- Writer writer = new Writer(inStream, contentLength, putQueue);
- writer.start();*/
String key = target[0] + "." + objectKey;
String randomKey = key + "." + Hashes.getRandom(10);
String contentType = httpRequest.getHeader(WalrusProperties.CONTENT_TYPE);
@@ -527,8 +480,6 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
operationParams.put(WalrusProperties.Headers.RandomKey.toString(), randomKey);
putQueue = getWriteMessenger().interruptAllAndGetQueue(key, randomKey);
handleFirstChunk(httpRequest, contentLength);
- //Writer writer = new Writer(httpRequest.getContent(), contentLength);
- //writer.start();
}
} else if(verb.equals(WalrusProperties.HTTPVerb.GET.toString())) {
//TODO:handle streaming get
@@ -560,14 +511,7 @@ private String getOperation(MappingHttpRequest httpRequest, Map operationParams)
//only supported through SOAP
operationParams.put("ReturnCompleteObjectOnConditionFailure", Boolean.FALSE);
}
- } else {
- /*for(WalrusProperties.InfoOperations operation : WalrusProperties.InfoOperations.values()) {
- if(operation.toString().equals(operationName)) {
- messageContext.removeProperty(WalrusProperties.STREAMING_HTTP_GET);
- break;
- }
- }*/
- }
+ }
if(params.containsKey(WalrusProperties.GetOptionalParameters.IsCompressed.toString())) {
Boolean isCompressed = Boolean.parseBoolean(params.remove(WalrusProperties.GetOptionalParameters.IsCompressed.toString()));
operationParams.put("IsCompressed", isCompressed);
@@ -1085,6 +1029,8 @@ private void handleFirstChunk(MappingHttpRequest httpRequest, long dataLength) {
byte[] read = new byte[buffer.readableBytes( )];
buffer.readBytes( read );
putQueue.put(WalrusDataMessage.DataMessage(read));
+ if(!httpRequest.isChunked())
+ putQueue.put(WalrusDataMessage.EOF());
} catch (Exception ex) {
LOG.error(ex, ex);
}
Please sign in to comment.
Something went wrong with that request. Please try again.