Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;

import org.apache.zeppelin.resource.ResourceId;

/**
* message payload to invoke method of resource in the resourcepool
*/
public class InvokeResourceMethodEventMessage {
public final ResourceId resourceId;
public final String methodName;
public final String[] paramClassnames;
public final Object[] params;
public final String returnResourceName;

public InvokeResourceMethodEventMessage(
ResourceId resourceId,
String methodName,
Class[] paramtypes,
Object[] params,
String returnResourceName
) {
this.resourceId = resourceId;
this.methodName = methodName;
if (paramtypes != null) {
paramClassnames = new String[paramtypes.length];
for (int i = 0; i < paramClassnames.length; i++) {
paramClassnames[i] = paramtypes[i].getName();
}
} else {
paramClassnames = null;
}

this.params = params;
this.returnResourceName = returnResourceName;
}

public Class [] getParamTypes() throws ClassNotFoundException {
if (paramClassnames == null) {
return null;
}

Class [] types = new Class[paramClassnames.length];
for (int i = 0; i < paramClassnames.length; i++) {
types[i] = this.getClass().getClassLoader().loadClass(paramClassnames[i]);
}

return types;
}

public boolean shouldPutResultIntoResourcePool() {
return (returnResourceName != null);
}

@Override
public int hashCode() {
String hash = resourceId.hashCode() + methodName;
if (paramClassnames != null) {
for (String name : paramClassnames) {
hash += name;
}
}
if (returnResourceName != null) {
hash += returnResourceName;
}

return hash.hashCode();
}

@Override
public boolean equals(Object o) {
if (o instanceof InvokeResourceMethodEventMessage) {
InvokeResourceMethodEventMessage r = (InvokeResourceMethodEventMessage) o;
return r.hashCode() == hashCode();
} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<>();
private final List<ResourceSet> getAllResourceResponse = new LinkedList<>();
private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
private final Map<InvokeResourceMethodEventMessage, Object> getInvokeResponse = new HashMap<>();
private final Gson gson = new Gson();

/**
Expand Down Expand Up @@ -164,6 +165,112 @@ public Object readResource(ResourceId resourceId) {
}
}

/**
* Invoke method and save result in resourcePool as another resource
* @param resourceId
* @param methodName
* @param paramTypes
* @param params
* @return
*/
@Override
public Object invokeMethod(
ResourceId resourceId,
String methodName,
Class[] paramTypes,
Object[] params) {
logger.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());

InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
resourceId,
methodName,
paramTypes,
params,
null);

synchronized (getInvokeResponse) {
// wait for previous response consumed
while (getInvokeResponse.containsKey(invokeMethod)) {
try {
getInvokeResponse.wait();
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
// send request
Gson gson = new Gson();

sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
gson.toJson(invokeMethod)));
// wait for response
while (!getInvokeResponse.containsKey(invokeMethod)) {
try {
getInvokeResponse.wait();
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
Object o = getInvokeResponse.remove(invokeMethod);
getInvokeResponse.notifyAll();
return o;
}
}

/**
* Invoke method and save result in resourcePool as another resource
* @param resourceId
* @param methodName
* @param paramTypes
* @param params
* @param returnResourceName
* @return
*/
@Override
public Resource invokeMethod(
ResourceId resourceId,
String methodName,
Class[] paramTypes,
Object[] params,
String returnResourceName) {
logger.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());

InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
resourceId,
methodName,
paramTypes,
params,
returnResourceName);

synchronized (getInvokeResponse) {
// wait for previous response consumed
while (getInvokeResponse.containsKey(invokeMethod)) {
try {
getInvokeResponse.wait();
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
// send request
Gson gson = new Gson();

sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
gson.toJson(invokeMethod)));
// wait for response
while (!getInvokeResponse.containsKey(invokeMethod)) {
try {
getInvokeResponse.wait();
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
Resource o = (Resource) getInvokeResponse.remove(invokeMethod);
getInvokeResponse.notifyAll();
return o;
}
}

/**
* Supposed to call from RemoteInterpreterEventPoller
*/
Expand Down Expand Up @@ -209,6 +316,42 @@ public void putResponseGetResource(String resourceId, ByteBuffer object) {
}


/**
* Supposed to call from RemoteInterpreterEventPoller
* @param invokeMessage json serialized InvokeMessage
* @param object java serialized of the object
*/
public void putResponseInvokeMethod(
InvokeResourceMethodEventMessage invokeMessage, ByteBuffer object) {
Object o = null;
try {
o = Resource.deserializeObject(object);
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (ClassNotFoundException e) {
logger.error(e.getMessage(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it fail / exit then without passing o along? Since o can be null in that case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to put null into getInvokeResponse, otherwise, https://github.com/apache/zeppelin/pull/1951/files#diff-5d91426e574ad85664eb7a7857ef6e90R207 may not finish.

}

synchronized (getInvokeResponse) {
getInvokeResponse.put(invokeMessage, o);
getInvokeResponse.notifyAll();
}
}

/**
* Supposed to call from RemoteInterpreterEventPoller
* @param invokeMessage invoke message
* @param resource remote resource
*/
public void putResponseInvokeMethod(
InvokeResourceMethodEventMessage invokeMessage, Resource resource) {
synchronized (getInvokeResponse) {
getInvokeResponse.put(invokeMessage, resource);
getInvokeResponse.notifyAll();
}
}


/**
* Supposed to call from RemoteInterpreterEventPoller
* @return next available event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -62,6 +64,8 @@ public class RemoteInterpreterEventPoller extends Thread {
private RemoteInterpreterProcess interpreterProcess;
private InterpreterGroup interpreterGroup;

Gson gson = new Gson();

public RemoteInterpreterEventPoller(
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
Expand Down Expand Up @@ -117,8 +121,6 @@ public void run() {
interpreterProcess.releaseClient(client, broken);
}

Gson gson = new Gson();

AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();

try {
Expand Down Expand Up @@ -160,6 +162,12 @@ public void run() {
logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
Object o = getResource(resourceId);
sendResourceResponseGet(resourceId, o);
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) {
String message = event.getData();
InvokeResourceMethodEventMessage invokeMethodMessage =
gson.fromJson(message, InvokeResourceMethodEventMessage.class);
Object ret = invokeResourceMethod(invokeMethodMessage);
sendInvokeMethodResult(invokeMethodMessage, ret);
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
// on output append
Map<String, String> outputAppend = gson.fromJson(
Expand Down Expand Up @@ -383,8 +391,6 @@ private ResourceSet getAllResourcePoolExcept() {
return resourceSet;
}



private void sendResourceResponseGet(ResourceId resourceId, Object o) {
Client client = null;
boolean broken = false;
Expand Down Expand Up @@ -444,6 +450,87 @@ private Object getResource(ResourceId resourceId) {
return null;
}

public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, Object o) {
Client client = null;
boolean broken = false;
try {
client = interpreterProcess.getClient();
Gson gson = new Gson();
String invokeMessage = gson.toJson(message);
ByteBuffer obj;
if (o == null) {
obj = ByteBuffer.allocate(0);
} else {
obj = Resource.serializeObject(o);
}
client.resourceResponseInvokeMethod(invokeMessage, obj);
} catch (Exception e) {
logger.error(e.getMessage(), e);
broken = true;
} finally {
if (client != null) {
interpreterProcess.releaseClient(client, broken);
}
}
}

private Object invokeResourceMethod(InvokeResourceMethodEventMessage message) {
ResourceId resourceId = message.resourceId;
InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
resourceId.getResourcePoolId());
if (intpGroup == null) {
return null;
}

RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
Resource res = localPool.get(resourceId.getName());
if (res != null) {
try {
return res.invokeMethod(
message.methodName,
message.getParamTypes(),
message.params,
message.returnResourceName);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
} else {
// object is null. can't invoke any method
logger.error("Can't invoke method {} on null object", message.methodName);
return null;
}
} else {
logger.error("no resource pool");
return null;
}
} else if (interpreterProcess.isRunning()) {
Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
ByteBuffer res = client.resourceInvokeMethod(
resourceId.getNoteId(),
resourceId.getParagraphId(),
resourceId.getName(),
gson.toJson(message));
Object o = Resource.deserializeObject(res);
return o;
} catch (Exception e) {
logger.error(e.getMessage(), e);
broken = true;
} finally {
if (client != null) {
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
}
}
}
return null;
}

private void waitQuietly() {
try {
synchronized (this) {
Expand Down
Loading