Skip to content

Commit

Permalink
Fixing serialization of AsyncResponseInvoker in support of async serv…
Browse files Browse the repository at this point in the history
…ices under TUSCANY-3783

git-svn-id: https://svn.apache.org/repos/asf/tuscany/sca-java-2.x/trunk@1062814 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Mike Edwards committed Jan 24, 2011
1 parent 4c8ca1c commit 989bd0d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 21 deletions.
Expand Up @@ -111,6 +111,23 @@ public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
return endpointreferences;
}

/**
* Returns a list of EndpointReferences that have a URI that matches a given URI
* @param uri - the URI to match
* @return a List of EndpointReferences that match the supplied URI - if there are none
* an *empty* list is returned (not null)
*/
public List<EndpointReference> findEndpointReferences( String uri ) {
List<EndpointReference> theRefs = new ArrayList<EndpointReference>();
if( uri == null ) return theRefs;

for( EndpointReference ref : endpointreferences ) {
if( uri.equals(ref.getURI()) ) theRefs.add(ref);
} // end for

return theRefs;
} // end method findEndpointReference

public abstract Endpoint getEndpoint(String uri);

Expand Down
Expand Up @@ -63,6 +63,14 @@ public interface EndpointRegistry {
void removeEndpointReference(EndpointReference endpointReference);
// List<EndpointReference> findEndpointReference(Endpoint endpoint);
List<EndpointReference> getEndpointReferences();

/**
* Returns a list of EndpointReferences that have a URI that matches a given URI
* @param uri - the URI to match
* @return a List of EndpointReferences that match the supplied URI - if there are none
* an *empty* list is returned (not null)
*/
public List<EndpointReference> findEndpointReferences( String uri );

void addListener(EndpointListener listener);
void removeListener(EndpointListener listener);
Expand Down
Expand Up @@ -25,12 +25,16 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointImpl;
import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
Expand All @@ -40,6 +44,7 @@
import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
import org.apache.tuscany.sca.runtime.DomainRegistryFactory;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
import org.apache.tuscany.sca.runtime.EndpointSerializer;
import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
Expand All @@ -60,15 +65,21 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
*/
private static final long serialVersionUID = -7992598227671386588L;

private RuntimeEndpoint requestEndpoint;
private RuntimeEndpointReference responseEndpointReference;
private transient RuntimeEndpoint requestEndpoint;
private transient RuntimeEndpointReference responseEndpointReference;
private T responseTargetAddress;
private String relatesToMsgID;
private String operationName;
private MessageFactory messageFactory;
private String bindingType = "";
private boolean isNativeAsync;

private String endpointURI;
private String endpointReferenceURI;
private String domainURI;

private transient EndpointRegistry endpointRegistry;

public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint,
RuntimeEndpointReference responseEndpointReference,
T responseTargetAddress, String relatesToMsgID,
Expand All @@ -81,6 +92,20 @@ public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint,
this.operationName = operationName;
this.messageFactory = messageFactory;

CompositeContext context = null;
if(requestEndpoint != null ) {
endpointURI = requestEndpoint.getURI();
context = requestEndpoint.getCompositeContext();
} // end if
if(responseEndpointReference != null ) {
endpointReferenceURI = responseEndpointReference.getURI();
context = responseEndpointReference.getCompositeContext();
}

if( context != null ) {
domainURI = context.getDomainURI();
} // end if

if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
(((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
isNativeAsync = true;
Expand Down Expand Up @@ -126,8 +151,9 @@ public void setRelatesToMsgID(String relatesToMsgID) {
* - this method creates a Tuscany message
*
* @param args the response data
* @param headers - any header
*/
public void invokeAsyncResponse(Object args) {
public void invokeAsyncResponse(Object args, Map<String, Object> headers) {

Message msg = messageFactory.createMessage();

Expand All @@ -150,6 +176,10 @@ public void invokeAsyncResponse(Object args) {
msg.setTo(requestEndpoint);
msg.setFrom(responseEndpointReference);

if( headers != null ) {
msg.getHeaders().putAll(headers);
}

if( args instanceof Throwable ) {
msg.setFaultBody(args);
} else {
Expand Down Expand Up @@ -199,27 +229,111 @@ public void setResponseEndpointReference(
this.responseEndpointReference = responseEndpointReference;
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
{
in.defaultReadObject();

// find the real endpoint
ExtensionPointRegistry extensionPointRegistry = NodeFactory.getInstance().getExtensionPointRegistry();
DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(extensionPointRegistry);
Collection<EndpointRegistry> endpointRegistries = domainRegistryFactory.getEndpointRegistries();
EndpointRegistry endpointRegistry = endpointRegistries.iterator().next();
List<Endpoint> endpoints = endpointRegistry.findEndpoint(requestEndpoint.getURI());
requestEndpoint = (RuntimeEndpoint)endpoints.get(0);
requestEndpoint = retrieveEndpoint(endpointURI);
responseEndpointReference = retrieveEndpointReference(endpointReferenceURI);

if (responseTargetAddress instanceof EndpointReference){
// fix the target as in this case it will be an EPR
EndpointReference epr = (EndpointReference)responseTargetAddress;
List<EndpointReference> endpointReferences = endpointRegistry.getEndpointReferences();
for (EndpointReference endpointReference : endpointReferences){
if (endpointReference.getURI().equals(epr.getURI())){
responseTargetAddress = (T)endpointReference;
}
}
}
}
responseTargetAddress = (T)retrieveEndpointReference(epr.getURI());
} // end if
} // end method readObject

/**
* Fetches the EndpointReference identified by an endpoint reference URI
* @param uri - the URI of the endpoint reference
* @return - the EndpointReference matching the supplied URI - null if no EPR is found which
* matches the URI
*/
private RuntimeEndpointReference retrieveEndpointReference(String uri) {
if( uri == null ) return null;
if( endpointRegistry == null ) return null;
List<EndpointReference> refs = endpointRegistry.findEndpointReferences( uri );
// If there is more than EndpointReference with the uri...
if( refs.isEmpty() ) return null;
// TODO: what if there is more than 1 EPR with the given URI?
return (RuntimeEndpointReference) refs.get(0);
} // end method retrieveEndpointReference

/**
* Fetches the Endpoint identified by an endpoint URI
* - the Endpoint is retrieved from the EndpointRegistry
* @param uri - the URI of the Endpoint
* @return - the Endpoint corresponding to the URI, or null if no Endpoint is found which has the
* supplied URI
*/
private RuntimeEndpoint retrieveEndpoint(String uri) {
if( uri == null ) return null;
if( endpointRegistry == null ) endpointRegistry = getEndpointRegistry( uri );
if( endpointRegistry == null ) return null;
// TODO what if more than one Endpoint gets returned??
return (RuntimeEndpoint) endpointRegistry.findEndpoint(uri).get(0);
} // end method retrieveEndpoint

/**
* Gets the EndpointRegistry which contains an Endpoint with the supplied URI
* @param uri - The URI of an Endpoint
* @return - the EndpointRegistry containing the Endpoint with the supplied URI - null if no
* such EndpointRegistry can be found
*/
private EndpointRegistry getEndpointRegistry(String uri) {
ExtensionPointRegistry registry = null;
EndpointRegistry endpointRegistry = null;

CompositeContext context = CompositeContext.getCurrentCompositeContext();
if( context == null && requestEndpoint != null ) context = requestEndpoint.getCompositeContext();
if( context != null ) {
registry = context.getExtensionPointRegistry();
endpointRegistry = getEndpointRegistry( registry );
if( endpointRegistry != null ) return endpointRegistry;
} // end if

// Deal with the case where there is no context available
for( NodeFactory factory : NodeFactory.getNodeFactories() ) {
registry = factory.getExtensionPointRegistry();
if( registry != null ) {
// Find the actual Endpoint in the EndpointRegistry
endpointRegistry = getEndpointRegistry( registry );

if( endpointRegistry != null ) {
for( Endpoint endpoint : endpointRegistry.findEndpoint(uri) ) {
// TODO: For the present, simply return the first registry with a matching endpoint
return endpointRegistry;
} // end for
} // end if
} // end if
} // end for

return null;
} // end method getEndpointRegistry

/**
* Get the EndpointRegistry
* @param registry - the ExtensionPoint registry
* @return the EndpointRegistry - will be null if the EndpointRegistry cannot be found
*/
private EndpointRegistry getEndpointRegistry( ExtensionPointRegistry registry) {
DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry);

if( domainRegistryFactory == null ) return null;

// Find the first endpoint registry that matches the domain name
if( domainURI != null ) {
for( EndpointRegistry endpointRegistry : domainRegistryFactory.getEndpointRegistries() ) {
if( domainURI.equals( endpointRegistry.getDomainURI() ) ) return endpointRegistry;
} // end for
} // end if

// if there was no domainName to match, simply return the first EndpointRegistry...
EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0];


return endpointRegistry;
} // end method

} // end class
Expand Up @@ -138,7 +138,7 @@ public void sendFault(Throwable e) {
// Use response invoker if present
if( respInvoker != null ) {
//respInvoker.invokeAsyncResponse(new AsyncFaultWrapper(e));
respInvoker.invokeAsyncResponse(e);
respInvoker.invokeAsyncResponse(e, null);
return;
} // end if

Expand All @@ -153,6 +153,7 @@ public void sendFault(Throwable e) {
* @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously
* @param res - the response message, which is of type T
*/
@SuppressWarnings("unchecked")
public void sendResponse(T res) {
if( sendOK() ) {
lock.lock();
Expand All @@ -175,7 +176,7 @@ public void sendResponse(T res) {

// Use response invoker if present
if( respInvoker != null ) {
respInvoker.invokeAsyncResponse(res);
respInvoker.invokeAsyncResponse(res, null);
return;
} // end if
} // end method sendResponse
Expand Down

0 comments on commit 989bd0d

Please sign in to comment.