/
PcellsCommand.java
144 lines (129 loc) · 4.66 KB
/
PcellsCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package org.dcache.services.ssh2;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.Environment;
import org.apache.sshd.server.ExitCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import diskCacheV111.admin.UserAdminShell;
import diskCacheV111.util.TimeoutCacheException;
import dmg.cells.applets.login.DomainObjectFrame;
import dmg.cells.nucleus.CellEndpoint;
import dmg.util.CommandException;
public class PcellsCommand implements Command, Runnable
{
private static final Logger LOGGER =
LoggerFactory.getLogger(PcellsCommand.class);
private final CellEndpoint _endpoint;
private UserAdminShell _userAdminShell;
private InputStream _in;
private ExitCallback _exitCallback;
private OutputStream _out;
private Thread _adminShellThread;
private ExecutorService _executor = Executors.newCachedThreadPool();
private volatile boolean _done = false;
public PcellsCommand(CellEndpoint endpoint)
{
_endpoint = endpoint;
}
@Override
public void setErrorStream(OutputStream err)
{
// we don't use the error stream
}
@Override
public void setExitCallback(ExitCallback callback)
{
_exitCallback = callback;
}
@Override
public void setInputStream(InputStream in)
{
_in = in;
}
@Override
public void setOutputStream(OutputStream out)
{
_out = out;
}
@Override
public void start(Environment env) throws IOException
{
String user = env.getEnv().get(Environment.ENV_USER);
_userAdminShell = new UserAdminShell(user, _endpoint, _endpoint.getArgs());
_adminShellThread = new Thread(this);
_adminShellThread.start();
}
@Override
public void destroy()
{
if (_adminShellThread != null) {
_adminShellThread.interrupt();
}
_executor.shutdownNow();
}
@Override
public void run() {
try {
final ObjectInputStream in = new ObjectInputStream(_in);
final ObjectOutputStream out = new ObjectOutputStream(_out);
out.flush();
Object obj;
while (!_done && (obj = in.readObject()) != null) {
if (!(obj instanceof DomainObjectFrame)) {
LOGGER.error("Received unsupported request type: {}", obj.getClass());
continue;
}
final DomainObjectFrame frame = (DomainObjectFrame) obj;
LOGGER.trace("Frame id {} received", frame.getId());
_executor.execute(new Runnable() {
@Override
public void run()
{
Object result;
try {
if (frame.getDestination() == null) {
result = _userAdminShell.executeCommand(frame.getPayload().toString());
} else {
result = _userAdminShell.executeCommand(frame.getDestination(), frame.getPayload());
}
} catch (CommandException e) {
result = e;
_done = true;
try {
in.close();
} catch (IOException ignored) {
}
} catch (TimeoutCacheException e) {
result = null;
} catch (Exception ae) {
result = ae;
}
frame.setPayload(result);
try {
synchronized (out) {
out.writeObject(frame);
out.flush();
out.reset(); // prevents memory leaks...
}
} catch (IOException e) {
LOGGER.error("Problem sending result : {}", e);
}
}
});
}
} catch (IOException e) {
LOGGER.warn("I/O failure in pcells connection: {}", e.toString());
} catch (ClassNotFoundException e) {
LOGGER.error("Received unsupported request type: {}", e.getMessage());
} finally {
_exitCallback.onExit(0);
}
}
}