/
FragmentTask.java
136 lines (123 loc) · 5.17 KB
/
FragmentTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/* This file is part of VoltDB.
* Copyright (C) 2008-2012 VoltDB Inc.
*
* VoltDB is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* VoltDB is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
*/
package org.voltdb.iv2;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.voltcore.logging.Level;
import org.voltcore.messaging.Mailbox;
import org.voltdb.ParameterSet;
import org.voltdb.SiteProcedureConnection;
import org.voltdb.VoltDB;
import org.voltdb.VoltTable;
import org.voltdb.exceptions.EEException;
import org.voltdb.exceptions.SQLException;
import org.voltdb.messaging.FastDeserializer;
import org.voltdb.messaging.FragmentResponseMessage;
import org.voltdb.messaging.FragmentTaskMessage;
import org.voltdb.utils.LogKeys;
public class FragmentTask extends TransactionTask
{
final Mailbox m_initiator;
final FragmentTaskMessage m_task;
FragmentTask(Mailbox mailbox,
ParticipantTransactionState txn,
FragmentTaskMessage message)
{
super(txn);
m_initiator = mailbox;
m_task = message;
}
@Override
public void run(SiteProcedureConnection siteConnection)
{
// Set the begin undo token if we haven't already
// In the future we could record a token per batch
// and do partial rollback
if (!m_txn.isReadOnly()) {
if (m_txn.getBeginUndoToken() == Site.kInvalidUndoToken) {
m_txn.setBeginUndoToken(siteConnection.getLatestUndoToken());
}
}
final FragmentResponseMessage response = processFragmentTask(siteConnection);
// completion?
m_initiator.deliver(response);
}
// Cut and pasted from ExecutionSite processFragmentTask(), then
// modifed to work in the new world
public FragmentResponseMessage processFragmentTask(SiteProcedureConnection siteConnection)
{
// IZZY: actually need the "executor" HSId these days?
final FragmentResponseMessage currentFragResponse =
new FragmentResponseMessage(m_task, m_initiator.getHSId());
currentFragResponse.setStatus(FragmentResponseMessage.SUCCESS, null);
if (m_task.isSysProcTask())
{
throw new RuntimeException("IV2 unable to handle system procedures yet");
}
for (int frag = 0; frag < m_task.getFragmentCount(); frag++)
{
final long fragmentId = m_task.getFragmentId(frag);
final int outputDepId = m_task.getOutputDepId(frag);
ParameterSet params = m_task.getParameterSetForFragment(frag);
final int inputDepId = m_task.getOnlyInputDepId(frag);
/*
* Currently the error path when executing plan fragments
* does not adequately distinguish between fatal errors and
* abort type errors that should result in a roll back.
* Assume that it is ninja: succeeds or doesn't return.
* No roll back support.
*/
try {
final VoltTable dependency =
siteConnection.executePlanFragment(fragmentId,
inputDepId,
params,
m_txn.txnId,
m_txn.isReadOnly());
if (hostLog.isTraceEnabled()) {
hostLog.l7dlog(Level.TRACE,
LogKeys.org_voltdb_ExecutionSite_SendingDependency.name(),
new Object[] { outputDepId }, null);
}
currentFragResponse.addDependency(outputDepId, dependency);
} catch (final EEException e) {
hostLog.l7dlog( Level.TRACE, LogKeys.host_ExecutionSite_ExceptionExecutingPF.name(), new Object[] { fragmentId }, e);
currentFragResponse.setStatus(FragmentResponseMessage.UNEXPECTED_ERROR, e);
break;
} catch (final SQLException e) {
hostLog.l7dlog( Level.TRACE, LogKeys.host_ExecutionSite_ExceptionExecutingPF.name(), new Object[] { fragmentId }, e);
currentFragResponse.setStatus(FragmentResponseMessage.UNEXPECTED_ERROR, e);
break;
}
}
return currentFragResponse;
}
@Override
public long getMpTxnId()
{
return m_task.getTxnId();
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append("FragmentTask:");
sb.append("\n\tMP TXN ID: ").append(getMpTxnId());
sb.append("\n\tLOCAL TXN ID: ").append(getLocalTxnId());
return sb.toString();
}
}