/
NamedPipeHelper.java
317 lines (282 loc) · 13.3 KB
/
NamedPipeHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ml.utils;
import org.apache.lucene.util.Constants;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.env.Environment;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
/**
 * Opens named pipes that are created elsewhere.
 *
 * In production, these will have been created in C++ code, as the procedure for creating them is
 * platform dependent and uses native OS calls that are not easily available in Java.
 *
 * Once the named pipes have been created elsewhere Java can open them like normal files, however,
 * there are complications:
 * - On *nix, when opening a pipe for output Java will create a normal file of the requested name
 *   if the named pipe doesn't already exist.  To avoid this, named pipes are only opened for
 *   output once the expected file name exists in the file system, and are opened without the
 *   "create" option so that a regular file can never be created by accident.
 * - On Windows, the server end of a pipe needs to reset it between client connects.  Methods like
 *   File.isFile() and File.exists() on Windows internally call the Win32 API function CreateFile()
 *   followed by GetFileInformationByHandle(), and if the CreateFile() succeeds it counts as opening
 *   the named pipe, requiring it to be reset on the server side before subsequent access.  To avoid
 *   this, the check for whether a given path represents a named pipe is done using simple string
 *   comparison on Windows.
 */
public class NamedPipeHelper {

    /**
     * Try this often to open named pipes that we're waiting on another process to create.
     */
    private static final long PAUSE_TIME_MS = 20;

    /**
     * On Windows named pipes are ALWAYS accessed via this path; it is impossible to put them
     * anywhere else.
     */
    private static final String WIN_PIPE_PREFIX = "\\\\.\\pipe\\";

    public NamedPipeHelper() {
        // Do nothing - the only reason there's a constructor is to allow mocking
    }

    /**
     * The default path where named pipes will be created.  On *nix they can be created elsewhere
     * (subject to security manager constraints), but on Windows this is the ONLY place they can
     * be created.
     * @param env The environment whose temporary directory is used on non-Windows platforms.
     * @return The directory prefix as a string.
     */
    public String getDefaultPipeDirectoryPrefix(Environment env) {
        // The return type is String because we don't want any (too) clever path processing removing
        // the seemingly pointless . in the path used on Windows.
        if (Constants.WINDOWS) {
            return WIN_PIPE_PREFIX;
        }
        // Use the Java temporary directory.  The Elasticsearch bootstrap sets up the security
        // manager to allow this to be read from and written to.  Also, the code that spawns our
        // daemon passes on this location to the C++ code using the $TMPDIR environment variable.
        // All these factors need to align for everything to work in production.  If any changes
        // are made here then CNamedPipeFactory::defaultPath() in the C++ code will probably
        // also need to be changed.
        return env.tmpFile().toString() + PathUtils.getDefaultFileSystem().getSeparator();
    }

    /**
     * Open a named pipe created elsewhere for input.
     *
     * @param path
     *            Path of named pipe to open.
     * @param timeout
     *            How long to wait for the named pipe to exist.
     * @return A stream opened to read from the named pipe.
     * @throws IOException
     *             if the named pipe cannot be opened.
     */
    @SuppressForbidden(reason = "Environment doesn't have path for Windows named pipes")
    public InputStream openNamedPipeInputStream(String path, Duration timeout) throws IOException {
        return openNamedPipeInputStream(PathUtils.get(path), timeout);
    }

    /**
     * Open a named pipe created elsewhere for input.
     * @param file The named pipe to open.
     * @param timeout How long to wait for the named pipe to exist.
     * @return A stream opened to read from the named pipe.
     * @throws IOException if the named pipe cannot be opened.
     */
    public InputStream openNamedPipeInputStream(Path file, Duration timeout) throws IOException {
        long timeoutMillis = timeout.toMillis();

        // Can't use Files.isRegularFile() on named pipes on Windows, as it renders them unusable,
        // but luckily there's an even simpler check (that's not possible on *nix)
        if (Constants.WINDOWS) {
            requireWindowsPipePath(file);
        }

        // On Windows Files.isRegularFile() will render a genuine named pipe unusable, so the
        // regular file check inside the retry loop is only enabled on other platforms
        return openWithRetries(file, timeoutMillis, Constants.WINDOWS == false, new PrivilegedInputPipeOpener(file));
    }

    /**
     * Open a named pipe created elsewhere for output.
     *
     * @param path
     *            Path of named pipe to open.
     * @param timeout
     *            How long to wait for the named pipe to exist.
     * @return A stream opened to write to the named pipe.
     * @throws IOException
     *             if the named pipe cannot be opened.
     */
    @SuppressForbidden(reason = "Environment doesn't have path for Windows named pipes")
    public OutputStream openNamedPipeOutputStream(String path, Duration timeout) throws IOException {
        return openNamedPipeOutputStream(PathUtils.get(path), timeout);
    }

    /**
     * Open a named pipe created elsewhere for output.
     * @param file The named pipe to open.
     * @param timeout How long to wait for the named pipe to exist.
     * @return A stream opened to write to the named pipe.
     * @throws IOException if the named pipe cannot be opened.
     */
    public OutputStream openNamedPipeOutputStream(Path file, Duration timeout) throws IOException {
        if (Constants.WINDOWS) {
            return openNamedPipeOutputStreamWindows(file, timeout);
        }
        return openNamedPipeOutputStreamUnix(file, timeout);
    }

    /**
     * The logic here is very similar to that of opening an input stream, because on Windows
     * Java cannot create a regular file when asked to open a named pipe that doesn't exist.
     * @param file The named pipe to open.
     * @param timeout How long to wait for the named pipe to exist.
     * @return A stream opened to write to the named pipe.
     * @throws IOException if the named pipe cannot be opened.
     */
    private static OutputStream openNamedPipeOutputStreamWindows(Path file, Duration timeout) throws IOException {
        long timeoutMillis = timeout.toMillis();

        // Can't use File.isFile() on Windows, but luckily there's an even simpler check (that's not possible on *nix)
        requireWindowsPipePath(file);

        return openWithRetries(file, timeoutMillis, false, new PrivilegedOutputPipeOpener(file));
    }

    /**
     * This has to use different logic to the input pipe case to avoid the danger of creating
     * a regular file when the named pipe does not exist when the method is first called.
     * @param file The named pipe to open.
     * @param timeout How long to wait for the named pipe to exist.
     * @return A stream opened to write to the named pipe.
     * @throws IOException if the named pipe cannot be opened, including the case where it
     *                     disappears between being detected and being opened.
     */
    private static OutputStream openNamedPipeOutputStreamUnix(Path file, Duration timeout) throws IOException {
        long timeoutMillisRemaining = timeout.toMillis();

        // Periodically check whether the file exists until the timeout expires, then, if
        // it's still not available throw a FileNotFoundException
        while (timeoutMillisRemaining > 0 && Files.exists(file) == false) {
            long thisSleep = Math.min(timeoutMillisRemaining, PAUSE_TIME_MS);
            timeoutMillisRemaining -= thisSleep;
            try {
                Thread.sleep(thisSleep);
            } catch (InterruptedException ie) {
                // Preserve the interrupt status and stop waiting; the checks below decide the outcome
                Thread.currentThread().interrupt();
                break;
            }
        }

        if (Files.isRegularFile(file)) {
            throw new IOException(file + " is not a named pipe");
        }

        if (Files.exists(file) == false) {
            throw new FileNotFoundException("Cannot open " + file + " (No such file or directory)");
        }

        // Somebody could delete the named pipe between the checks above and the open below.
        // Files.newOutputStream() with no options implies CREATE and TRUNCATE_EXISTING, which would
        // silently create a regular file in that situation.  Requesting WRITE alone means the open
        // fails with an IOException instead if the pipe has disappeared.
        return Files.newOutputStream(file, StandardOpenOption.WRITE);
    }

    /**
     * Rejects paths that cannot be Windows named pipes.  This is a pure string comparison because
     * touching the file system to find out would render a genuine named pipe unusable.
     * @param file The path to check.
     * @throws IOException if the path does not start with the Windows named pipe prefix.
     */
    private static void requireWindowsPipePath(Path file) throws IOException {
        if (file.toString().startsWith(WIN_PIPE_PREFIX) == false) {
            throw new IOException(file + " is not a named pipe");
        }
    }

    /**
     * Repeatedly runs a privileged opener until it succeeds or the timeout is used up, pausing for
     * up to {@link #PAUSE_TIME_MS} between attempts.  Only the pauses are counted against the
     * timeout.  If the final attempt fails, or the thread is interrupted while pausing, the failure
     * from the most recent attempt is propagated.
     * @param file The named pipe being opened.
     * @param timeoutMillis How long to keep retrying, in milliseconds.
     * @param rejectRegularFiles Whether to check before each attempt that the path is not a regular
     *                           file.  Must be <code>false</code> on Windows, where the check would
     *                           render a genuine named pipe unusable.
     * @param opener The privileged action that opens the stream, wrapping any IOException in a
     *               RuntimeException.
     * @return The stream returned by the opener.
     * @throws IOException if the path is a regular file or the named pipe cannot be opened.
     */
    private static <T> T openWithRetries(Path file, long timeoutMillis, boolean rejectRegularFiles, PrivilegedAction<T> opener)
        throws IOException {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(new SpecialPermission());
        }

        long timeoutMillisRemaining = timeoutMillis;

        // Try to open the file periodically until the timeout expires, then, if
        // it's still not available throw the exception from the opener
        while (true) {
            if (rejectRegularFiles && Files.isRegularFile(file)) {
                throw new IOException(file + " is not a named pipe");
            }
            try {
                return AccessController.doPrivileged(opener);
            } catch (RuntimeException e) {
                if (timeoutMillisRemaining <= 0) {
                    // Always throws, so the code below only runs while there is time left
                    propagatePrivilegedException(e);
                }
                long thisSleep = Math.min(timeoutMillisRemaining, PAUSE_TIME_MS);
                timeoutMillisRemaining -= thisSleep;
                try {
                    Thread.sleep(thisSleep);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    propagatePrivilegedException(e);
                }
            }
        }
    }

    /**
     * To work around the limitation that privileged actions cannot throw checked exceptions the classes
     * below wrap IOExceptions in RuntimeExceptions.  If such an exception needs to be propagated back
     * to a user of this class then it's nice if they get the original IOException rather than having
     * it wrapped in a RuntimeException.  However, the privileged calls could also possibly throw other
     * RuntimeExceptions, so this method accounts for this case too.  This method never returns normally.
     * @param e The exception caught from the privileged call.
     * @throws IOException if an IOException is found by unwrapping <code>e</code>; otherwise
     *                     <code>e</code> itself is rethrown.
     */
    private static void propagatePrivilegedException(RuntimeException e) throws IOException {
        Throwable ioe = ExceptionsHelper.unwrap(e, IOException.class);
        if (ioe != null) {
            throw (IOException) ioe;
        }
        throw e;
    }

    /**
     * Used to work around the limitation that privileged actions cannot throw checked exceptions.
     */
    private static class PrivilegedInputPipeOpener implements PrivilegedAction<InputStream> {

        private final Path file;

        PrivilegedInputPipeOpener(Path file) {
            this.file = file;
        }

        @Override
        @SuppressForbidden(reason = "Files.newInputStream doesn't work with Windows named pipes")
        public InputStream run() {
            try {
                return new FileInputStream(file.toString());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Used to work around the limitation that privileged actions cannot throw checked exceptions.
     */
    private static class PrivilegedOutputPipeOpener implements PrivilegedAction<OutputStream> {

        private final Path file;

        PrivilegedOutputPipeOpener(Path file) {
            this.file = file;
        }

        @Override
        @SuppressForbidden(reason = "Files.newOutputStream doesn't work with Windows named pipes")
        public OutputStream run() {
            try {
                return new FileOutputStream(file.toString());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}