New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TAJO-1536 Fix minor issues in QueryMaster #519

Closed
wants to merge 1 commit into
base: master
from
Jump to file or symbol
Failed to load files and symbols.
+72 −73
Diff settings

Always

Just for now

@@ -33,6 +33,7 @@
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.utils.ThreadUtil;
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -70,9 +71,9 @@
private TajoConf systemConf;
private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
private final Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newHashMap();
private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
private final Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newHashMap();
private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
@@ -256,37 +257,46 @@ public void handle(Event event) {
dispatcher.getEventHandler().handle(event);
}
public Query getQuery(QueryId queryId) {
return queryMasterTasks.get(queryId).getQuery();
public QueryMasterTask getQueryMasterTask(QueryId queryId) {
synchronized (queryMasterTasks) {
return queryMasterTasks.get(queryId);
}
}
public QueryMasterTask getQueryMasterTask(QueryId queryId) {
return queryMasterTasks.get(queryId);
private QueryMasterTask removeQueryMasterTask(QueryId queryId) {
synchronized (queryMasterTasks) {
return queryMasterTasks.remove(queryId);
}
}
public QueryMasterTask getFinishedQueryMasterTask(QueryId queryId) {
synchronized (finishedQueryMasterTasks) {
return finishedQueryMasterTasks.get(queryId);
}
}
public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
if(queryMasterTask != null) {
return queryMasterTask;
} else {
if(includeFinished) {
return finishedQueryMasterTasks.get(queryId);
} else {
return null;
}
QueryMasterTask queryMasterTask = getQueryMasterTask(queryId);
if (queryMasterTask == null && includeFinished) {
queryMasterTask = getFinishedQueryMasterTask(queryId);
}
return queryMasterTask;
}
public QueryMasterContext getContext() {
return this.queryMasterContext;
}
public Collection<QueryMasterTask> getQueryMasterTasks() {
return queryMasterTasks.values();
synchronized (queryMasterTasks) {
return new ArrayList<QueryMasterTask>(queryMasterTasks.values());
}
}
public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
return finishedQueryMasterTasks.values();
synchronized (finishedQueryMasterTasks) {
return new ArrayList<QueryMasterTask>(finishedQueryMasterTasks.values());
}
}
public class QueryMasterContext {
@@ -329,13 +339,15 @@ public EventHandler getEventHandler() {
}
public void stopQuery(QueryId queryId) {
QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId);
QueryMasterTask queryMasterTask = removeQueryMasterTask(queryId);
if(queryMasterTask == null) {
LOG.warn("No query info:" + queryId);
return;
}
finishedQueryMasterTasks.put(queryId, queryMasterTask);
synchronized (finishedQueryMasterTasks) {
finishedQueryMasterTasks.put(queryId, queryMasterTask);
}
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
@@ -433,57 +445,49 @@ public QueryHeartbeatThread() {
@Override
public void run() {
LOG.info("Start QueryMaster heartbeat thread");
while(!queryMasterStop.get()) {
List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
synchronized(queryMasterTasks) {
tempTasks.addAll(queryMasterTasks.values());
ServiceTracker tracker = queryMasterContext.getWorkerContext().getServiceTracker();
LOG.info("Start QueryMaster heartbeat thread on " + tracker.getUmbilicalAddress());
for (;!queryMasterStop.get(); ThreadUtil.sleep(2000)) {
Collection<QueryMasterTask> snapshot = getQueryMasterTasks();
if (snapshot.isEmpty()) {
continue;
}
NettyClientBase tmClient;
try {
tmClient = connPool.getConnection(tracker.getUmbilicalAddress(),
QueryCoordinatorProtocol.class, true);
} catch (Throwable t) {
LOG.info("Failed to get connection to master", t);
continue;
}
synchronized(queryMasterTasks) {
for(QueryMasterTask eachTask: tempTasks) {
NettyClientBase tmClient;
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
try {
// TODO: make this single call
for (QueryMasterTask eachTask : snapshot) {
try {
ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker();
tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
} catch (Throwable t) {
t.printStackTrace();
LOG.info("Failed to send heartbeat to master", t);
}
}
}
synchronized(queryMasterStop) {
try {
queryMasterStop.wait(2000);
} catch (InterruptedException e) {
break;
}
} finally {
connPool.releaseConnection(tmClient);
}
}
LOG.info("QueryMaster heartbeat thread stopped");
}
}
class ClientSessionTimeoutCheckThread extends Thread {
@Override
public void run() {
LOG.info("ClientSessionTimeoutCheckThread started");
while(!queryMasterStop.get()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
synchronized(queryMasterTasks) {
tempTasks.addAll(queryMasterTasks.values());
}
for (;!queryMasterStop.get(); ThreadUtil.sleep(1000)) {
for(QueryMasterTask eachTask: tempTasks) {
for(QueryMasterTask eachTask: getQueryMasterTasks()) {
if(!eachTask.isStopped()) {
try {
long lastHeartbeat = eachTask.getLastClientHeartbeat();
@@ -502,15 +506,11 @@ public void run() {
}
class FinishedQueryMasterTaskCleanThread extends Thread {
@Override
public void run() {
int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_HISTORY_EXPIRE_PERIOD);
LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
while(!queryMasterStop.get()) {
try {
Thread.sleep(60 * 1000); // minimum interval minutes
} catch (InterruptedException e) {
break;
}
for (;!queryMasterStop.get(); ThreadUtil.sleep(60 * 1000)) {
try {
long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l;
cleanExpiredFinishedQueryMasterTask(expireTime);
@@ -521,23 +521,22 @@ public void run() {
}
private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
synchronized(finishedQueryMasterTasks) {
List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) {
/* If a query are abnormal termination, the finished time will be zero. */
long finishedTime = entry.getValue().getStartTime();
Query query = entry.getValue().getQuery();
if (query != null && query.getFinishTime() > 0) {
finishedTime = query.getFinishTime();
}
if(finishedTime < expireTime) {
expiredQueryIds.add(entry.getKey());
}
List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
for (QueryMasterTask finished : getFinishedQueryMasterTasks()) {
/* If a query are abnormal termination, the finished time will be zero. */
long finishedTime = finished.getStartTime();
Query query = finished.getQuery();
if (query != null && query.getFinishTime() > 0) {
finishedTime = query.getFinishTime();
}
for(QueryId eachId: expiredQueryIds) {
if (finishedTime < expireTime) {
expiredQueryIds.add(query.getId());
}
}
synchronized (finishedQueryMasterTasks) {
for (QueryId eachId: expiredQueryIds) {
finishedQueryMasterTasks.remove(eachId);
}
}
ProTip! Use n and p to navigate between commits in a pull request.