Skip to content

Commit

Permalink
clean up the blob file but only when we know we are done with the str…
Browse files Browse the repository at this point in the history
…eam - so do so at the time close is called
  • Loading branch information
jmazzitelli committed Aug 5, 2015
1 parent 7bbbd5b commit 0cd6bbe
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,23 @@ public class BinaryData extends InputStream {
private final InputStream streamData;
private int inMemoryDataPointer;

private Runnable onCloseAction;

public BinaryData(byte[] inMemoryData, InputStream streamData) {
this.inMemoryData = (inMemoryData != null) ? inMemoryData : new byte[0];
this.streamData = streamData;
this.inMemoryDataPointer = 0;
this.onCloseAction = null;
}

/**
* Provides custom action to run after {@link #close()} finishes closing the stream.
* This allows you to tell this object how it can clean up resources backing the stream.
*
* @param action the action or null if nothing should be done
*/
public void setOnCloseAction(Runnable action) {
onCloseAction = action;
}

public int read() throws IOException {
Expand Down Expand Up @@ -88,6 +101,11 @@ public void close() throws IOException {
inMemoryData = new byte[0];
inMemoryDataPointer = 0;
streamData.close();

// if we were asked to do something after close is done, do it now
if (onCloseAction != null) {
onCloseAction.run();
}
}

public void mark(int readlimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.hawkular.bus.common.BasicMessage;
import org.hawkular.bus.common.BasicMessageWithExtraData;
import org.hawkular.bus.common.BinaryData;
import org.hawkular.bus.common.MessageId;
import org.hawkular.bus.common.log.MsgLogger;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void setConsumerConnectionContext(ConsumerConnectionContext consumerConne
*
* @return the message as a object T, or null if we should not or cannot process the message
*/
protected BasicMessageWithExtraData<T> parseMessage(Message message) {
protected BasicMessageWithExtraData<T> parseMessage(final Message message) {
BasicMessageWithExtraData<T> retVal;

try {
Expand All @@ -108,6 +109,21 @@ protected BasicMessageWithExtraData<T> parseMessage(Message message) {
} else if (message instanceof ActiveMQBlobMessage) {
InputStream receivedBody = ((ActiveMQBlobMessage) message).getInputStream();
retVal = BasicMessage.fromJSON(receivedBody, getBasicMessageClass());
BinaryData extraData = retVal.getBinaryData();
if (extraData != null) {
extraData.setOnCloseAction(new Runnable() {
@Override
public void run() {
ActiveMQBlobMessage blob = (ActiveMQBlobMessage) message;
try {
getLog().tracef("Deleting blob msg file [%s]", blob.getRemoteBlobUrl());
blob.deleteFile();
} catch (Exception e) {
getLog().warnf(e, "Failed to delete blob msg file: [%s]", blob.getRemoteBlobUrl());
}
}
});
}
} else {
throw new Exception("Message is not a valid type: " + message.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testWithSimpleInputStream() throws Exception {

try {
String brokerURL = broker.getBrokerURL();
brokerURL += "/?jms.blobTransferPolicy.uploadUrl=file:" + storageLocation;
brokerURL += "?jms.blobTransferPolicy.uploadUrl=file:" + storageLocation;
Endpoint endpoint = new Endpoint(Type.QUEUE, "testq");

// mimic server-side
Expand Down Expand Up @@ -85,6 +85,7 @@ public void testWithSimpleInputStream() throws Exception {
Assert.assertEquals(outgoingExtraData, new String(incomingExtraData, "UTF-8"));

// make sure the data has been removed from the uploadUrl storage location
binaryData.close(); // closing will force the backing file to be removed
File[] blobsStillAround = new File(storageLocation).listFiles();
Assert.assertEquals("Still have blobs: " + Arrays.asList(blobsStillAround), 0, blobsStillAround.length);

Expand Down

0 comments on commit 0cd6bbe

Please sign in to comment.