Skip to content
Permalink
Browse files
Merge branch 'improve_cmsfv2' into 'ibm-trunk'
Improve cmsfv2

Add additional guards to CmsfThreadLocal mechanis

See merge request !34
  • Loading branch information
ngmr committed Apr 30, 2015
2 parents 159cf98 + f2a7985 commit c78bc8753d81a51c1e754043d3fea906c4d98f70
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 87 deletions.
@@ -17,6 +17,9 @@

package org.apache.yoko.orb.PortableInterceptor;

import org.apache.yoko.util.cmsf.CmsfThreadLocal;
import org.apache.yoko.util.cmsf.CmsfThreadLocal.CmsfOverride;

final public class ClientRequestInfo_impl extends RequestInfo_impl implements
org.omg.PortableInterceptor.ClientRequestInfo {
//
@@ -361,23 +364,25 @@ public void _OB_request(java.util.Vector interceptors)
argStrategy_.setArgsAvail(true);
argStrategy_.setExceptAvail(true);

java.util.Enumeration e = interceptors.elements();
while (e.hasMoreElements()) {
org.omg.PortableInterceptor.ClientRequestInterceptor interceptor = (org.omg.PortableInterceptor.ClientRequestInterceptor) e
.nextElement();
try {
interceptor.send_request(this);
interceptors_.addElement(interceptor);
} catch (org.omg.CORBA.SystemException ex) {
status_ = org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value;
receivedException_ = ex;
_OB_reply();
} catch (org.omg.PortableInterceptor.ForwardRequest ex) {
status_ = org.omg.PortableInterceptor.LOCATION_FORWARD.value;
org.apache.yoko.orb.CORBA.Delegate p = (org.apache.yoko.orb.CORBA.Delegate) (((org.omg.CORBA.portable.ObjectImpl) ex.forward)
._get_delegate());
forwardReference_ = p._OB_IOR();
_OB_reply();
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
java.util.Enumeration e = interceptors.elements();
while (e.hasMoreElements()) {
org.omg.PortableInterceptor.ClientRequestInterceptor interceptor = (org.omg.PortableInterceptor.ClientRequestInterceptor) e
.nextElement();
try {
interceptor.send_request(this);
interceptors_.addElement(interceptor);
} catch (org.omg.CORBA.SystemException ex) {
status_ = org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value;
receivedException_ = ex;
_OB_reply();
} catch (org.omg.PortableInterceptor.ForwardRequest ex) {
status_ = org.omg.PortableInterceptor.LOCATION_FORWARD.value;
org.apache.yoko.orb.CORBA.Delegate p = (org.apache.yoko.orb.CORBA.Delegate) (((org.omg.CORBA.portable.ObjectImpl) ex.forward)
._get_delegate());
forwardReference_ = p._OB_IOR();
_OB_reply();
}
}
}

@@ -393,49 +398,51 @@ public void _OB_reply() throws org.apache.yoko.orb.OB.LocationForward {
current_._OB_pushSlotData(currentSlots_);
}

int curr = interceptors_.size() - 1;
while (!interceptors_.isEmpty()) {
try {
org.omg.PortableInterceptor.ClientRequestInterceptor i = (org.omg.PortableInterceptor.ClientRequestInterceptor) interceptors_
.elementAt(curr);

if (status_ == org.omg.PortableInterceptor.SUCCESSFUL.value) {
//
// The result, arguments are available
//
argStrategy_.setResultAvail(true);

i.receive_reply(this);
} else if (status_ == org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value
|| status_ == org.omg.PortableInterceptor.USER_EXCEPTION.value) {
//
// The result, arguments not available
//
argStrategy_.setResultAvail(false);
argStrategy_.setArgsAvail(false);

i.receive_exception(this);
} else {
//
// The result, arguments not available
//
argStrategy_.setResultAvail(false);
argStrategy_.setArgsAvail(false);

i.receive_other(this);
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
int curr = interceptors_.size() - 1;
while (!interceptors_.isEmpty()) {
try {
org.omg.PortableInterceptor.ClientRequestInterceptor i = (org.omg.PortableInterceptor.ClientRequestInterceptor) interceptors_
.elementAt(curr);

if (status_ == org.omg.PortableInterceptor.SUCCESSFUL.value) {
//
// The result, arguments are available
//
argStrategy_.setResultAvail(true);

i.receive_reply(this);
} else if (status_ == org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value
|| status_ == org.omg.PortableInterceptor.USER_EXCEPTION.value) {
//
// The result, arguments not available
//
argStrategy_.setResultAvail(false);
argStrategy_.setArgsAvail(false);

i.receive_exception(this);
} else {
//
// The result, arguments not available
//
argStrategy_.setResultAvail(false);
argStrategy_.setArgsAvail(false);

i.receive_other(this);
}
} catch (org.omg.CORBA.SystemException ex) {
status_ = org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value;
receivedException_ = ex;
receivedId_ = null;
} catch (org.omg.PortableInterceptor.ForwardRequest ex) {
status_ = org.omg.PortableInterceptor.LOCATION_FORWARD.value;
org.apache.yoko.orb.CORBA.Delegate p = (org.apache.yoko.orb.CORBA.Delegate) (((org.omg.CORBA.portable.ObjectImpl) ex.forward)
._get_delegate());
forwardReference_ = p._OB_IOR();
}
} catch (org.omg.CORBA.SystemException ex) {
status_ = org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value;
receivedException_ = ex;
receivedId_ = null;
} catch (org.omg.PortableInterceptor.ForwardRequest ex) {
status_ = org.omg.PortableInterceptor.LOCATION_FORWARD.value;
org.apache.yoko.orb.CORBA.Delegate p = (org.apache.yoko.orb.CORBA.Delegate) (((org.omg.CORBA.portable.ObjectImpl) ex.forward)
._get_delegate());
forwardReference_ = p._OB_IOR();
interceptors_.removeElementAt(curr);
--curr;
}
interceptors_.removeElementAt(curr);
--curr;
}

//
@@ -17,6 +17,9 @@

package org.apache.yoko.orb.PortableInterceptor;

import org.apache.yoko.util.cmsf.CmsfThreadLocal;
import org.apache.yoko.util.cmsf.CmsfThreadLocal.CmsfOverride;

final public class ServerRequestInfo_impl extends RequestInfo_impl implements
ServerRequestInfoExt {
//
@@ -377,7 +380,7 @@ public void _OB_requestServiceContext(java.util.Vector interceptors)
argStrategy_.setArgsAvail(false);
argStrategy_.setExceptAvail(false);

try {
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
java.util.Enumeration e = interceptors.elements();
while (e.hasMoreElements()) {
org.omg.PortableInterceptor.ServerRequestInterceptor i = (org.omg.PortableInterceptor.ServerRequestInterceptor) e
@@ -405,7 +408,7 @@ public void _OB_request() throws org.apache.yoko.orb.OB.LocationForward {

status_ = NO_REPLY;

try {
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
java.util.Enumeration e = interceptors_.elements();
while (e.hasMoreElements()) {
((org.omg.PortableInterceptor.ServerRequestInterceptor) (e
@@ -431,14 +434,16 @@ public void _OB_sendReply() {
//
servant_ = null;

int curr = interceptors_.size() - 1;
while (!interceptors_.isEmpty()) {
org.omg.PortableInterceptor.ServerRequestInterceptor i = (org.omg.PortableInterceptor.ServerRequestInterceptor) interceptors_
.elementAt(curr);
interceptors_.removeElementAt(curr);
--curr;
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
int curr = interceptors_.size() - 1;
while (!interceptors_.isEmpty()) {
org.omg.PortableInterceptor.ServerRequestInterceptor i = (org.omg.PortableInterceptor.ServerRequestInterceptor) interceptors_
.elementAt(curr);
interceptors_.removeElementAt(curr);
--curr;

i.send_reply(this);
i.send_reply(this);
}
}

//
@@ -464,7 +469,7 @@ public void _OB_sendException()
//
servant_ = null;

try {
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
org.apache.yoko.orb.OB.Assert
._OB_assert(status_ == org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value
|| status_ == org.omg.PortableInterceptor.USER_EXCEPTION.value);
@@ -506,7 +511,7 @@ public void _OB_sendOther() throws org.apache.yoko.orb.OB.LocationForward {
//
servant_ = null;

try {
try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
org.apache.yoko.orb.OB.Assert
._OB_assert(status_ == org.omg.PortableInterceptor.LOCATION_FORWARD.value
|| status_ == org.omg.PortableInterceptor.TRANSPORT_RETRY.value);
@@ -32,7 +32,7 @@ public void send_request(ClientRequestInfo ri) throws ForwardRequest {
throw e;
}
}
CmsfThreadLocal.set(cmsf.getValue());
CmsfThreadLocal.push(cmsf.getValue());

if (CmsfVersion.ENABLED) ri.add_request_service_context(CMSFv2.getSc(), false);
}
@@ -43,17 +43,17 @@ public void send_poll(ClientRequestInfo ri) {

@Override
public void receive_reply(ClientRequestInfo ri) {
CmsfThreadLocal.reset();
CmsfThreadLocal.pop();
}

@Override
public void receive_exception(ClientRequestInfo ri) throws ForwardRequest {
CmsfThreadLocal.reset();
CmsfThreadLocal.pop();
}

@Override
public void receive_other(ClientRequestInfo ri) throws ForwardRequest {
CmsfThreadLocal.reset();
CmsfThreadLocal.pop();
}

@Override
@@ -34,6 +34,7 @@ public CmsfServerInterceptor(int slotId) {

@Override
public void receive_request_service_contexts(ServerRequestInfo ri) throws ForwardRequest {
CmsfThreadLocal.reset();
CmsfVersion cmsf = CMSFv1;
try {
ServiceContext sc = ri.get_request_service_context(RMICustomMaxStreamFormat.value);
@@ -64,7 +65,7 @@ private void setupCmsfThreadLocalValue(ServerRequestInfo ri) {
} catch (InvalidSlot e) {
throw (INTERNAL)(new INTERNAL(e.getMessage())).initCause(e);
}
CmsfThreadLocal.set(cmsf.getValue());
CmsfThreadLocal.push(cmsf.getValue());
}

@Override
@@ -5,10 +5,34 @@

public final class CmsfThreadLocal {
private static final Logger LOGGER = Logger.getLogger(CmsfThreadLocal.class.getName());
private static final ThreadLocal<Version> value = new ThreadLocal<>();
private static final ThreadLocal<CmsfInfo> cmsfInfo = new ThreadLocal<CmsfInfo>() {
@Override protected CmsfInfo initialValue() {
return new CmsfInfo();
}
};

private CmsfThreadLocal() {}

private static final class CmsfInfo {
public Frame head = Frame.DEFAULT;
public boolean override = false;
}

private static final class Frame {
static final Frame DEFAULT = new Frame();
public final Version version;
public final Frame prev;

private Frame() {
this.version = Version.CMSFv1;
this.prev = this;
}

Frame(Version version, Frame prev) {
this.version = version;
this.prev = prev;
}
}
private enum Version {
CMSFv1(1), CMSFv2(2);

@@ -23,28 +47,53 @@ static Version get(byte value) {
}
}

public static void set(byte cmsfv) {
public static final class CmsfOverride implements AutoCloseable {
private final CmsfInfo info;

CmsfOverride(CmsfInfo info) {
this.info = info;
info.override = true;
}

@Override
public void close() {
info.override = false;
}
}

public static CmsfOverride override() {
return new CmsfOverride(cmsfInfo.get());
}

public static void push(byte cmsfv) {
final CmsfInfo info = cmsfInfo.get();
final Version version = Version.get(cmsfv);
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer(String.format("CMSF thread local version set: %s", version));
value.set(version);
LOGGER.finer(String.format("CMSF thread local version pushed onto stack: %s", version));
info.head = new Frame(version, info.head);
}

public static byte get() {
final Version version = value.get();
final CmsfInfo info = cmsfInfo.get();
final boolean override = info.override;
final Version version = (override) ? Version.CMSFv1 : info.head.version;
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer(String.format("CMSF thread local version retrieved: %s", version));
if (version == null) {
if (LOGGER.isLoggable(Level.FINE))
LOGGER.fine("CMSF thread local was not set - returning CMSFv1");
return Version.CMSFv1.value;
}
LOGGER.finer(String.format("CMSF thread local version retrieved: %s, override is %b", version, override));
return version.value;
}

public static byte pop() {
final CmsfInfo info = cmsfInfo.get();
final Version version = info.head.version;
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer(String.format("CMSF thread local version popped from stack: %s", version));
info.head = info.head.prev;
return version.value;
}

public static void reset() {
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer("CMSF thread local reset");
value.remove();
LOGGER.finer("CMSF thread local stack reset");
cmsfInfo.remove();
}
}

0 comments on commit c78bc87

Please sign in to comment.