/
LineBasedDoor.java
283 lines (250 loc) · 9.1 KB
/
LineBasedDoor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package diskCacheV111.doors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import dmg.cells.nucleus.CDC;
import dmg.cells.nucleus.CellCommandListener;
import dmg.cells.nucleus.CellInfoProvider;
import dmg.cells.nucleus.CellMessageReceiver;
import dmg.cells.nucleus.CellMessageSender;
import dmg.util.Args;
import dmg.util.CommandExitException;
import dmg.util.StreamEngine;
import org.dcache.cells.AbstractCell;
import org.dcache.util.SequentialExecutor;
import org.dcache.util.Transfer;
/**
* Door cell for line based protocols.
*
* To be used with LoginManager. The cell reads lines from a StreamEngine
* and passes these to an interpreter for processing.
*
* The cell is able to detect end-of-stream even while the interpreter
* is processing a line.
*/
public class LineBasedDoor
extends AbstractCell implements Runnable
{
private static final Logger LOGGER = LoggerFactory.getLogger(LineBasedDoor.class);
private final Class<? extends LineBasedInterpreter> interpreterClass;
/**
* Door instances are created by the LoginManager. This is the
* stream engine passed to us from the LoginManager upon
* instantiation.
*/
private final StreamEngine engine;
private LineBasedInterpreter interpreter;
private final CountDownLatch shutdownGate = new CountDownLatch(1);
/**
* Executor for processing commands.
*/
private final Executor executor;
private Thread workerThread;
public LineBasedDoor(String cellName, Args args, Class<? extends LineBasedInterpreter> interpreterClass,
StreamEngine engine, ExecutorService executor)
{
super(cellName, args);
this.interpreterClass = interpreterClass;
this.engine = engine;
this.executor = executor;
try {
doInit();
workerThread.start();
} catch (InterruptedException e) {
shutdownGate.countDown();
} catch (ExecutionException e) {
LOGGER.error(e.getCause().toString());
shutdownGate.countDown();
}
}
@Override
protected void init()
throws Exception
{
super.init();
Transfer.initSession();
LOGGER.debug("Client host: {}",
engine.getInetAddress().getHostAddress());
workerThread = new Thread(this);
interpreter = interpreterClass.newInstance();
parseOptions(interpreter);
interpreter.setWriter(engine.getWriter());
interpreter.setRemoteAddress((InetSocketAddress) engine.getSocket().getRemoteSocketAddress());
interpreter.setLocalAddress((InetSocketAddress) engine.getSocket().getLocalSocketAddress());
if (interpreter instanceof CellMessageSender) {
((CellMessageSender) interpreter).setCellEndpoint(this);
}
interpreter.init();
if (interpreter instanceof CellCommandListener) {
addCommandListener(interpreter);
}
if (interpreter instanceof CellMessageReceiver) {
addMessageListener((CellMessageReceiver) interpreter);
}
}
private synchronized void shutdownInputStream()
{
try {
Socket socket = engine.getSocket();
if (!socket.isClosed() && !socket.isInputShutdown()) {
socket.shutdownInput();
}
} catch (IOException e) {
LOGGER.info("Failed to shut down input stream of the " +
"control channel: {}", e.getMessage());
}
}
/**
* Main loop for command processing.
*
* Commands are read from the socket and submitted to the command
* queue for execution. Upon termination, most of the shutdown
* logic is in this method, including:
*
* - Emergency shutdown of performance marker engine
* - Shut down of passive mode server socket
* - Abort and cleanup after failed transfers
* - Cell shutdown initiation
*
* Notice that socket and thus input and output streams are not
* closed here. See cleanUp() for details on this.
*/
@Override
public void run()
{
try {
SequentialExecutor executor = new SequentialExecutor(this.executor);
try {
/* Notice that we do not close the input stream, as
* doing so would close the socket as well. We don't
* want to do that until cleanUp() is called.
*
* REVISIT: I hope that the StreamEngine does not
* maintain any resources that do not get
* automatically freed when the socket is closed.
*/
BufferedReader in =
new BufferedReader(new InputStreamReader(engine.getInputStream(), "UTF-8"));
String s = in.readLine();
while (s != null) {
executor.execute(new Command(s));
s = in.readLine();
}
} catch (IOException e) {
LOGGER.error("Got error reading data: {}", e.getMessage());
} finally {
try {
executor.shutdownNow();
interpreter.shutdown();
executor.awaitTermination();
} catch (InterruptedException e) {
LOGGER.error("Failed to shut down command processing: {}",
e.getMessage());
}
LOGGER.debug("End of stream encountered");
}
} finally {
/* cleanUp() waits for us to open the gate.
*/
shutdownGate.countDown();
/* Killing the cell will cause cleanUp() to be
* called (although from a different thread).
*/
kill();
}
}
/**
* Called by the cell infrastructure when the cell has been killed.
*
* The socket will be closed by this method. It is quite important
* that this does not happen earlier, as several threads use the
* output stream. This is the only place where we can be 100%
* certain, that all the other threads are done with their job.
*
* The method blocks until the worker thread has terminated.
*/
@Override
public void cleanUp()
{
/* Closing the input stream will cause the FTP command
* processing thread to shut down. In case the shutdown was
* initiated by the FTP client, this will already have
* happened at this point. However if the cell is shut down
* explicitly, then we have to shutdown the input stream here.
*/
shutdownInputStream();
/* The FTP command processing thread will open the gate after
* shutdown.
*/
try {
shutdownGate.await();
} catch (InterruptedException e) {
/* This should really not happen as nobody is supposed to
* interrupt the cell thread, but if it does happen then
* we better log it.
*/
LOGGER.error("Got interrupted exception shutting down input stream");
}
try {
/* Closing the socket will also close the input and output
* streams of the socket. This in turn will cause the
* command poller thread to shut down.
*/
engine.getSocket().close();
} catch (IOException e) {
LOGGER.error("Got I/O exception closing socket: {}",
e.getMessage());
}
super.cleanUp();
}
@Override
public void getInfo(PrintWriter pw)
{
pw.println(" User Host : " + engine.getInetAddress().getHostAddress());
if (interpreter instanceof CellInfoProvider) {
((CellInfoProvider) interpreter).getInfo(pw);
}
}
public interface LineBasedInterpreter
{
void setWriter(Writer out);
void execute(String cmd) throws CommandExitException;
void init();
void shutdown();
void setRemoteAddress(InetSocketAddress remoteAddress);
void setLocalAddress(InetSocketAddress localAddress);
}
private class Command implements Runnable
{
private final CDC cdc;
private final String command;
public Command(String command)
{
this.cdc = new CDC();
this.command = command;
}
@Override
public void run()
{
try (CDC ignored = cdc.restore()) {
try {
interpreter.execute(command);
} catch (CommandExitException e) {
shutdownInputStream();
} catch (RuntimeException e) {
LOGGER.error("Bug detected", e);
}
}
}
}
}