Skip to content
This repository has been archived by the owner on Jul 17, 2023. It is now read-only.

Commit

Permalink
GUACAMOLE-44: Implement intercepting of input streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
mike-jumper committed Jun 4, 2016
1 parent f391f00 commit 131785a
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 5 deletions.
Expand Up @@ -21,12 +21,14 @@

import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -143,4 +145,52 @@ public void write(OutputStream output) throws IOException {

}

/**
* Intercepts a specific stream, sending the contents of the given
* InputStream over that stream as "blob" instructions.
*
* @param authToken
* The authentication token that is used to authenticate the user
* performing the operation.
*
* @param tunnelUUID
* The UUID of the tunnel containing the stream being intercepted.
*
* @param streamIndex
* The index of the stream to intercept.
*
* @param filename
* The filename to use for the sake of identifying the data being sent.
*
* @param data
* An InputStream containing the data to be sent over the intercepted
* stream.
*
* @throws GuacamoleException
* If the session associated with the given auth token cannot be
* retrieved, or if no such tunnel exists.
*/
@POST
@Consumes(MediaType.WILDCARD)
@Path("/{tunnel}/streams/{index}/{filename}")
public void setStreamContents(@QueryParam("token") String authToken,
@PathParam("tunnel") String tunnelUUID,
@PathParam("index") final int streamIndex,
@PathParam("filename") String filename,
InputStream data)
throws GuacamoleException {

GuacamoleSession session = authenticationService.getGuacamoleSession(authToken);
Map<String, StreamInterceptingTunnel> tunnels = session.getTunnels();

// Pull tunnel with given UUID
final StreamInterceptingTunnel tunnel = tunnels.get(tunnelUUID);
if (tunnel == null)
throw new GuacamoleResourceNotFoundException("No such tunnel.");

// Send input over stream
tunnel.interceptStream(streamIndex, data);

}

}
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.guacamole.tunnel;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.DatatypeConverter;
import org.apache.guacamole.GuacamoleException;
import org.apache.guacamole.net.GuacamoleTunnel;
import org.apache.guacamole.protocol.GuacamoleInstruction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Filter which selectively intercepts "ack" instructions, automatically reading
* from or closing the stream given with interceptStream(). The required "blob"
* and "end" instructions denoting the content and boundary of the stream are
* sent automatically.
*/
public class InputStreamInterceptingFilter
extends StreamInterceptingFilter<InputStream> {

/**
* Logger for this class.
*/
private static final Logger logger =
LoggerFactory.getLogger(InputStreamInterceptingFilter.class);

/**
* Creates a new InputStreamInterceptingFilter which selectively intercepts
* "ack" instructions. The required "blob" and "end" instructions will
* automatically be sent over the given tunnel based on the content of
* provided InputStreams.
*
* @param tunnel
* The GuacamoleTunnel over which any required "blob" and "end"
* instructions should be sent.
*/
public InputStreamInterceptingFilter(GuacamoleTunnel tunnel) {
super(tunnel);
}

/**
* Injects a "blob" instruction into the outbound Guacamole protocol
* stream, as if sent by the connected client. "blob" instructions are used
* to send chunks of data along a stream.
*
* @param index
* The index of the stream that this "blob" instruction relates to.
*
* @param blob
* The chunk of data to send within the "blob" instruction.
*/
private void sendBlob(String index, byte[] blob) {

// Send "blob" containing provided data
sendInstruction(new GuacamoleInstruction("blob", index,
DatatypeConverter.printBase64Binary(blob)));

}

/**
* Injects an "end" instruction into the outbound Guacamole protocol
* stream, as if sent by the connected client. "end" instructions are used
* to signal the end of a stream.
*
* @param index
* The index of the stream that this "end" instruction relates to.
*/
private void sendEnd(String index) {
sendInstruction(new GuacamoleInstruction("end", index));
}

/**
* Reads the next chunk of data from the InputStream associated with an
* intercepted stream, sending that data as a "blob" instruction over the
* GuacamoleTunnel associated with this filter. If the end of the
* InputStream is reached, an "end" instruction will automatically be sent.
*
* @param stream
* The stream from which the next chunk of data should be read.
*/
private void readNextBlob(InterceptedStream<InputStream> stream) {

// Read blob from stream if it exists
try {

// Read raw data from input stream
byte[] blob = new byte[6048];
int length = stream.getStream().read(blob);

// End stream if no more data
if (length == -1) {

// Close stream, send end if the stream is still valid
if (closeInterceptedStream(stream))
sendEnd(stream.getIndex());

return;

}

// Inject corresponding "blob" instruction
sendBlob(stream.getIndex(), Arrays.copyOf(blob, length));

}

// Terminate stream if it cannot be read
catch (IOException e) {

logger.debug("Unable to read data of intercepted input stream.", e);

// Close stream, send end if the stream is still valid
if (closeInterceptedStream(stream))
sendEnd(stream.getIndex());

}

}

/**
* Handles a single "ack" instruction, sending yet more blobs or closing the
* stream depending on whether the "ack" indicates success or failure. If no
* InputStream is associated with the stream index within the "ack"
* instruction, the instruction is ignored.
*
* @param instruction
* The "ack" instruction being handled.
*/
private void handleAck(GuacamoleInstruction instruction) {

// Verify all required arguments are present
List<String> args = instruction.getArgs();
if (args.size() < 3)
return;

// Pull associated stream
String index = args.get(0);
InterceptedStream<InputStream> stream = getInterceptedStream(index);
if (stream == null)
return;

// Pull status code
String status = args.get(2);

// Terminate stream if an error is encountered
if (!status.equals("0")) {
closeInterceptedStream(stream);
return;
}

// Send next blob
readNextBlob(stream);

}

@Override
public GuacamoleInstruction filter(GuacamoleInstruction instruction)
throws GuacamoleException {

// Intercept "ack" instructions for in-progress streams
if (instruction.getOpcode().equals("ack"))
handleAck(instruction);

// Pass instruction through untouched
return instruction;

}

@Override
protected void handleInterceptedStream(InterceptedStream<InputStream> stream) {

// Read the first blob. Note that future blobs will be read in response
// to received "ack" instructions.
readNextBlob(stream);

}

}
Expand Up @@ -19,7 +19,9 @@

package org.apache.guacamole.tunnel;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.guacamole.GuacamoleException;
import org.apache.guacamole.io.GuacamoleReader;
Expand All @@ -30,10 +32,10 @@
import org.slf4j.LoggerFactory;

/**
* GuacamoleTunnel implementation which provides for intercepting the contents
* of in-progress streams, rerouting received blobs to a provided OutputStream.
* Interception of streams is requested on a per stream basis and lasts only
* for the duration of that stream.
* GuacamoleTunnel implementation which provides for producing or consuming the
* contents of in-progress streams, rerouting blobs to a provided OutputStream
* or from a provided InputStream. Interception of streams is requested on a per
* stream basis and lasts only for the duration of that stream.
*
* @author Michael Jumper
*/
Expand All @@ -58,6 +60,12 @@ public StreamInterceptingTunnel(GuacamoleTunnel tunnel) {
super(tunnel);
}

/**
* The filter to use for providing stream data from InputStreams.
*/
private final InputStreamInterceptingFilter inputStreamFilter =
new InputStreamInterceptingFilter(this);

/**
* The filter to use for rerouting received stream data to OutputStreams.
*/
Expand Down Expand Up @@ -92,9 +100,45 @@ public void interceptStream(int index, OutputStream stream) {

}

/**
* Intercept the given stream, continuously writing the contents of the
* given InputStream as blobs. The stream will automatically end when
* when the end of the InputStream is reached. If there is no such
* stream, then the InputStream will be closed immediately. This function
* will block until all data from the InputStream has been written to the
* given stream.
*
* @param index
* The index of the stream to intercept.
*
* @param stream
* The InputStream to read all blobs data from.
*/
public void interceptStream(int index, InputStream stream) {

// Log beginning of intercepted stream
logger.debug("Intercepting input stream #{} of tunnel \"{}\".",
index, getUUID());

inputStreamFilter.interceptStream(index, new BufferedInputStream(stream));

// Log end of intercepted stream
logger.debug("Intercepted input stream #{} of tunnel \"{}\" ended.",
index, getUUID());

}

@Override
public GuacamoleReader acquireReader() {
return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter);

GuacamoleReader reader = super.acquireReader();

// Filter both input and output streams
reader = new FilteredGuacamoleReader(reader, inputStreamFilter);
reader = new FilteredGuacamoleReader(reader, outputStreamFilter);

return reader;

}

@Override
Expand All @@ -108,6 +152,7 @@ public synchronized void close() throws GuacamoleException {

// Close all intercepted streams
finally {
inputStreamFilter.closeAllInterceptedStreams();
outputStreamFilter.closeAllInterceptedStreams();
}

Expand Down

0 comments on commit 131785a

Please sign in to comment.