-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
SessionCleaner.java
82 lines (71 loc) · 2.9 KB
/
SessionCleaner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package tachyon.worker.block;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.util.CommonUtils;
import tachyon.worker.WorkerContext;
/**
* SessionCleaner periodically checks if any session have become zombies, removes the zombie session
* and associated data when necessary. The syncing parameters (intervals) adopt directly from
* worker-to-master heartbeat configurations.
*/
public final class SessionCleaner implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
/** Block data manager responsible for interacting with Tachyon and UFS storage */
private final BlockDataManager mBlockDataManager;
/** Milliseconds between each check */
private final int mCheckIntervalMs;
/** Flag to indicate if the checking should continue */
private volatile boolean mRunning;
/**
* Constructor for SessionCleaner
*
* @param blockDataManager the blockDataManager this checker is updating to
*/
public SessionCleaner(BlockDataManager blockDataManager) {
mBlockDataManager = blockDataManager;
mCheckIntervalMs =
WorkerContext.getConf().getInt(Constants.WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS);
mRunning = true;
}
/**
* Main loop for the cleanup, continuously look for zombie sessions
*/
@Override
public void run() {
long lastCheckMs = System.currentTimeMillis();
while (mRunning) {
// Check the time since last check, and wait until it is within check interval
long lastIntervalMs = System.currentTimeMillis() - lastCheckMs;
long toSleepMs = mCheckIntervalMs - lastIntervalMs;
if (toSleepMs > 0) {
CommonUtils.sleepMs(LOG, toSleepMs);
} else {
LOG.warn("Session cleanup took: " + lastIntervalMs + ", expected: " + mCheckIntervalMs);
}
// Check if any sessions have become zombies, if so clean them up
mBlockDataManager.cleanupSessions();
lastCheckMs = System.currentTimeMillis();
}
}
/**
* Stops the checking, once this method is called, the object should be discarded
*/
public void stop() {
mRunning = false;
}
}