Skip to content
Permalink
Browse files

Hooked together ClientInterface with HStoreSite. We are still going t…

…o support the old VoltProcedureListener stuff for a bit longer... although now that I type this I realize that it's all disconnected anyway, so it doesn't really matter...
  • Loading branch information...
apavlo committed Jun 11, 2012
1 parent aa8fccc commit ce0d3fd8ac4491a099c4f398bb568393732666fa
@@ -52,6 +52,8 @@
import org.voltdb.utils.DumpManager;

import edu.brown.hstore.interfaces.Shutdownable;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.utils.EventObservable;
import edu.brown.utils.EventObserver;

@@ -63,6 +65,11 @@
*/
public class ClientInterface implements DumpManager.Dumpable, Shutdownable {
private static final Logger LOG = Logger.getLogger(ClientInterface.class);
private static final LoggerBoolean debug = new LoggerBoolean(LOG.isDebugEnabled());
private static final LoggerBoolean trace = new LoggerBoolean(LOG.isTraceEnabled());
static {
LoggerUtil.attachObserver(LOG, debug, trace);
}

private final HStoreSite hstore_site;

@@ -641,7 +648,7 @@ public static ClientInterface create(

public void increaseBackpressure(int messageSize)
{
LOG.info("Increasing Backpressure: " + messageSize);
if (debug.get()) LOG.debug("Increasing Backpressure: " + messageSize);

m_pendingTxnBytes += messageSize;
m_pendingTxnCount++;
@@ -656,7 +663,7 @@ public void increaseBackpressure(int messageSize)

public void reduceBackpressure(int messageSize)
{
LOG.info("Reducing Backpressure: " + messageSize);
if (debug.get()) LOG.debug("Reducing Backpressure: " + messageSize);

m_pendingTxnBytes -= messageSize;
m_pendingTxnCount--;
@@ -28,7 +28,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -68,6 +67,7 @@
import org.voltdb.utils.EstTime;
import org.voltdb.utils.EstTimeUpdater;
import org.voltdb.utils.Pair;
import org.voltdb.utils.DBBPool.BBContainer;

import com.google.protobuf.RpcCallback;

@@ -1169,7 +1169,7 @@ public void run() {
// XXX: We have to join on all of our PartitionExecutor threads
try {
for (Thread t : this.executor_threads) {
t.join();
if (t != null) t.join();
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
@@ -1335,17 +1335,38 @@ public boolean isShuttingDown() {

protected void queueInvocation(ByteBuffer buffer, ClientInputHandler handler, Connection c) {
int messageSize = buffer.capacity();
ClientResponseCallback callback = new ClientResponseCallback(clientInterface, c, messageSize);
RpcCallback<ClientResponseImpl> callback = new ClientResponseCallback(this.clientInterface, c, messageSize);
this.clientInterface.increaseBackpressure(messageSize);
this.queueInvocation(buffer, callback);

if (this.preProcessorQueue != null) {
this.preProcessorQueue.add(Pair.of(buffer, callback));
} else {
this.processInvocation(buffer, callback);
}
}

@Override
public void queueInvocation(ByteBuffer buffer, RpcCallback<ClientResponseImpl> clientCallback) {
public void queueInvocation(ByteBuffer buffer, final RpcCallback<byte[]> clientCallback) {
// HACK
RpcCallback<ClientResponseImpl> wrapperCallback = new RpcCallback<ClientResponseImpl>() {
@Override
public void run(ClientResponseImpl parameter) {
FastSerializer fs = getOutgoingSerializer();
try {
BBContainer bb = fs.writeObjectForMessaging(parameter);
clientCallback.run(bb.b.array());
} catch (IOException ex) {
throw new RuntimeException(ex);
} finally {
fs.clear();
}
}
};

if (this.preProcessorQueue != null) {
this.preProcessorQueue.add(Pair.of(buffer, clientCallback));
this.preProcessorQueue.add(Pair.of(buffer, wrapperCallback));
} else {
this.processInvocation(buffer, clientCallback);
this.processInvocation(buffer, wrapperCallback);
}
}

@@ -37,6 +37,12 @@

private final AtomicInteger numConnections = new AtomicInteger(0);


public static interface Handler {
public long getInstanceId();
public void queueInvocation(ByteBuffer serializedRequest, RpcCallback<byte[]> clientCallback);
}

public VoltProcedureListener(int hostId, EventLoop eventLoop, Handler handler) {
this.hostId = hostId;
this.eventLoop = eventLoop;
@@ -260,11 +266,6 @@ public void setServerSocketForTest(ServerSocketChannel serverSocket) {
this.serverSocket = serverSocket;
}

public static interface Handler {
public long getInstanceId();
public void queueInvocation(ByteBuffer serializedRequest, RpcCallback<byte[]> clientCallback);
}

public static void main(String[] vargs) throws Exception {
// Example of using VoltProcedureListener: prints procedure name, returns empty array
NIOEventLoop eventLoop = new NIOEventLoop();
@@ -3,26 +3,18 @@
*/
package edu.brown.hstore.callbacks;

import org.apache.log4j.Logger;
import org.voltdb.ClientResponseImpl;
import org.voltdb.network.Connection;

import com.google.protobuf.RpcCallback;

import edu.brown.hstore.ClientInterface;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;

/**
* Thin wrapper to sent a ClientResponse back to the client over a Connection handle
* @author pavlo
*/
public class ClientResponseCallback implements RpcCallback<ClientResponseImpl> {
private static final Logger LOG = Logger.getLogger(ClientResponseCallback.class);
private static final LoggerBoolean debug = new LoggerBoolean(LOG.isDebugEnabled());
private static final LoggerBoolean trace = new LoggerBoolean(LOG.isTraceEnabled());
static {
LoggerUtil.attachObserver(LOG, debug, trace);
}

private final ClientInterface clientInterface;
private final Connection conn;
@@ -37,7 +29,6 @@ public ClientResponseCallback(ClientInterface clientInterface, Connection conn,

@Override
public void run(ClientResponseImpl parameter) {
LOG.info("Sending back ClientResponse to " + this.conn.getHostname() + "\n" + parameter);
boolean ret = this.conn.writeStream().enqueue(parameter);
if (ret == false) {
throw new RuntimeException("Unable to write ClientResponse on output stream?");
@@ -113,8 +113,6 @@ private ClientResponse execQuery(Client client, String query) throws Exception {
* @throws Exception
*/
private ClientResponse execProcedure(Client client, String procName, String query) throws Exception {
ClientResponse cresponse = null;

Procedure catalog_proc = this.catalog_db.getProcedures().getIgnoreCase(procName);
if (catalog_proc == null) {
throw new Exception("Invalid stored procedure name '" + procName + "'");
@@ -144,7 +142,7 @@ private ClientResponse execProcedure(Client client, String procName, String quer

LOG.info(String.format("Executing %s(%s)",
catalog_proc.getName(), StringUtil.join(", ", procParams)));
client.callProcedure(catalog_proc.getName(), procParams.toArray());
ClientResponse cresponse = client.callProcedure(catalog_proc.getName(), procParams.toArray());
return (cresponse);
}

@@ -255,8 +253,11 @@ public void run() {

// Just print out the result
if (cresponse != null) {
System.out.println("Server Response: " + cresponse.getStatus());
VoltTable[] results = cresponse.getResults();
System.out.println(StringUtil.join("\n", results));
} else {
LOG.warn("Return result is null");
}

// Fatal Error
@@ -460,7 +460,6 @@ private Runnable getPortCallRunnable(final VoltPort port) {
return new Runnable() {
@Override
public void run() {
m_logger.info("Invoking call() on " + port);
try {
port.call();
} catch (Exception e) {

0 comments on commit ce0d3fd

Please sign in to comment.
You can’t perform that action at this time.