Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-10968 run-everywhere #3843

Merged
merged 13 commits into from Aug 24, 2016
55 changes: 28 additions & 27 deletions src/frontend/org/voltdb/client/ClientImpl.java
Expand Up @@ -48,6 +48,11 @@
*/
public final class ClientImpl implements Client, ReplicaProcCaller {

/*
* refresh the partition key cache every 5 min
*/
static long PARTITION_KEYS_INFO_REFRESH_FREQUENCY = 5 * 60 * 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The frequency is too large, our window example use once a second. We may need a similar value for it.


// call initiated by the user use positive handles
private final AtomicLong m_handle = new AtomicLong(0);

Expand Down Expand Up @@ -821,43 +826,38 @@ public VoltBulkLoader getNewBulkLoader(String tableName, int maxBatchSize, BulkL
@Override
public ClientResponseWithPartitionKey[] callAllPartitionProcedure(String procedureName, Object... params)
throws IOException, NoConnectionsException, ProcCallException {
SyncAllPartitionProcedureCallback callBack = new SyncAllPartitionProcedureCallback();
OnePartitionProcedureCallback callBack = new OnePartitionProcedureCallback();
callAllPartitionProcedure(callBack, procedureName, params);
return callBack.getResponse();
}

@Override
public boolean callAllPartitionProcedure(AllPartitionProcedureCallback callback, String procedureName,
Object... params) throws IOException, NoConnectionsException, ProcCallException {
if(callback == null) {
if (callback == null) {
throw new IOException("AllPartitionProcedureCallback can not be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an IOException or an IllegalArgumentException?

}

Object[] args = new Object[params.length + 1];
System.arraycopy(params, 0, args, 1, params.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all procedures partition on the first parameter. We could add the restriction that all-partition procs must I guess. We do have this information for client affinity purposes as well.

Also, does this work with strings or ints? Is this another restriction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Form what I understand, the first arg on procedure.run(...) will be used for partition lookup. If a proc has two parameters, procedure.run(...) must have 3 arguments. We get a list of partition keys and iterate through. The column partition parameter is not used for partition lookup. I could be wrong.

I will dig into more on the types of the partitions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't specify a parameter offset when you partition a proc, we assume it's the first parameter. That's only a default though.

The rest makes sense. Feel free to ask me questions face-to-face if you want to go over more of this.


int partitionCount = getPartitionIntegerKeys().size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this is going to call GetPartitionKeys for every call? That's not very efficient, especially because we already have a mechanism in Distributor for being notified of topology changes. We should only get new partition keys when the topology changes.

assert(partitionCount > 0);
ClientResponseWithPartitionKey[] responses = new ClientResponseWithPartitionKey[partitionCount];
AtomicInteger counter = new AtomicInteger(partitionCount);
for (Integer key : getPartitionIntegerKeys()) {
args[0] = key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about the UX of calling a proc that doesn't partition on the first parameter. The user will currently get a terrible and unhelpful error. What can we do to make this better?

partitionCount--;
PartitionProcedureCallback cb = new PartitionProcedureCallback(counter, key, partitionCount, responses, callback);
try{
if(callback instanceof SyncAllPartitionProcedureCallback){
try {
if (callback instanceof OnePartitionProcedureCallback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I do not make it clear.

Keep "SyncAllPartitionProcedureCallback", but rename "PartitionProcedureCallback" as "OnePartitionProcedureCallback".

ClientResponse response = callProcedure(procedureName, args);
cb.clientCallback(response);
} else {
if(!callProcedure(cb, procedureName, args)){
if (!callProcedure(cb, procedureName, args)) {
cb.clientCallback(null);
}
}
} catch (ProcCallException | NoConnectionsException pe){
try {
cb.exceptionCallback(pe);
} catch (Exception e) {
throw new IOException(e);
}
} catch(Exception ex){
try {
cb.exceptionCallback(ex);
Expand All @@ -875,15 +875,16 @@ public boolean callAllPartitionProcedure(AllPartitionProcedureCallback callback,
* @throws NoConnectionsException if this {@link Client} instance is not connected to any servers.
* @throws IOException if there is a Java network or connection problem.
*/
private void setPartitions() throws IOException, NoConnectionsException, ProcCallException {
if(m_partitionIntegerKeys.isEmpty()){
VoltTable results[] = callProcedure("@GetPartitionKeys", "integer").getResults();
VoltTable keys = results[0];
for (int k = 0; k < keys.getRowCount(); k++) {
m_partitionIntegerKeys.add((int)(keys.fetchRow(k).getLong(1)));
}
m_lastPartitionKeyFetched = System.currentTimeMillis();
private void refreshPartitionKeys() throws IOException, NoConnectionsException, ProcCallException {
if (! m_partitionIntegerKeys.isEmpty()) {
return;
}
VoltTable results[] = callProcedure("@GetPartitionKeys", "integer").getResults();
VoltTable keys = results[0];
for (int k = 0; k < keys.getRowCount(); k++) {
m_partitionIntegerKeys.add((int)(keys.fetchRow(k).getLong(1)));
}
m_lastPartitionKeyFetched = System.currentTimeMillis();
}

/**
Expand All @@ -893,16 +894,16 @@ private void setPartitions() throws IOException, NoConnectionsException, ProcCal
* @throws IOException if there is a Java network or connection problem.
*/
public List<Integer> getPartitionIntegerKeys() throws NoConnectionsException, IOException, ProcCallException {
if(m_distributer.getPartitionKeys().size() > 0){
if (m_distributer.getPartitionKeys().size() > 0) {
return m_distributer.getPartitionKeys();
}

long time = System.currentTimeMillis() - m_lastPartitionKeyFetched;
if(time > 5 * 60 * 1000){
if (time > PARTITION_KEYS_INFO_REFRESH_FREQUENCY) {
m_partitionIntegerKeys.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is really async. Why clear and then populate this list? That will leave it unpopulated while you fetch?

Fetch it once a second and then atomic swap the pointers. Or don't if the lists are identical.

You may want to consider making the list itself immutable using a Guava immutable set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will make the change

}

setPartitions();
refreshPartitionKeys();
return m_partitionIntegerKeys;
}

Expand Down Expand Up @@ -935,16 +936,16 @@ public PartitionProcedureCallback(AtomicInteger counter, Object partitionKey, in

@Override
public void clientCallback(ClientResponse response) throws Exception {
if(response != null) {
if (response != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would response be null? What am I missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if an asyc call is not queued, the stored proc process will not enter clientCallback. To ensure that every partition gets its ClientResponse, client will call clientCallback(null) to set up ClientResponse for the partition.

Copy link
Contributor

@jhugg jhugg Aug 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So you fake the client response here. Why not just fake the client response above where it catches the exception? Then you know all procs get callbacks and you have to do less work here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the same thing above when it returns false or throws and exception, and you don't need the exception callback code below either.

m_responses[m_index] = new ClientResponseWithPartitionKey(m_partitionKey, response);
} else {
final ClientResponse r = new ClientResponseImpl(ClientResponse.GRACEFUL_FAILURE, new VoltTable[0],
"The procedure is not queued for execution.");
m_responses[m_index] = new ClientResponseWithPartitionKey(m_partitionKey, r,
"The procedure is not queued for execution.", null);
r.getStatusString(), null);
}
m_partitionCounter.decrementAndGet();
if(m_partitionCounter.get() == 0){
if (m_partitionCounter.get() == 0) {
m_cb.clientCallback(m_responses);
}
}
Expand All @@ -964,7 +965,7 @@ public void exceptionCallback(Exception e) throws Exception {
}

m_partitionCounter.decrementAndGet();
if(m_partitionCounter.get() == 0){
if (m_partitionCounter.get() == 0) {
m_cb.clientCallback(m_responses);
}
}
Expand All @@ -973,7 +974,7 @@ public void exceptionCallback(Exception e) throws Exception {
/**
* Sync all partition procedure call back
*/
class SyncAllPartitionProcedureCallback implements AllPartitionProcedureCallback {
private class OnePartitionProcedureCallback implements AllPartitionProcedureCallback {

ClientResponseWithPartitionKey[] m_responses;
@Override
Expand Down
37 changes: 37 additions & 0 deletions tests/frontend/org/voltdb/client/IntPartitionCallTestProc.java
@@ -0,0 +1,37 @@
/* This file is part of VoltDB.
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.voltdb.client;

import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

public class IntPartitionCallTestProc extends VoltProcedure {

public final SQLStmt stmt = new SQLStmt("SELECT count(*) FROM TABLE_INT_PARTITION;");

public VoltTable[] run(int partitionKey) {
voltQueueSQL(stmt);
return voltExecuteSQL(true);
}
}
38 changes: 38 additions & 0 deletions tests/frontend/org/voltdb/client/StringPartitionCallTestProc.java
@@ -0,0 +1,38 @@
/* This file is part of VoltDB.
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/

package org.voltdb.client;

import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

public class StringPartitionCallTestProc extends VoltProcedure {

public final SQLStmt stmt = new SQLStmt("SELECT count(*) FROM TABLE_STRING_PARTITION;");

public VoltTable[] run(String partitionKey) {
voltQueueSQL(stmt);
return voltExecuteSQL(true);
}
}
161 changes: 161 additions & 0 deletions tests/frontend/org/voltdb/client/TestAllPartitionProcedureCalls.java
@@ -0,0 +1,161 @@
/* This file is part of VoltDB.
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.voltdb.client;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.voltdb.BackendTarget;
import org.voltdb.VoltTable;
import org.voltdb.compiler.VoltProjectBuilder;
import org.voltdb.regressionsuites.LocalCluster;
import org.voltdb.regressionsuites.MultiConfigSuiteBuilder;
import org.voltdb.regressionsuites.RegressionSuite;
import org.voltdb.utils.VoltFile;

/**
* Test client all partition calls
*
*/
public class TestAllPartitionProcedureCalls extends RegressionSuite {

static final Class<?>[] PROCEDURES = {IntPartitionCallTestProc.class,
StringPartitionCallTestProc.class};

static Map<String, Integer> EXPECT_PARTIITON_COUNTS = new HashMap<String, Integer>();
static {
EXPECT_PARTIITON_COUNTS.put("0", new Integer(129));
EXPECT_PARTIITON_COUNTS.put("1", new Integer(119));
EXPECT_PARTIITON_COUNTS.put("2", new Integer(127));
EXPECT_PARTIITON_COUNTS.put("7", new Integer(141));
EXPECT_PARTIITON_COUNTS.put("11", new Integer(118));
EXPECT_PARTIITON_COUNTS.put("15", new Integer(137));
EXPECT_PARTIITON_COUNTS.put("19", new Integer(110));
EXPECT_PARTIITON_COUNTS.put("23", new Integer(119));
}

@Override
public void setUp() throws Exception
{
VoltFile.recursivelyDelete(new File("/tmp/" + System.getProperty("user.name")));
File f = new File("/tmp/" + System.getProperty("user.name"));
f.mkdirs();
super.setUp();
}

public TestAllPartitionProcedureCalls(String name) {
super(name);
}

public void testCallAllPartitionProcedures() throws Exception{

Client client = getClient();

try {
load(client, "TABLE_INT_PARTITION");
load(client, "TABLE_STRING_PARTITION");

ClientResponseWithPartitionKey[] responses = client.callAllPartitionProcedure("IntPartitionCallTestProc");
validateResults(responses);

client.callAllPartitionProcedure(new CallBack(), "IntPartitionCallTestProc");

responses = client.callAllPartitionProcedure("StringPartitionCallTestProc");
validateResults(responses);

client.callAllPartitionProcedure(new CallBack(), "StringPartitionCallTestProc");

} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
if (client != null) {
try {
client.close();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}

private void validateResults(ClientResponseWithPartitionKey[] responses)
{
for (ClientResponseWithPartitionKey resp: responses) {
VoltTable results = resp.m_response.getResults()[0];
int expected = EXPECT_PARTIITON_COUNTS.get(resp.m_partitionKey.toString());
assert(expected == results.fetchRow(0).getLong(0));
}
}

private void load(Client client, String tableName) throws NoConnectionsException, IOException, ProcCallException {
for(int i = 0; i < 1000; i++) {
StringBuilder builder = new StringBuilder();
builder.append("insert into " + tableName + " values (" + i);
builder.append(", 'foo" + i + "', " + i + ", " + i + ")");
client.callProcedure("@AdHoc", builder.toString());
}

String sql = "SELECT count(*) from " + tableName;;
VoltTable vt = client.callProcedure("@AdHoc", sql).getResults()[0];
assert(1000 == vt.fetchRow(0).getLong(0));
}

static public junit.framework.Test suite() throws Exception {
return buildEnv();
}

static public MultiConfigSuiteBuilder buildEnv() throws Exception {

final MultiConfigSuiteBuilder builder = new MultiConfigSuiteBuilder(TestAllPartitionProcedureCalls.class);
Map<String, String> additionalEnv = new HashMap<String, String>();
VoltProjectBuilder project = new VoltProjectBuilder();
project.setUseDDLSchema(true);
project.addSchema(TestAllPartitionProcedureCalls.class.getResource("allpartitioncall.sql"));
project.addProcedures(PROCEDURES);
LocalCluster config = new LocalCluster("client-all-partitions.jar", 4, 2, 0, BackendTarget.NATIVE_EE_JNI,
LocalCluster.FailureState.ALL_RUNNING, true, false, additionalEnv);
config.setHasLocalServer(false);
boolean compile = config.compile(project);
assertTrue(compile);
builder.addServerConfig(config);
return builder;
}

public static class CallBack implements AllPartitionProcedureCallback {

@Override
public void clientCallback(ClientResponseWithPartitionKey[] clientResponse) throws Exception {

if (clientResponse != null) {
for (ClientResponseWithPartitionKey resp: clientResponse) {
VoltTable results = resp.m_response.getResults()[0];
int expected = EXPECT_PARTIITON_COUNTS.get(resp.m_partitionKey);
assert(expected == results.fetchRow(0).getLong(0));
}
}
}
}
}