-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
AlluxioLogServerProcess.java
226 lines (211 loc) · 8.3 KB
/
AlluxioLogServerProcess.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/
package alluxio.logserver;
import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.retry.RetryPolicy;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;

import com.google.common.base.Function;
import org.apache.log4j.Hierarchy;
import org.apache.log4j.Level;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.spi.LoggerRepository;
import org.apache.log4j.spi.RootLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A centralized log server for Alluxio.
 *
 * <p>Alluxio masters and workers generate logs and store the logs in local storage.
 * {@link AlluxioLogServerProcess} allows masters and workers to "push" their logs to a
 * centralized log server where another copy of the logs will be stored.
 *
 * <p>{@link #start()} runs the accept loop on the calling thread and does not return until the
 * server is stopped or accepting a connection fails. {@link #stop()} is meant to be invoked from
 * a different thread.
 */
public class AlluxioLogServerProcess implements LogServerProcess {
  private static final Logger LOG = LoggerFactory.getLogger(AlluxioLogServerProcess.class);

  /** Maximum time to wait for the client thread pool to terminate, also its keep-alive time. */
  private static final long STOP_TIMEOUT_MS = 60000;
  /** Parameters of the exponential backoff used when the thread pool rejects a connection. */
  private static final int BASE_SLEEP_TIME_MS = 50;
  private static final int MAX_SLEEP_TIME_MS = 30000;
  private static final int MAX_NUM_RETRY = 20;

  private final String mBaseLogsDir;
  private final int mPort;
  private final int mMinNumberOfThreads;
  private final int mMaxNumberOfThreads;

  // The fields below are written by the serving thread and read by the thread calling stop(),
  // hence volatile.
  private volatile ServerSocket mServerSocket;
  private volatile ExecutorService mThreadPool;
  private volatile boolean mStopped;

  /**
   * Construct an {@link AlluxioLogServerProcess} instance.
   *
   * @param baseLogsDir base directory to store the logs pushed from remote Alluxio servers
   */
  public AlluxioLogServerProcess(String baseLogsDir) {
    mPort = Configuration.getInt(PropertyKey.LOG_SERVER_PORT);
    // The log server serves the logging requests from Alluxio workers, Alluxio master, Alluxio
    // secondary master, and Alluxio proxy. Therefore the number of threads required by
    // log server is #workers + 1 (master) + 1 (secondary master) + 1 (proxy).
    mMinNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MIN) + 3;
    mMaxNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MAX) + 3;
    mBaseLogsDir = baseLogsDir;
    mStopped = true;
  }

  @Override
  public void start() throws Exception {
    startServing();
  }

  @Override
  public void stop() throws Exception {
    stopServing();
  }

  @Override
  public void waitForReady() {
    CommonUtils.waitFor(this + " to start", new Function<Void, Boolean>() {
      @Override
      public Boolean apply(Void input) {
        return !mStopped;
      }
    }, WaitForOptions.defaults().setTimeoutMs(10000));
  }

  /**
   * Create and start logging server and client thread pool, then accept client connections on
   * the calling thread until the server is stopped or accepting a connection fails.
   *
   * @throws RuntimeException if the server socket cannot be bound to the configured port
   */
  private void startServing() {
    // A SynchronousQueue holds no tasks: each connection is either handed directly to a thread
    // or rejected, in which case the submission is retried with backoff.
    mThreadPool = new ThreadPoolExecutor(mMinNumberOfThreads, mMaxNumberOfThreads,
        STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
    try {
      mServerSocket = new ServerSocket(mPort);
    } catch (IOException e) {
      LOG.error("Failed to bind to port {}.", mPort, e);
      throw new RuntimeException(e);
    }
    mStopped = false;
    while (!mStopped) {
      try {
        Socket client = mServerSocket.accept();
        executeWithRetry(client, new AlluxioLog4jSocketNode(this, client));
      } catch (IOException e) {
        // Closing the server socket in stopServing() makes accept() fail; that is the expected
        // way out of this loop, so only report the failure when no stop was requested.
        if (!mStopped) {
          LOG.warn("Socket transport error occurred during accepting message.", e);
        }
        break;
      }
    }
  }

  /**
   * Hands a client connection to the thread pool, retrying with exponential backoff while the
   * pool rejects it. The client socket is closed if the connection cannot be handed over.
   *
   * @param client the accepted client socket
   * @param clientSocketNode the task that serves {@code client}
   */
  private void executeWithRetry(Socket client, Runnable clientSocketNode) {
    RetryPolicy retryPolicy =
        new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_NUM_RETRY);
    while (true) {
      try {
        mThreadPool.execute(clientSocketNode);
        return;
      } catch (RejectedExecutionException e) {
        if (!retryPolicy.attemptRetry()) {
          LOG.warn("Connection with {} has been rejected by ExecutorService {} times "
                  + "till timedout, reason: {}",
              client.getInetAddress().getHostAddress(), retryPolicy.getRetryCount(), e);
          closeQuietly(client);
          return;
        }
      } catch (Error | Exception e) {
        LOG.error("ExecutorService threw error: ", e);
        // Nobody will serve this connection, so do not leak its socket.
        closeQuietly(client);
        throw e;
      }
    }
  }

  /**
   * Closes a client socket, logging instead of propagating any failure so that a problem with a
   * single connection cannot terminate the accept loop.
   *
   * @param client the client socket to close
   */
  private static void closeQuietly(Socket client) {
    try {
      client.close();
    } catch (IOException e) {
      LOG.warn("Exception in closing client socket {}.", client.getInetAddress(), e);
    }
  }

  /**
   * Called from another thread (actually in the shutdown hook of {@link AlluxioLogServer})
   * to stop the main thread of {@link AlluxioLogServerProcess}.
   *
   * <p>Closes the server socket, shuts the thread pool down and waits up to
   * {@link #STOP_TIMEOUT_MS} for it to terminate. If the calling thread is interrupted while
   * waiting, the wait continues for the remaining time and the interrupt status is restored
   * before returning. Does nothing beyond marking the server stopped if it was never started.
   */
  private void stopServing() {
    mStopped = true;
    ServerSocket serverSocket = mServerSocket;
    if (serverSocket != null) {
      try {
        serverSocket.close();
      } catch (IOException e) {
        LOG.warn("Exception in closing server socket.", e);
      }
    }
    ExecutorService threadPool = mThreadPool;
    if (threadPool == null) {
      // startServing() never ran, so there is nothing to shut down.
      return;
    }
    threadPool.shutdown();
    boolean interrupted = false;
    long timeoutMs = STOP_TIMEOUT_MS;
    long now = System.currentTimeMillis();
    while (timeoutMs >= 0) {
      try {
        threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException e) {
        // Keep waiting for the remaining time, but remember the interrupt for the caller.
        interrupted = true;
        long newNow = System.currentTimeMillis();
        timeoutMs -= (newNow - now);
        now = newNow;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Configure a {@link Hierarchy} instance used to retrieve logger by name and maintain the
   * logger hierarchy. An instance of this class will be passed to a
   * {@link AlluxioLog4jSocketNode} so that the {@link AlluxioLog4jSocketNode} instance can
   * retrieve the logger to log incoming {@link org.apache.log4j.spi.LoggingEvent}s.
   *
   * <p>The log4j properties are loaded from the file named by the system property
   * "log4j.configuration"; the root logger and the file of the appender
   * "LOGSERVER_CLIENT_LOGGER" are then overridden so that events from this client are written to
   * {@code <baseLogsDir>/<logAppenderName in lower case>/<client address>.log}.
   *
   * @param inetAddress inet address of the client
   * @param logAppenderName name of the appender to use for this client
   * @return A {@link Hierarchy} instance to pass to {@link AlluxioLog4jSocketNode}
   * @throws IOException if fails to create an {@link FileInputStream} to read log4j.properties
   * @throws RuntimeException if fails to derive a valid URI from the value of property named
   *         "log4j.configuration"
   */
  protected LoggerRepository configureHierarchy(InetAddress inetAddress, String logAppenderName)
      throws IOException {
    String inetAddressStr = inetAddress.getHostAddress();
    Properties properties = new Properties();
    try {
      final File configFile = new File(new URI(System.getProperty("log4j.configuration")));
      try (FileInputStream inputStream = new FileInputStream(configFile)) {
        properties.load(inputStream);
      }
    } catch (URISyntaxException e) {
      // Alluxio log server cannot derive a valid path to log4j.properties. Since this
      // properties file is global, we should throw an exception.
      throw new RuntimeException(e);
    }
    Level level = Level.INFO;
    Hierarchy clientHierarchy = new Hierarchy(new RootLogger(level));
    // Startup script should guarantee that mBaseLogsDir already exists.
    // Locale.ROOT keeps the directory name independent of the JVM's default locale.
    // NOTE(review): logAppenderName is used as a path component as-is; confirm callers only pass
    // trusted appender names.
    String logDirectoryPath = mBaseLogsDir + "/" + logAppenderName.toLowerCase(Locale.ROOT);
    File logDirectory = new File(logDirectoryPath);
    LOG.info(logDirectoryPath);
    // The final isDirectory() check tolerates another client thread creating the directory
    // between exists() and mkdir().
    if (!logDirectory.exists() && !logDirectory.mkdir() && !logDirectory.isDirectory()) {
      LOG.warn("Failed to create log directory {}.", logDirectoryPath);
    }
    String logFilePath = logDirectoryPath + "/" + inetAddressStr + ".log";
    properties.setProperty("log4j.rootLogger", level.toString() + ",LOGSERVER_CLIENT_LOGGER");
    properties.setProperty("log4j.appender.LOGSERVER_CLIENT_LOGGER.File", logFilePath);
    new PropertyConfigurator().doConfigure(properties, clientHierarchy);
    return clientHierarchy;
  }
}