Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-10968 run-everywhere #3843

Merged
merged 13 commits into from Aug 24, 2016
33 changes: 33 additions & 0 deletions src/frontend/org/voltdb/client/AllPartitionProcedureCallback.java
@@ -0,0 +1,33 @@
/* This file is part of VoltDB.
* Copyright (C) 2008-2016 VoltDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
*/

package org.voltdb.client;

/**
* interface for callbacks that are invoked when an asynchronously invoked transaction receives a response.
* Extend this class and provide an implementation of {@link #clientCallback} to receive a response to a
* stored procedure invocation on all partitions.
*/
public interface AllPartitionProcedureCallback {
/**
* Implementation of callback to be provided by client applications.
*
* @param clientResponse Responses for each partition to the stored procedure invocation this callback is associated with
* @throws Exception on any Exception.
*/
abstract public void clientCallback(PartitionClientResponse[] clientResponse) throws Exception;
}
32 changes: 32 additions & 0 deletions src/frontend/org/voltdb/client/Client.java
Expand Up @@ -422,4 +422,36 @@ public boolean updateClasses(ProcedureCallback callback,
*/
public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, boolean upsert, BulkLoaderFailureCallBack blfcb) throws Exception;
public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, BulkLoaderFailureCallBack blfcb) throws Exception;

/**
* <p>Synchronously invoke a procedure. Blocks until a result is available. A {@link ProcCallException}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc isn't specific to the new functionality yet?

* is thrown if the response is anything other then success.</p>
*
* @param procedureName <code>class</code> name (not qualified by package) of the partitioned java procedure to execute.
* @param params list of procedure's parameter values.
* @return {@link PartitionClientResponse} instance of procedure call results.
* @throws ProcCallException on any VoltDB specific failure.
* @throws NoConnectionsException if this {@link Client} instance is not connected to any servers.
* @throws IOException if there is a Java network or connection problem.
*/
public PartitionClientResponse[] callAllPartitionProcedure(String procedureName, Object... params)
throws IOException, NoConnectionsException, ProcCallException;

/**
* <p>Asynchronously invoke a replicated procedure, by providing a callback that will be invoked by the single
* thread backing the client instance when the procedure invocation receives a response.
* See the {@link Client} class documentation for information on the negative performance impact of slow or
* blocking callbacks. If there is backpressure this call will block until the invocation is queued. If configureBlocking(false) is invoked
* then it will return immediately. Check the return value to determine if queueing actually took place.</p>
*
* @param callback {@link AllPartitionProcedureCallback} that will be invoked with procedure results.
* @param procedureName class name (not qualified by package) of the partitioned java procedure to execute.
* @param params list of procedure's parameter values.
* @return <code>true</code> if the procedure was queued and <code>false</code> otherwise.
* @throws NoConnectionsException if this {@link Client} instance is not connected to any servers.
* @throws IOException if there is a Java network or connection problem.
* @throws ProcCallException on any VoltDB specific failure.
*/
boolean callAllPartitionProcedure(AllPartitionProcedureCallback callback, String procedureName, Object... params)
throws IOException, NoConnectionsException, ProcCallException;
}
111 changes: 110 additions & 1 deletion src/frontend/org/voltdb/client/ClientImpl.java
Expand Up @@ -22,9 +22,11 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -69,7 +71,7 @@ public final class ClientImpl implements Client, ReplicaProcCaller {
private final String m_username;
private final byte m_passwordHash[];
private final ClientAuthScheme m_hashScheme;

private List<Integer> m_partitionKeys;
/**
* These threads belong to the network thread pool
* that invokes callbacks. These threads are "blessed"
Expand Down Expand Up @@ -814,4 +816,111 @@ public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, BulkL
return new VoltBulkLoader(m_vblGlobals, tableName, maxBatchSize, blfcb);
}
}

@Override
public PartitionClientResponse[] callAllPartitionProcedure(String procedureName, Object... params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we make the sync version call the async version and block on the callback. This reduces duplicated code, and also doesn't serialize the calls, which isn't really what we want.

throws IOException, NoConnectionsException, ProcCallException {

Object[] args = new Object[params.length + 1];
System.arraycopy(params, 0, args, 1, params.length);
setPartitions();
int partitionCount = m_partitionKeys.size();
PartitionClientResponse[] responses = new PartitionClientResponse[partitionCount];
for (int key : m_partitionKeys) {
args[0] = key;
partitionCount--;
ClientResponse response = callProcedure(procedureName, args);
responses[partitionCount] = new PartitionClientResponse(key, response);
}
return responses;
}

@Override
public boolean callAllPartitionProcedure(AllPartitionProcedureCallback callback, String procedureName,
Object... params) throws IOException, NoConnectionsException, ProcCallException {

Object[] args = new Object[params.length + 1];
System.arraycopy(params, 0, args, 1, params.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all procedures partition on the first parameter. We could add the restriction that all-partition procs must I guess. We do have this information for client affinity purposes as well.

Also, does this work with strings or ints? Is this another restriction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Form what I understand, the first arg on procedure.run(...) will be used for partition lookup. If a proc has two parameters, procedure.run(...) must have 3 arguments. We get a list of partition keys and iterate through. The column partition parameter is not used for partition lookup. I could be wrong.

I will dig into more on the types of the partitions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't specify a parameter offset when you partition a proc, we assume it's the first parameter. That's only a default though.

The rest makes sense. Feel free to ask me questions face-to-face if you want to go over more of this.


setPartitions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only fetches the keys the first time and won't work if the topology ever changes. It might be better to integrate this data with the client affinity data kept in the distributer. We might even want to extend the subscription thing that gets us periodic details from the server side to ensure it has what we want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will look into it.

int partitionCount = m_partitionKeys.size();
PartitionClientResponse[] responses = new PartitionClientResponse[partitionCount];
CountDownLatch responseWaiter = new CountDownLatch(partitionCount);
for (int key : m_partitionKeys) {
args[0] = key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about the UX of calling a proc that doesn't partition on the first parameter. The user will currently get a terrible and unhelpful error. What can we do to make this better?

partitionCount--;
PartitionProcedureCallback cb = new PartitionProcedureCallback(responseWaiter, key, partitionCount, responses);
if(!callProcedure(cb, procedureName, args)){
return false;
}
}

try {
responseWaiter.await();
callback.clientCallback(responses);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't actually asynchronous. We're blocking here.

We probably want to use a specialized callback for each call that aggregates responses, and calls the users callback if it's the last one.

} catch (Exception e) {
throw new IOException (e);
}

return true;
}

/**
* Set up partitions.
* @throws ProcCallException on any VoltDB specific failure.
* @throws NoConnectionsException if this {@link Client} instance is not connected to any servers.
* @throws IOException if there is a Java network or connection problem.
*/
private void setPartitions() throws IOException, NoConnectionsException, ProcCallException {
if(m_partitionKeys == null){
m_partitionKeys = new ArrayList<Integer>();
VoltTable results[] = callProcedure("@GetPartitionKeys", "integer").getResults();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the client Distributer Affinity to get the type of partition parameter so that we do not introduce new limitations on partition column type. Not necessary to be integer always.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a dumb trick. You can always use integer partition keys with quotes around them. "1" and 1 will always route to the same place, intentionally. So you shouldn't have to call getPartitionKeys for both types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, great!

VoltTable keys = results[0];
for (int k = 0; k < keys.getRowCount(); k++) {
m_partitionKeys.add((int)(keys.fetchRow(k).getLong((1))));
}
}
}

/**
* Return a list of partition keys
* @throws ProcCallException on any VoltDB specific failure.
* @throws NoConnectionsException if this {@link Client} instance is not connected to any servers.
* @throws IOException if there is a Java network or connection problem.
*/
public List<Integer> getPartitionKeys() throws NoConnectionsException, IOException, ProcCallException {
setPartitions();
return m_partitionKeys;
}

/**
* Procedure call back used in {@link ClientImpl#callAllPartitionProcedure(AllPartitionProcedureCallback, String, Object...)}
*/
class PartitionProcedureCallback implements ProcedureCallback {

final CountDownLatch m_responseWaiter;
final PartitionClientResponse[] m_responses;
final int m_index;
final Object m_partitionKey;

/**
* Callback initialization
* @param responseWaiter The count down latch
* @param partitionKey The partition where the call back works on
* @param index The index for PartitionClientResponse
* @param responses The final result array
*/
public PartitionProcedureCallback(CountDownLatch responseWaiter, Object partitionKey, int index, PartitionClientResponse[] responses){
m_responseWaiter = responseWaiter;
m_partitionKey = partitionKey;
m_index = index;
m_responses = responses;
}

@Override
public void clientCallback(ClientResponse clientResponse) {
m_responses[m_index] = new PartitionClientResponse(m_partitionKey, clientResponse);
m_responseWaiter.countDown();
}
}
}
45 changes: 45 additions & 0 deletions src/frontend/org/voltdb/client/PartitionClientResponse.java
@@ -0,0 +1,45 @@
/* This file is part of VoltDB.
* Copyright (C) 2008-2016 VoltDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
*/

package org.voltdb.client;

import org.voltdb.client.ClientResponse;

/**
* Packages up the data to be sent back to the client as a stored
* procedure response for a partition
*
*/
public class PartitionClientResponse {

private final Object m_partitionKey;
private ClientResponse m_clientResponse;

public PartitionClientResponse(Object partitionKey, ClientResponse clientResponse) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be public.

m_partitionKey = partitionKey;
m_clientResponse = clientResponse;
}

public Object getPartitionKey() {
return m_partitionKey;
}

public ClientResponse getClientResponse() {
return m_clientResponse;
}

}
14 changes: 13 additions & 1 deletion tests/frontend/org/voltdb/client/MockVoltClient.java
Expand Up @@ -137,7 +137,6 @@ public long getClientRoundtripNanos() {
// TODO Auto-generated method stub
return 0;
}

};
}

Expand Down Expand Up @@ -384,4 +383,17 @@ public boolean callProcedureWithTimeout(ProcedureCallback callback,
return false;
}

@Override
public PartitionClientResponse[] callAllPartitionProcedure(String procedureName, Object... params) throws IOException, NoConnectionsException, ProcCallException{
// TODO Auto-generated method stub
return null;
}

@Override
public boolean callAllPartitionProcedure(AllPartitionProcedureCallback callback, String procedureName,
Object... params) throws IOException, NoConnectionsException, ProcCallException {
// TODO Auto-generated method stub
return false;
}

}