Skip to content
Permalink
Browse files
Implement CMSFv2 reading support, last part
  • Loading branch information
ngmr committed Apr 23, 2015
1 parent c32aceb commit 714e617fdc1528335c1cb8c322ec96ef9b8426c1
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 79 deletions.
@@ -448,13 +448,13 @@ public void execReceive() {
processException(State.Closed, ex, false);
break;
}
if (logger.isLoggable(Level.FINE)) {
logger.fine("Message body received ");
int currentpos = buf.pos_;
buf.pos_ = 0;
logger.fine("Received message are: \n" + buf.dumpData());
buf.pos_ = currentpos;
}
}
if (logger.isLoggable(Level.FINE)) {
logger.fine("Message body received ");
int currentpos = buf.pos_;
buf.pos_ = 0;
logger.fine("Received message are: \n" + buf.dumpData());
buf.pos_ = currentpos;
}

//
@@ -9,7 +9,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.yoko.rmi.cmsf.CmsfThreadLocalStack;
import org.apache.yoko.rmi.cmsf.CmsfThreadLocal;
import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.LocalObject;
import org.omg.IOP.TAG_RMI_CUSTOM_MAX_STREAM_FORMAT;
@@ -32,7 +32,7 @@ public void send_request(ClientRequestInfo ri) throws ForwardRequest {
throw e;
}
}
CmsfThreadLocalStack.push(cmsf.getValue());
CmsfThreadLocal.set(cmsf.getValue());

if (CmsfVersion.ENABLED) ri.add_request_service_context(CMSFv2.getSc(), false);
}
@@ -43,17 +43,17 @@ public void send_poll(ClientRequestInfo ri) {

@Override
public void receive_reply(ClientRequestInfo ri) {
CmsfThreadLocalStack.pop();
CmsfThreadLocal.reset();
}

@Override
public void receive_exception(ClientRequestInfo ri) throws ForwardRequest {
CmsfThreadLocalStack.pop();
CmsfThreadLocal.reset();
}

@Override
public void receive_other(ClientRequestInfo ri) throws ForwardRequest {
CmsfThreadLocalStack.pop();
CmsfThreadLocal.reset();
}

@Override
@@ -7,9 +7,11 @@
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.yoko.orb.OB.MinorCodes;
import org.apache.yoko.rmi.cmsf.CmsfThreadLocalStack;
import org.apache.yoko.orb.OB.IORUtil;
import org.apache.yoko.rmi.cmsf.CmsfThreadLocal;
import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.INTERNAL;
import org.omg.CORBA.LocalObject;
@@ -20,7 +22,8 @@
import org.omg.PortableInterceptor.ServerRequestInfo;
import org.omg.PortableInterceptor.ServerRequestInterceptor;

public class CmsfServerInterceptor extends LocalObject implements ServerRequestInterceptor {
public final class CmsfServerInterceptor extends LocalObject implements ServerRequestInterceptor {
private static final Logger LOGGER = Logger.getLogger(CmsfServerInterceptor.class.getName());
private static final String NAME = CmsfServerInterceptor.class.getName();

private final int slotId;
@@ -35,6 +38,9 @@ public void receive_request_service_contexts(ServerRequestInfo ri) throws Forwar
try {
ServiceContext sc = ri.get_request_service_context(RMICustomMaxStreamFormat.value);
cmsf = CmsfVersion.readData(sc.context_data);
if (LOGGER.isLoggable(Level.FINEST))
LOGGER.finest(String.format("Using custom marshal stream format version: %s, retrieved from bytes: %s",
cmsf, IORUtil.dump_octets(sc.context_data)));
} catch (BAD_PARAM e) {
if (e.minor != MinorInvalidServiceContextId) {
throw e;
@@ -49,28 +55,31 @@ public void receive_request_service_contexts(ServerRequestInfo ri) throws Forwar

@Override
public void receive_request(ServerRequestInfo ri) throws ForwardRequest {
}

private void setupCmsfThreadLocalValue(ServerRequestInfo ri) {
CmsfVersion cmsf = CMSFv1;
try {
cmsf = CmsfVersion.readAny(ri.get_slot(slotId));
} catch (InvalidSlot e) {
throw (INTERNAL)(new INTERNAL(e.getMessage())).initCause(e);
}
CmsfThreadLocalStack.push(cmsf.getValue());
CmsfThreadLocal.set(cmsf.getValue());
}

@Override
public void send_reply(ServerRequestInfo ri) {
CmsfThreadLocalStack.pop();
setupCmsfThreadLocalValue(ri);
}

@Override
public void send_exception(ServerRequestInfo ri) throws ForwardRequest {
CmsfThreadLocalStack.pop();
setupCmsfThreadLocalValue(ri);
}

@Override
public void send_other(ServerRequestInfo ri) throws ForwardRequest {
CmsfThreadLocalStack.pop();
setupCmsfThreadLocalValue(ri);
}

@Override
@@ -89,5 +98,4 @@ private void readObject(ObjectInputStream ios) throws IOException {
private void writeObject(ObjectOutputStream oos) throws IOException {
throw new NotSerializableException(NAME);
}

}
@@ -18,24 +18,19 @@
package test.rmi;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;

import javax.rmi.CORBA.Stub;
import javax.rmi.CORBA.Util;
import javax.rmi.PortableRemoteObject;

import org.junit.Assert;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.Servant;

import junit.framework.Assert;

public class ClientMain extends Assert {

@@ -311,18 +306,18 @@ public void testComplexCorbaAttribute(SampleCorba corbaRef) throws RemoteExcepti
}

public void testHashMap() throws RemoteException {
HashMap map = new HashMap();
String str = "hello";
map.put(new Integer(0), str);
map.put(new Integer(1), str);
Integer i = new Integer(2);
map.put(new Integer(3), i);
map.put(new Integer(4), i);
HashMap<Integer, Serializable> map = new HashMap<>();
String str = new String("hello");
map.put(0, str);
map.put(1, str);
Integer two = new Integer(2);
map.put(3, two);
map.put(4, two);
sample.setSerializable(map);
HashMap map2 = (HashMap) sample.getSerializable();
Map<?,?> map2 = (Map<?,?>) sample.getSerializable();
assertEquals(map, map2);
assertTrue(map2.get(new Integer(3)) == map2.get(new Integer(4)));
assertTrue(map2.get(new Integer(0)) == map2.get(new Integer(1)));
assertSame(map2.get(3), map2.get(4));
assertSame(map2.get(0), map2.get(1));
}
}

@@ -0,0 +1,45 @@
package org.apache.yoko.rmi.cmsf;

import java.util.logging.Level;
import java.util.logging.Logger;

public final class CmsfThreadLocal {
private static final Logger LOGGER = Logger.getLogger(CmsfThreadLocal.class.getName());
private static final ThreadLocal<Version> value = new ThreadLocal<>();

private CmsfThreadLocal() {}

private enum Version {
CMSFv1(1), CMSFv2(2);

public final byte value;

private Version(int value) {
this.value = (byte)(value & 0xff);
}

static Version get(byte value) {
return (value >= 2) ? CMSFv2 : CMSFv1;
}
}

public static void set(byte cmsfv) {
final Version version = Version.get(cmsfv);
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer(String.format("CMSF thread local version set: %s", version));
value.set(version);
}

public static byte get() {
final Version version = value.get();
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer(String.format("CMSF thread local version retrieved: %s", version));
return version.value;
}

public static void reset() {
if (LOGGER.isLoggable(Level.FINER))
LOGGER.finer("CMSF thread local reset");
value.remove();
}
}

This file was deleted.

@@ -18,6 +18,15 @@ void delegateTo(ObjectReader delegate) {
this.delegate = delegate;
}

protected final Object readObjectOverride() throws ClassNotFoundException, IOException {
try {
delegate.enterRecursion();
return delegate.readAbstractObject();
} finally {
delegate.exitRecursion();
}
}

//////////////////////////////////////
// ONLY DELEGATE METHODS BELOW HERE //
//////////////////////////////////////
@@ -100,6 +109,7 @@ public void readFully(byte[] buf, int off, int len) throws IOException {
public int skipBytes(int len) throws IOException {
return delegate.skipBytes(len);
}
@Deprecated
public String readLine() throws IOException {
return delegate.readLine();
}
@@ -18,8 +18,6 @@

package org.apache.yoko.rmi.impl;

import org.apache.yoko.rmi.cmsf.CmsfThreadLocalStack;

import java.io.Externalizable;
import java.io.IOException;
import java.io.NotActiveException;
@@ -31,6 +29,8 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.yoko.rmi.cmsf.CmsfThreadLocal;

abstract class ObjectWriter extends ObjectOutputStream {
protected final Serializable object;

@@ -144,7 +144,7 @@ protected void beforeWriteData() throws IOException {

ObjectWriter(Serializable obj) throws IOException {
object = obj;
streamFormatVersion = CmsfThreadLocalStack.peek();
streamFormatVersion = CmsfThreadLocal.get();
}

private byte getStreamFormatVersion() {

0 comments on commit 714e617

Please sign in to comment.