Permalink
Browse files

[AS7-4696] workaround the controller-clint API in order to be able to…

… deploy bigger files
  • Loading branch information...
1 parent 1cf10c9 commit 8f1b8bc0b1086b9537a31842506688609c534ce5 @emuckenhuber committed May 1, 2012
@@ -667,21 +667,17 @@ protected void execute(CommandContext ctx, ModelNode request, File f, boolean un
addHeaders(ctx, request);
ModelNode result;
- FileInputStream is = null;
try {
if(!unmanaged) {
- is = new FileInputStream(f);
OperationBuilder op = new OperationBuilder(request);
- op.addInputStream(is);
+ op.addFileAsAttachment(f);
request.get(Util.CONTENT).get(0).get(Util.INPUT_STREAM_INDEX).set(0);
result = ctx.getModelControllerClient().execute(op.build());
} else {
result = ctx.getModelControllerClient().execute(request);
}
} catch (Exception e) {
throw new CommandFormatException("Failed to add the deployment content to the repository: " + e.getLocalizedMessage());
- } finally {
- StreamUtils.safeClose(is);
}
if (!Util.isSuccess(result)) {
throw new CommandFormatException(Util.getFailureDescription(result));
@@ -21,10 +21,17 @@
*/
package org.jboss.as.controller.client;
+import org.jboss.as.controller.client.impl.InputStreamEntry;
+import org.jboss.as.protocol.StreamUtils;
import org.jboss.dmr.ModelNode;
import static org.jboss.as.controller.client.ControllerClientMessages.MESSAGES;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@@ -54,6 +61,29 @@ public OperationBuilder(final ModelNode operation, boolean autoCloseStreams) {
}
/**
+ * Associate a file with the operation. This will create a {@code FileInputStream}
+ * and add it as attachment.
+ *
+ * @param file the file
+ * @return the operation builder
+ */
+ public OperationBuilder addFileAsAttachment(final File file) {
+ if(file == null) {
+ throw MESSAGES.nullVar("file");
+ }
+ try {
+ FileStreamEntry entry = new FileStreamEntry(file);
+ if (inputStreams == null) {
+ inputStreams = new ArrayList<InputStream>();
+ }
+ inputStreams.add(entry);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ /**
* Associate an input stream with the operation. Closing the input stream
* is the responsibility of the caller.
*
@@ -120,4 +150,31 @@ public static OperationBuilder create(final ModelNode operation, final boolean a
return new OperationBuilder(operation, autoCloseStreams);
}
+ // Wrap the FIS in a streamEntry so that the controller-client has access to the underlying File
+ private static class FileStreamEntry extends FilterInputStream implements InputStreamEntry {
+
+ private final File file;
+ private FileStreamEntry(final File file) throws IOException {
+ super(new FileInputStream(file)); // This stream will get closed regardless of autoClose
+ this.file = file;
+ }
+
+ @Override
+ public int initialize() throws IOException {
+ return (int) file.length();
+ }
+
+ @Override
+ public void copyStream(final DataOutput output) throws IOException {
+ final FileInputStream is = new FileInputStream(file);
+ try {
+ StreamUtils.copyStream(is, output);
+ is.close();
+ } finally {
+ StreamUtils.safeClose(is);
+ }
+ }
+
+ }
+
}
@@ -18,15 +18,12 @@
*/
package org.jboss.as.controller.client.impl;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.jboss.as.controller.client.MessageSeverity;
import org.jboss.as.controller.client.ModelControllerClient;
@@ -59,14 +56,7 @@
private static ManagementRequestHandler<ModelNode, OperationExecutionContext> MESSAGE_HANDLER = new HandleReportRequestHandler();
private static ManagementRequestHandler<ModelNode, OperationExecutionContext> GET_INPUT_STREAM = new ReadAttachmentInputStreamRequestHandler();
- private static final OperationMessageHandler NO_OP_HANDLER = new OperationMessageHandler() {
-
- @Override
- public void handleReport(MessageSeverity severity, String message) {
- //
- }
-
- };
+ private static final OperationMessageHandler NO_OP_HANDLER = OperationMessageHandler.DISCARD;
/**
* Get the mgmt channel association.
@@ -197,37 +187,31 @@ public void execute(final ManagementRequestContext<OperationExecutionContext> co
final OperationExecutionContext exec = context.getAttachment();
final ManagementRequestHeader header = ManagementRequestHeader.class.cast(context.getRequestHeader());
final ManagementResponseHeader response = new ManagementResponseHeader(header.getVersion(), header.getRequestId(), null);
- final InputStream is = exec.getOperation().getInputStreams().get(index);
- try {
- final ByteArrayOutputStream bout = copyStream(is);
- final FlushableDataOutput output = context.writeMessage(response);
+ final InputStreamEntry entry = exec.getStream(index);
+ synchronized (entry) {
+ // Initialize the stream entry
+ final int size = entry.initialize();
try {
- output.writeByte(ModelControllerProtocol.PARAM_INPUTSTREAM_LENGTH);
- output.writeInt(bout.size());
- output.writeByte(ModelControllerProtocol.PARAM_INPUTSTREAM_CONTENTS);
- output.write(bout.toByteArray());
- output.writeByte(ManagementProtocol.RESPONSE_END);
- output.close();
+ final FlushableDataOutput output = context.writeMessage(response);
+ try {
+ output.writeByte(ModelControllerProtocol.PARAM_INPUTSTREAM_LENGTH);
+ output.writeInt(size);
+ output.writeByte(ModelControllerProtocol.PARAM_INPUTSTREAM_CONTENTS);
+ entry.copyStream(output);
+ output.writeByte(ManagementProtocol.RESPONSE_END);
+ output.close();
+ } finally {
+ StreamUtils.safeClose(output);
+ }
} finally {
- StreamUtils.safeClose(output);
+ // the caller is responsible for closing the input streams
+ // StreamUtils.safeClose(is);
}
- } finally {
- // the caller is responsible for closing the input streams
- // StreamUtils.safeClose(is);
}
}
});
}
- protected ByteArrayOutputStream copyStream(final InputStream is) throws IOException {
- final ByteArrayOutputStream bout = new ByteArrayOutputStream();
- // Hmm, a null input-stream should be a failure?
- if(is != null) {
- StreamUtils.copyStream(is, bout);
- }
- return bout;
- }
-
}
private static class HandleReportRequestHandler implements ManagementRequestHandler<ModelNode, OperationExecutionContext> {
@@ -257,20 +241,27 @@ public void handleRequest(final DataInput input, final ActiveOperation.ResultHan
private final Operation operation;
private final OperationMessageHandler handler;
+ private final List<InputStreamEntry> streams;
OperationExecutionContext(final Operation operation, final OperationMessageHandler handler) {
this.operation = operation;
this.handler = handler != null ? handler : NO_OP_HANDLER;
- }
-
- Operation getOperation() {
- return operation;
+ this.streams = createStreamEntries(operation);
}
OperationMessageHandler getOperationMessageHandler() {
return handler;
}
+ InputStreamEntry getStream(int index) {
+ final InputStreamEntry entry = streams.get(index);
+ if(entry == null) {
+ // Hmm, a null input-stream should be a failure?
+ return InputStreamEntry.EMPTY;
+ }
+ return entry;
+ }
+
@Override
public void completed(ModelNode result) {
closeAttachments();
@@ -287,6 +278,9 @@ public void cancelled() {
}
private void closeAttachments() {
+ for(final InputStreamEntry entry : streams) {
+ StreamUtils.safeClose(entry);
+ }
if(operation.isAutoCloseStreams()) {
StreamUtils.safeClose(operation);
}
@@ -354,4 +348,22 @@ public void handleRequest(DataInput input, ActiveOperation.ResultHandler<ModelNo
}
}
+ static List<InputStreamEntry> createStreamEntries(final Operation operation) {
+ final List<InputStream> streams = operation.getInputStreams();
+ if(streams.isEmpty()) {
+ return Collections.emptyList();
+ }
+ final List<InputStreamEntry> entries = new ArrayList<InputStreamEntry>();
+ final boolean autoClose = operation.isAutoCloseStreams();
+ for(final InputStream stream : streams) {
+ if(stream instanceof InputStreamEntry) {
+ entries.add((InputStreamEntry) stream);
+ } else {
+ // TODO don't copy everything to memory... perhaps use InputStreamEntry.CachingStreamEntry
+ entries.add(new InputStreamEntry.InMemoryEntry(stream, autoClose));
+ }
+ }
+ return entries;
+ }
+
}
Oops, something went wrong.

0 comments on commit 8f1b8bc

Please sign in to comment.