Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: VoltDB/voltdb
base: 6b8920ac41
...
head fork: VoltDB/voltdb
compare: 3aced29211
  • 12 commits
  • 14 files changed
  • 0 commit comments
  • 1 contributor
Commits on May 10, 2012
@rbetts rbetts ENG-2943: register sysproc fragments in Site. 1af3e86
@rbetts rbetts Introduce SystemProcedureExecutionContext for Site
Not fully functional (the context desires a sitetracker and
an ExecutionSite reference, not availble at the iv2 Site).
Enough to get started.
1daa29b
@rbetts rbetts Pass the sysproc context to ProcedureRunnerFactory
Connect the ProcedureRunnerFactory kneebone to the
SystemProcedureExecutionContext shinbone.
6a7625f
@rbetts rbetts Start creating a sysproc fragment task.
System procedure fragments call the associated sysproc procedure
directly and don't call SiteProcedureConnection.executePlanFragment().
Specialize them as a separate task.
fce60ae
@rbetts rbetts Working Adhoc test suite. 7afa961
Commits on May 11, 2012
@rbetts rbetts Sysproc fragments need valid undo tokens. 3ecd34a
@rbetts rbetts DRY: add FragmentTaskMessage get params utility e3018e0
@rbetts rbetts DRY: use new paramSet getter and remove copy/paste 38b524f
@rbetts rbetts Don't expose loadedprocset as broadly.
Hide LoadedProcedureSet from MpTransactionState and
SystempProcedureFragmentTask.
ebc19e2
@rbetts rbetts Add TestAdHocQueries to the junit_iv2 list. d82b63b
@rbetts rbetts Merge remote branch 'origin/iv2-master' into iv2-sysprocs 6f62787
@rbetts rbetts Remove debug logging. 3aced29
View
1  build.xml
@@ -1196,7 +1196,6 @@ TEST CASES
<include name='org/voltdb/**/Test*.class'/>
<include name='org/voltdb/**/Iv2Test*.class'/>
<exclude name="**/*$*.class"/>
- <exclude name="**/TestAdHocQueries.class" />
<exclude name="**/TestHSQLBackend.class" />
<exclude name="**/TestJSONInterface.class" />
<exclude name="**/TestRejoinEndToEnd.class" />
View
10 src/frontend/org/voltdb/ExecutionSite.java
@@ -2259,6 +2259,16 @@ public void truncateUndoLog(boolean rollback, long token, long txnId) {
throw new RuntimeException("Unsupported IV2-only API.");
}
+ // do-nothing implementation of IV2 sysproc fragment API.
+ @Override
+ public DependencyPair executePlanFragment(
+ TransactionState txnState,
+ Map<Integer, List<VoltTable>> dependencies, long fragmentId,
+ ParameterSet params) {
+ throw new RuntimeException("Unsupported IV2-only API.");
+ }
+
+
@Override
public VoltTable executePlanFragment(long planFragmentId, int inputDepId,
ParameterSet parameterSet, long txnId,
View
2  src/frontend/org/voltdb/ProcedureRunner.java
@@ -430,7 +430,7 @@ public void voltLoadTable(String clusterName, String databaseName,
}
}
- DependencyPair executePlanFragment(
+ public DependencyPair executePlanFragment(
TransactionState txnState,
Map<Integer, List<VoltTable>> dependencies, long fragmentId,
ParameterSet params) {
View
9 src/frontend/org/voltdb/SiteProcedureConnection.java
@@ -105,4 +105,13 @@ public VoltTable executePlanFragment(
* IV2: send dependencies to the EE
*/
public void stashWorkUnitDependencies(final Map<Integer, List<VoltTable>> dependencies);
+
+ /**
+ * IV2: run a system procedure plan fragment
+ */
+ public DependencyPair executePlanFragment(
+ TransactionState txnState,
+ Map<Integer, List<VoltTable>> dependencies, long fragmentId,
+ ParameterSet params);
+
}
View
1  src/frontend/org/voltdb/VoltSystemProcedure.java
@@ -160,7 +160,6 @@ abstract public DependencyPair executePlanFragment(
// the stack frame drop terminates the recursion and resumes
// execution of the current stored procedure.
assert (txnState != null);
- assert (txnState instanceof MultiPartitionParticipantTxnState);
txnState.setupProcedureResume(false, new int[] { aggregatorOutputDependencyId });
// execute the tasks that just got queued.
View
23 src/frontend/org/voltdb/iv2/FragmentTask.java
@@ -82,28 +82,7 @@ public FragmentResponseMessage processFragmentTask(SiteProcedureConnection siteC
final long fragmentId = m_task.getFragmentId(frag);
final int outputDepId = m_task.getOutputDepId(frag);
- // this is a horrible performance hack, and can be removed with small changes
- // to the ee interface layer.. (rtb: not sure what 'this' encompasses...)
- // (izzy: still not sure what 'this' encompasses...)
- ParameterSet params = null;
- final ByteBuffer paramData = m_task.getParameterDataForFragment(frag);
- if (paramData != null) {
- final FastDeserializer fds = new FastDeserializer(paramData);
- try {
- params = fds.readObject(ParameterSet.class);
- }
- catch (final IOException e) {
- // IZZY: why not send a non-success response back to the
- // MPI here?
- hostLog.l7dlog(Level.FATAL,
- LogKeys.host_ExecutionSite_FailedDeserializingParamsForFragmentTask.name(), e);
- VoltDB.crashLocalVoltDB(e.getMessage(), true, e);
- }
- }
- else {
- params = new ParameterSet();
- }
-
+ ParameterSet params = m_task.getParameterSetForFragment(frag);
final int inputDepId = m_task.getOnlyInputDepId(frag);
/*
View
1  src/frontend/org/voltdb/iv2/InitiatorMailbox.java
@@ -31,7 +31,6 @@
import org.voltcore.zk.BabySitter.Callback;
import org.voltcore.zk.LeaderElector;
import org.voltcore.zk.LeaderNoticeHandler;
-import org.voltdb.LoadedProcedureSet;
import org.voltdb.VoltDB;
import org.voltdb.VoltZK;
import org.voltdb.messaging.CompleteTransactionMessage;
View
44 src/frontend/org/voltdb/iv2/MpTransactionState.java
@@ -17,8 +17,6 @@
package org.voltdb.iv2;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,28 +26,26 @@
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
-import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.Mailbox;
import org.voltcore.messaging.MessagingException;
import org.voltcore.messaging.TransactionInfoBaseMessage;
+import org.voltdb.DependencyPair;
import org.voltdb.ParameterSet;
import org.voltdb.SiteProcedureConnection;
import org.voltdb.StoredProcedureInvocation;
-import org.voltdb.VoltDB;
import org.voltdb.VoltTable;
import org.voltdb.dtxn.TransactionState;
-import org.voltdb.messaging.FastDeserializer;
import org.voltdb.messaging.FragmentResponseMessage;
import org.voltdb.messaging.FragmentTaskMessage;
import org.voltdb.messaging.Iv2InitiateTaskMessage;
-import org.voltdb.utils.LogKeys;
public class MpTransactionState extends TransactionState
{
private static final VoltLogger hostLog = new VoltLogger("HOST");
final Iv2InitiateTaskMessage m_task;
+
LinkedBlockingDeque<FragmentResponseMessage> m_newDeps =
new LinkedBlockingDeque<FragmentResponseMessage>();
Map<Integer, Set<Long>> m_remoteDeps;
@@ -246,6 +242,7 @@ private void trackDependency(long hsid, int depId, VoltTable table)
tables.add(table);
}
else {
+ // TODO: need an error path here.
System.out.println("No remote dep for local site: " + hsid);
}
}
@@ -291,29 +288,22 @@ private boolean checkDoneReceivingFragResponses()
final long fragmentId = ftask.getFragmentId(frag);
final int outputDepId = ftask.getOutputDepId(frag);
- // this is a horrible performance hack, and can be removed with small changes
- // to the ee interface layer.. (rtb: not sure what 'this' encompasses...)
- ParameterSet params = null;
- final ByteBuffer paramData = ftask.getParameterDataForFragment(frag);
- if (paramData != null) {
- final FastDeserializer fds = new FastDeserializer(paramData);
- try {
- params = fds.readObject(ParameterSet.class);
- }
- catch (final IOException e) {
- hostLog.l7dlog(Level.FATAL,
- LogKeys.host_ExecutionSite_FailedDeserializingParamsForFragmentTask.name(), e);
- VoltDB.crashLocalVoltDB(e.getMessage(), true, e);
- }
- }
- else {
- params = new ParameterSet();
- }
+ ParameterSet params = ftask.getParameterSetForFragment(frag);
if (ftask.isSysProcTask()) {
- throw new RuntimeException("IV2: Sysprocs not yet supported");
-// return processSysprocFragmentTask(txnState, dependencies, fragmentId,
-// currentFragResponse, params);
+ final DependencyPair dep
+ = siteConnection.executePlanFragment(this,
+ m_remoteDepTables,
+ fragmentId,
+ params);
+
+ List<VoltTable> tables = depResults.get(outputDepId);
+ if (tables == null) {
+ tables = new ArrayList<VoltTable>();
+ depResults.put(outputDepId, tables);
+ }
+ tables.add(dep.dependency);
+ return depResults;
}
else {
final int inputDepId = ftask.getOnlyInputDepId(frag);
View
13 src/frontend/org/voltdb/iv2/Scheduler.java
@@ -98,9 +98,16 @@ public void handleFragmentTaskMessage(FragmentTaskMessage message)
txn = new ParticipantTransactionState(localTxnId, message);
m_outstandingTxns.put(message.getTxnId(), txn);
}
- final FragmentTask task =
- new FragmentTask(m_mailbox, (ParticipantTransactionState)txn, message);
- m_pendingTasks.offer(task);
+ if (message.isSysProcTask()) {
+ final SysprocFragmentTask task =
+ new SysprocFragmentTask(m_mailbox, (ParticipantTransactionState)txn, message);
+ m_tasks.offer(task);
+ }
+ else {
+ final FragmentTask task =
+ new FragmentTask(m_mailbox, (ParticipantTransactionState)txn, message);
+ m_tasks.offer(task);
+ }
}
public void handleFragmentResponseMessage(FragmentResponseMessage message)
View
106 src/frontend/org/voltdb/iv2/Site.java
@@ -17,31 +17,40 @@
package org.voltdb.iv2;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.utils.CoreUtils;
+
import org.voltdb.BackendTarget;
+import org.voltdb.catalog.Cluster;
+import org.voltdb.catalog.Database;
import org.voltdb.CatalogContext;
-import org.voltdb.HsqlBackend;
+import org.voltdb.DependencyPair;
import org.voltdb.iv2.SiteTasker;
-import org.voltdb.iv2.SiteTaskerQueue;
+import org.voltdb.LoadedProcedureSet;
import org.voltdb.ParameterSet;
import org.voltdb.SiteProcedureConnection;
import org.voltdb.VoltDB;
-import org.voltdb.VoltProcedure.VoltAbortException;
import org.voltdb.dtxn.SiteTracker;
import org.voltdb.dtxn.TransactionState;
import org.voltdb.exceptions.EEException;
+import org.voltdb.ExecutionSite;
+import org.voltdb.HsqlBackend;
+import org.voltdb.iv2.SiteTaskerQueue;
import org.voltdb.jni.ExecutionEngine;
import org.voltdb.jni.ExecutionEngineIPC;
import org.voltdb.jni.ExecutionEngineJNI;
import org.voltdb.jni.MockExecutionEngine;
+import org.voltdb.ProcedureRunner;
+import org.voltdb.SystemProcedureExecutionContext;
import org.voltdb.utils.Encoder;
import org.voltdb.utils.LogKeys;
+import org.voltdb.VoltProcedure.VoltAbortException;
import org.voltdb.VoltTable;
public class Site implements Runnable, SiteProcedureConnection
@@ -74,6 +83,9 @@
// Current catalog
CatalogContext m_context;
+ // Currently available procedure
+ LoadedProcedureSet m_loadedProcedures;
+
// Current topology
int m_partitionId;
@@ -116,6 +128,75 @@ SiteProcedureConnection getSiteProcedureConnection()
return this;
}
+
+ /**
+ * SystemProcedures are "friends" with ExecutionSites and granted
+ * access to internal state via m_systemProcedureContext.
+ */
+ SystemProcedureExecutionContext m_sysprocContext = new SystemProcedureExecutionContext() {
+ @Override
+ public Database getDatabase() {
+ return m_context.database;
+ }
+
+ @Override
+ public Cluster getCluster() {
+ return m_context.cluster;
+ }
+
+ @Override
+ public ExecutionEngine getExecutionEngine() {
+ return m_ee;
+ }
+
+ @Override
+ public long getLastCommittedTxnId() {
+ return m_lastCommittedTxnId;
+ }
+
+ @Override
+ public long getCurrentTxnId() {
+ throw new RuntimeException("Not implemented in iv2");
+ }
+
+ @Override
+ public long getNextUndo() {
+ return getNextUndoToken();
+ }
+
+ @Override
+ public ExecutionSite getExecutionSite() {
+ throw new RuntimeException("Not implemented in iv2");
+ }
+
+ @Override
+ public HashMap<String, ProcedureRunner> getProcedures() {
+ throw new RuntimeException("Not implemented in iv2");
+ // return m_loadedProcedures.procs;
+ }
+
+ @Override
+ public long getSiteId() {
+ return m_siteId;
+ }
+
+ @Override
+ public int getHostId() {
+ return SiteTracker.getHostForSite(m_siteId);
+ }
+
+ @Override
+ public int getPartitionId() {
+ return m_partitionId;
+ }
+
+ @Override
+ public SiteTracker getSiteTracker() {
+ throw new RuntimeException("Not implemented in iv2");
+ // return m_tracker;
+ }
+ };
+
/** Create a new execution site and the corresponding EE */
public Site(
SiteTaskerQueue scheduler,
@@ -138,6 +219,12 @@ public Site(
m_startupConfig = new StartupConfig(serializedCatalog, txnId);
}
+ /** Update the loaded procedures. */
+ void setLoadedProcedures(LoadedProcedureSet loadedProcedure)
+ {
+ m_loadedProcedures = loadedProcedure;
+ }
+
/** Thread specific initialization */
void initialize(String serializedCatalog, long txnId)
{
@@ -378,4 +465,15 @@ public void stashWorkUnitDependencies(Map<Integer, List<VoltTable>> dependencies
{
m_ee.stashWorkUnitDependencies(dependencies);
}
+
+ @Override
+ public DependencyPair executePlanFragment(
+ TransactionState txnState,
+ Map<Integer, List<VoltTable>> dependencies, long fragmentId,
+ ParameterSet params)
+ {
+ ProcedureRunner runner = m_loadedProcedures.getSysproc(fragmentId);
+ return runner.executePlanFragment(txnState, dependencies, fragmentId, params);
+ }
+
}
View
5 src/frontend/org/voltdb/iv2/SpInitiator.java
@@ -60,7 +60,9 @@ public void configure(BackendTarget backend, String serializedCatalog,
m_partitionId,
siteTracker.m_numberOfPartitions);
ProcedureRunnerFactory prf = new ProcedureRunnerFactory();
- prf.configure(m_executionSite, null /* context */, null /* hsql */);
+ prf.configure(m_executionSite,
+ m_executionSite.m_sysprocContext,
+ null /* hsql */);
m_procSet = new LoadedProcedureSet(m_executionSite,
prf,
m_initiatorMailbox.getHSId(),
@@ -68,6 +70,7 @@ public void configure(BackendTarget backend, String serializedCatalog,
siteTracker.m_numberOfPartitions);
m_procSet.loadProcedures(catalogContext, backend);
m_scheduler.setProcedureSet(m_procSet);
+ m_executionSite.setLoadedProcedures(m_procSet);
m_siteThread = new Thread(m_executionSite);
View
107 src/frontend/org/voltdb/iv2/SysprocFragmentTask.java
@@ -0,0 +1,107 @@
+/* This file is part of VoltDB.
+ * Copyright (C) 2008-2012 VoltDB Inc.
+ *
+ * VoltDB is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * VoltDB is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package org.voltdb.iv2;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.voltcore.logging.Level;
+import org.voltcore.messaging.Mailbox;
+
+import org.voltdb.DependencyPair;
+import org.voltdb.exceptions.EEException;
+import org.voltdb.exceptions.SQLException;
+import org.voltdb.messaging.FragmentResponseMessage;
+import org.voltdb.messaging.FragmentTaskMessage;
+import org.voltdb.ParameterSet;
+import org.voltdb.SiteProcedureConnection;
+import org.voltdb.utils.LogKeys;
+import org.voltdb.VoltTable;
+
+public class SysprocFragmentTask extends TransactionTask
+{
+ final Mailbox m_initiator;
+ final FragmentTaskMessage m_task;
+
+ SysprocFragmentTask(Mailbox mailbox,
+ ParticipantTransactionState txn,
+ FragmentTaskMessage message)
+ {
+ super(txn);
+ m_initiator = mailbox;
+ m_task = message;
+ assert(m_task.isSysProcTask());
+ }
+
+ @Override
+ public void run(SiteProcedureConnection siteConnection)
+ {
+ if (!m_txn.isReadOnly()) {
+ if (m_txn.getBeginUndoToken() == Site.kInvalidUndoToken) {
+ m_txn.setBeginUndoToken(siteConnection.getLatestUndoToken());
+ }
+ }
+
+ final FragmentResponseMessage response = processFragmentTask(siteConnection);
+ m_initiator.deliver(response);
+ }
+
+ // Extracted the sysproc portion of ExecutionSite processFragmentTask(), then
+ // modifed to work in the new world
+ public FragmentResponseMessage processFragmentTask(SiteProcedureConnection siteConnection)
+ {
+ final FragmentResponseMessage currentFragResponse =
+ new FragmentResponseMessage(m_task, m_initiator.getHSId());
+ currentFragResponse.setStatus(FragmentResponseMessage.SUCCESS, null);
+
+ for (int frag = 0; frag < m_task.getFragmentCount(); frag++)
+ {
+ final long fragmentId = m_task.getFragmentId(frag);
+ // equivalent to dep.depId:
+ // final int outputDepId = m_task.getOutputDepId(frag);
+
+ ParameterSet params = m_task.getParameterSetForFragment(frag);
+
+ try {
+ // run the overloaded sysproc planfragment. pass an empty dependency
+ // set since remote (non-aggregator) fragments don't receive dependencies.
+ final DependencyPair dep
+ = siteConnection.executePlanFragment(m_txn,
+ new HashMap<Integer, List<VoltTable>>(),
+ fragmentId,
+ params);
+ currentFragResponse.addDependency(dep.depId, dep.dependency);
+ } catch (final EEException e) {
+ hostLog.l7dlog( Level.TRACE, LogKeys.host_ExecutionSite_ExceptionExecutingPF.name(), new Object[] { fragmentId }, e);
+ currentFragResponse.setStatus(FragmentResponseMessage.UNEXPECTED_ERROR, e);
+ break;
+ } catch (final SQLException e) {
+ hostLog.l7dlog( Level.TRACE, LogKeys.host_ExecutionSite_ExceptionExecutingPF.name(), new Object[] { fragmentId }, e);
+ currentFragResponse.setStatus(FragmentResponseMessage.UNEXPECTED_ERROR, e);
+ break;
+ }
+ }
+ return currentFragResponse;
+ }
+
+ @Override
+ public long getMpTxnId()
+ {
+ return m_task.getTxnId();
+ }
+}
View
2  src/frontend/org/voltdb/jni/ExecutionEngine.java
@@ -160,10 +160,12 @@ public VoltTable nextDependency(final int dependencyId) {
// will not retain any references to VoltTables (which is the goal).
final ArrayDeque<VoltTable> vtstack = m_depsById.get(dependencyId);
if (vtstack != null && vtstack.size() > 0) {
+ System.out.println("DID FIND IT: " + dependencyId);
// java doc. says this amortized constant time.
return vtstack.pop();
}
else if (vtstack == null) {
+ System.out.println("CAN'T FIND IT: " + dependencyId);
assert(false) : "receive without associated tracked dependency.";
return null;
}
View
26 src/frontend/org/voltdb/messaging/FragmentTaskMessage.java
@@ -21,10 +21,14 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import org.voltcore.logging.Level;
+import org.voltcore.logging.VoltLogger;
import org.voltcore.messaging.Subject;
import org.voltcore.messaging.TransactionInfoBaseMessage;
import org.voltcore.utils.CoreUtils;
import org.voltdb.ParameterSet;
+import org.voltdb.utils.LogKeys;
+import org.voltdb.VoltDB;
/**
* Message from a stored procedure coordinator to an execution site
@@ -34,6 +38,8 @@
*/
public class FragmentTaskMessage extends TransactionInfoBaseMessage
{
+ protected static final VoltLogger hostLog = new VoltLogger("HOST");
+
public static final byte USER_PROC = 0;
public static final byte SYS_PROC_PER_PARTITION = 1;
public static final byte SYS_PROC_PER_SITE = 2;
@@ -175,6 +181,26 @@ public ByteBuffer getParameterDataForFragment(int index) {
return m_parameterSets[index].asReadOnlyBuffer();
}
+ public ParameterSet getParameterSetForFragment(int index) {
+ ParameterSet params = null;
+ final ByteBuffer paramData = m_parameterSets[index].asReadOnlyBuffer();
+ if (paramData != null) {
+ final FastDeserializer fds = new FastDeserializer(paramData);
+ try {
+ params = fds.readObject(ParameterSet.class);
+ }
+ catch (final IOException e) {
+ hostLog.l7dlog(Level.FATAL,
+ LogKeys.host_ExecutionSite_FailedDeserializingParamsForFragmentTask.name(), e);
+ VoltDB.crashLocalVoltDB(e.getMessage(), true, e);
+ }
+ }
+ else {
+ params = new ParameterSet();
+ }
+ return params;
+ }
+
@Override
public int getSerializedSize()
{

No commit comments for this range

Something went wrong with that request. Please try again.