Skip to content

Commit

Permalink
Use YASF negotiation to maintain stream compatibility when talking En…
Browse files Browse the repository at this point in the history
…ums to unfixed yoko
  • Loading branch information
ngmr committed Mar 24, 2016
1 parent 02829e8 commit 55b7d4f
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 108 deletions.
Expand Up @@ -296,7 +296,7 @@ private void initialize(org.omg.CORBA.StringSeqHolder args, String orbId,
try {
piManager.addIORInterceptor(new YasfIORInterceptor(), true);
piManager.addClientRequestInterceptor(new YasfClientInterceptor());
piManager.addServerRequestInterceptor(new YasfServerInterceptor());
piManager.addServerRequestInterceptor(new YasfServerInterceptor(piManager.allocateSlotId()));
} catch (org.omg.PortableInterceptor.ORBInitInfoPackage.DuplicateName ex) {
org.apache.yoko.orb.OB.Assert._OB_assert(ex);
}
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.yoko.util.cmsf.CmsfThreadLocal;
import org.apache.yoko.util.cmsf.CmsfThreadLocal.CmsfOverride;
import org.apache.yoko.util.yasf.YasfThreadLocal;
import org.apache.yoko.util.yasf.YasfThreadLocal.YasfOverride;

final public class ClientRequestInfo_impl extends RequestInfo_impl implements
org.omg.PortableInterceptor.ClientRequestInfo {
Expand Down Expand Up @@ -364,7 +366,8 @@ public void _OB_request(java.util.Vector interceptors)
argStrategy_.setArgsAvail(true);
argStrategy_.setExceptAvail(true);

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
java.util.Enumeration e = interceptors.elements();
while (e.hasMoreElements()) {
org.omg.PortableInterceptor.ClientRequestInterceptor interceptor = (org.omg.PortableInterceptor.ClientRequestInterceptor) e
Expand Down Expand Up @@ -398,7 +401,8 @@ public void _OB_reply() throws org.apache.yoko.orb.OB.LocationForward {
current_._OB_pushSlotData(currentSlots_);
}

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
int curr = interceptors_.size() - 1;
while (!interceptors_.isEmpty()) {
try {
Expand Down
Expand Up @@ -19,6 +19,8 @@

import org.apache.yoko.util.cmsf.CmsfThreadLocal;
import org.apache.yoko.util.cmsf.CmsfThreadLocal.CmsfOverride;
import org.apache.yoko.util.yasf.YasfThreadLocal;
import org.apache.yoko.util.yasf.YasfThreadLocal.YasfOverride;

final public class ServerRequestInfo_impl extends RequestInfo_impl implements
ServerRequestInfoExt {
Expand Down Expand Up @@ -380,7 +382,8 @@ public void _OB_requestServiceContext(java.util.Vector interceptors)
argStrategy_.setArgsAvail(false);
argStrategy_.setExceptAvail(false);

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
java.util.Enumeration e = interceptors.elements();
while (e.hasMoreElements()) {
org.omg.PortableInterceptor.ServerRequestInterceptor i = (org.omg.PortableInterceptor.ServerRequestInterceptor) e
Expand Down Expand Up @@ -408,7 +411,8 @@ public void _OB_request() throws org.apache.yoko.orb.OB.LocationForward {

status_ = NO_REPLY;

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
java.util.Enumeration e = interceptors_.elements();
while (e.hasMoreElements()) {
((org.omg.PortableInterceptor.ServerRequestInterceptor) (e
Expand All @@ -434,7 +438,8 @@ public void _OB_sendReply() {
//
servant_ = null;

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
int curr = interceptors_.size() - 1;
while (!interceptors_.isEmpty()) {
org.omg.PortableInterceptor.ServerRequestInterceptor i = (org.omg.PortableInterceptor.ServerRequestInterceptor) interceptors_
Expand Down Expand Up @@ -469,7 +474,8 @@ public void _OB_sendException()
//
servant_ = null;

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
org.apache.yoko.orb.OB.Assert
._OB_assert(status_ == org.omg.PortableInterceptor.SYSTEM_EXCEPTION.value
|| status_ == org.omg.PortableInterceptor.USER_EXCEPTION.value);
Expand Down Expand Up @@ -511,7 +517,8 @@ public void _OB_sendOther() throws org.apache.yoko.orb.OB.LocationForward {
//
servant_ = null;

try (CmsfOverride cmsfo = CmsfThreadLocal.override()) {
try (CmsfOverride cmsfo = CmsfThreadLocal.override();
YasfOverride yasfo = YasfThreadLocal.override()) {
org.apache.yoko.orb.OB.Assert
._OB_assert(status_ == org.omg.PortableInterceptor.LOCATION_FORWARD.value
|| status_ == org.omg.PortableInterceptor.TRANSPORT_RETRY.value);
Expand Down
71 changes: 0 additions & 71 deletions yoko-core/src/main/java/org/apache/yoko/orb/yasf/Yasf.java

This file was deleted.

Expand Up @@ -5,6 +5,8 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.yoko.util.yasf.Yasf;
import org.apache.yoko.util.yasf.YasfThreadLocal;
import org.omg.CORBA.LocalObject;
import org.omg.PortableInterceptor.ClientRequestInfo;
import org.omg.PortableInterceptor.ClientRequestInterceptor;
Expand All @@ -15,7 +17,11 @@ public class YasfClientInterceptor extends LocalObject implements ClientRequestI

@Override
public void send_request(ClientRequestInfo ri) throws ForwardRequest {
ri.add_request_service_context(Yasf.build().sc(), false);
byte[] yasfData = YasfHelper.readData(ri);

YasfThreadLocal.push(Yasf.toSet(yasfData));

YasfHelper.addSc(ri, Yasf.supported());
}

@Override
Expand All @@ -24,14 +30,17 @@ public void send_poll(ClientRequestInfo ri) {

@Override
public void receive_reply(ClientRequestInfo ri) {
YasfThreadLocal.pop();
}

@Override
public void receive_exception(ClientRequestInfo ri) throws ForwardRequest {
YasfThreadLocal.pop();
}

@Override
public void receive_other(ClientRequestInfo ri) throws ForwardRequest {
YasfThreadLocal.pop();
}

@Override
Expand Down
87 changes: 87 additions & 0 deletions yoko-core/src/main/java/org/apache/yoko/orb/yasf/YasfHelper.java
@@ -0,0 +1,87 @@
package org.apache.yoko.orb.yasf;

import org.apache.yoko.util.yasf.Yasf;
import org.omg.CORBA.Any;
import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.INTERNAL;
import org.omg.CORBA.ORB;
import org.omg.IOP.ServiceContext;
import org.omg.IOP.TaggedComponent;
import org.omg.PortableInterceptor.ClientRequestInfo;
import org.omg.PortableInterceptor.IORInfo;
import org.omg.PortableInterceptor.InvalidSlot;
import org.omg.PortableInterceptor.ServerRequestInfo;

import java.util.Set;

import static org.apache.yoko.orb.OB.MinorCodes.MinorInvalidComponentId;
import static org.apache.yoko.orb.OB.MinorCodes.MinorInvalidServiceContextId;

/**
* Created by nrichard on 23/03/16.
*/
public enum YasfHelper {
;

public static void addTc(IORInfo info, Set<Yasf> set) {
TaggedComponent tc = new TaggedComponent(Yasf.TAG_YOKO_AUXILLIARY_STREAM_FORMAT, Yasf.toData(set));
info.add_ior_component(tc);
}

private static ServiceContext sc(Set<Yasf> set) {
return new ServiceContext(Yasf.YOKO_AUXIllIARY_STREAM_FORMAT_SC, Yasf.toData(set));
}

public static void addSc(ClientRequestInfo ri, Set<Yasf> set) {
ServiceContext sc = sc(set);
ri.add_request_service_context(sc, false);
}

public static void addSc(ServerRequestInfo ri, Set<Yasf> set) {
ServiceContext sc = sc(set);
ri.add_reply_service_context(sc, false);
}

public static byte[] readData(ClientRequestInfo ri) {
try {
TaggedComponent tc = ri.get_effective_component(Yasf.TAG_YOKO_AUXILLIARY_STREAM_FORMAT);
return tc.component_data;
} catch (BAD_PARAM e) {
if (e.minor != MinorInvalidComponentId) {
throw e;
}
}
return null;
}

public static byte[] readData(ServerRequestInfo ri) {
try {
ServiceContext sc = ri.get_request_service_context(Yasf.YOKO_AUXIllIARY_STREAM_FORMAT_SC);
return sc.context_data;
} catch (BAD_PARAM e) {
if (e.minor != MinorInvalidServiceContextId) {
throw e;
}
}
return null;
}

public static void setSlot(int slotId, ServerRequestInfo ri, byte[] data) {
Any any = ORB.init().create_any();
any.insert_Value(data);
try {
ri.set_slot(slotId, any);
} catch (InvalidSlot e) {
throw (INTERNAL)(new INTERNAL(e.getMessage())).initCause(e);
}
}

public static byte[] getSlot(int slotId, ServerRequestInfo ri) {
try {
Any any = ri.get_slot(slotId);
return (byte[])any.extract_Value();
} catch (InvalidSlot e) {
throw (INTERNAL)(new INTERNAL(e.getMessage())).initCause(e);
}
}
}
Expand Up @@ -5,6 +5,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.yoko.util.yasf.Yasf;
import org.omg.CORBA.LocalObject;
import org.omg.PortableInterceptor.IORInfo;
import org.omg.PortableInterceptor.IORInterceptor;
Expand All @@ -14,7 +15,7 @@ public class YasfIORInterceptor extends LocalObject implements IORInterceptor {

@Override
public void establish_components(IORInfo info) {
info.add_ior_component(Yasf.build().tc());
YasfHelper.addTc(info, Yasf.supported());
}

@Override
Expand Down
Expand Up @@ -5,6 +5,8 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.yoko.util.yasf.Yasf;
import org.apache.yoko.util.yasf.YasfThreadLocal;
import org.omg.CORBA.LocalObject;
import org.omg.PortableInterceptor.ForwardRequest;
import org.omg.PortableInterceptor.ServerRequestInfo;
Expand All @@ -13,8 +15,17 @@
public class YasfServerInterceptor extends LocalObject implements ServerRequestInterceptor {
private static final String NAME = YasfServerInterceptor.class.getName();

private final int slotId;

public YasfServerInterceptor(int slotId) {
this.slotId = slotId;
}

@Override
public void receive_request_service_contexts(ServerRequestInfo ri) throws ForwardRequest {
YasfThreadLocal.reset();
byte[] yasfData = YasfHelper.readData(ri);
YasfHelper.setSlot(slotId, ri, yasfData);
}

@Override
Expand All @@ -23,17 +34,23 @@ public void receive_request(ServerRequestInfo ri) throws ForwardRequest {

@Override
public void send_reply(ServerRequestInfo ri) {
ri.add_reply_service_context(Yasf.build().sc(), false);
YasfThreadLocal.push(Yasf.toSet(YasfHelper.getSlot(slotId, ri)));
// Adding for diagnostic purposes
YasfHelper.addSc(ri, Yasf.supported());
}

@Override
public void send_exception(ServerRequestInfo ri) throws ForwardRequest {
ri.add_reply_service_context(Yasf.build().sc(), false);
YasfThreadLocal.push(Yasf.toSet(YasfHelper.getSlot(slotId, ri)));
// Adding for diagnostic purposes
YasfHelper.addSc(ri, Yasf.supported());
}

@Override
public void send_other(ServerRequestInfo ri) throws ForwardRequest {
ri.add_reply_service_context(Yasf.build().sc(), false);
YasfThreadLocal.push(Yasf.toSet(YasfHelper.getSlot(slotId, ri)));
// Adding for diagnostic purposes
YasfHelper.addSc(ri, Yasf.supported());
}

@Override
Expand Down

0 comments on commit 55b7d4f

Please sign in to comment.