From afe13e4bef33381f27adcad6b5b0ca857673945c Mon Sep 17 00:00:00 2001 From: "Dennis M. Sosnoski" Date: Mon, 10 Feb 2014 10:09:02 +0000 Subject: [PATCH] CXF-4866, CXF-352: Fixes to restructured WS-RM code from problems found with unit tests, and corrected unit tests to work with the restructured code. git-svn-id: https://svn.apache.org/repos/asf/cxf/trunk@1566557 13f79535-47bb-0310-9956-ffa450edef68 --- .../cxf/ws/rm/RetransmissionInterceptor.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java new file mode 100644 index 00000000000..83201cba757 --- /dev/null +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.ws.rm; + +import java.io.OutputStream; + +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.interceptor.MessageSenderInterceptor; +import org.apache.cxf.io.WriteOnCloseOutputStream; +import org.apache.cxf.message.Message; +import org.apache.cxf.phase.AbstractPhaseInterceptor; +import org.apache.cxf.phase.Phase; +import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor; + +/** + * + */ +public class RetransmissionInterceptor extends AbstractPhaseInterceptor { + + RMManager manager; + + public RetransmissionInterceptor() { + super(Phase.PREPARE_SEND); + addAfter(MessageSenderInterceptor.class.getName()); + addBefore(GZIPOutInterceptor.class.getName()); + } + + public RMManager getManager() { + return manager; + } + + public void setManager(RMManager manager) { + this.manager = manager; + } + + public void handleMessage(Message message) throws Fault { + handle(message, false); + } + + @Override + public void handleFault(Message message) { + handle(message, true); + } + + void handle(Message message, boolean isFault) { + if (null == getManager().getRetransmissionQueue()) { + return; + } + + OutputStream os = message.getContent(OutputStream.class); + if (null == os) { + return; + } + if (isFault) { + // remove the exception set by the PhaseInterceptorChain so that the + // error does not reach the client when retransmission is scheduled + message.setContent(Exception.class, null); + message.getExchange().put(Exception.class, null); + } else { + WriteOnCloseOutputStream stream = RMUtils.createCachedStream(message, os); + stream.registerCallback(new RetransmissionCallback(message, getManager())); + } + } +} + + + +