From 351c5aa59650eda0ef4c6f04e13bd6fe3d3590e5 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 16 Jul 2015 14:37:22 +0900 Subject: [PATCH 1/7] TAJO-1685: Query fails when using table data which located on local file system occasionally on fully distributed mode. --- .../tajo/master/exec/QueryExecutor.java | 67 ++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 5d42157217..bec6182357 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -47,9 +47,11 @@ import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.QueryManager; import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; +import org.apache.tajo.master.rm.Worker; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.Target; @@ -61,15 +63,20 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.TajoMasterInfo; import org.apache.tajo.session.Session; import org.apache.tajo.storage.*; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class QueryExecutor { private static final Log LOG = LogFactory.getLog(QueryExecutor.class); @@ -489,6 +496,65 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, space.prepareTable(rootNode.getChild()); } + + LogicalNode[] scans = PlannerUtil.findAllNodes(rootNode, NodeType.SCAN, NodeType.PARTITIONS_SCAN); + + // Tajo allows that the location of table would be set the path of local file system, for example, + // “file:///home/tajo/xyz”. When querying above table data on pseudo distributed mode, + // the query would finished successfully. Pseudo distributed mode for tajo means that TajoMaster and TajoWorker + // just run on the same host. But when querying the data on fully distribute mode, + // the query would failed because the data was’t located on all hosts for running TajoWorker. This will throw an + // exception with a well-defined message for avoiding users confusion. + if (scans.length > 0) { + URI localFileUri = null; + for (int i = 0; i < scans.length; i++) { + ScanNode scanNode = (ScanNode) scans[i]; + if(scanNode.getTableDesc().getUri().getScheme().equals("file")) { + localFileUri = scanNode.getTableDesc().getUri(); + break; + } + } + + if (localFileUri != null) { + ServiceTracker haService = context.getHAService(); + List masterHostList = TUtil.newList(); + if (!haService.isHighAvailable()) { + InetSocketAddress masterAddress = context.getTajoMasterService().getBindAddress(); + // If users don't setup cluster mode in tajo-site.xml, default configuration would be used and host address + // would be set to localhost. But in this case, address for TajoMaster is different from address for + // TajoWorker on pseudo distributed mode. Thus, this need to find right host address for comparing TajoWorker. + if (masterAddress.getAddress().getHostName().equals("localhost") + || masterAddress.getAddress().getHostAddress().equals("127.0.0.1")) { + masterHostList.add(masterAddress.getAddress().getLocalHost().getHostAddress()); + } else { + masterHostList.add(masterAddress.getHostName()); + } + + } else { + for(TajoMasterInfo masterInfo : context.getHAService().getMasters()) { + masterHostList.add(masterInfo.getTajoClientAddress().getAddress().getHostAddress()); + } + } + + int distributedWorkerCount = 0; + // Get hosts to run as TajoWorker. + Map workers = context.getResourceManager().getWorkers(); + for(Worker worker : workers.values()) { + WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); + // If TajoWorker run on host different from host to run TajoMaster, it couldn't get the table data. + if (!masterHostList.contains(connectionInfo.getHost())) { + distributedWorkerCount++; + } + } + + if (distributedWorkerCount > 0) { + throw new VerifyException( + String.format("The table data should be on all hosts to run TajoWorker or be on distributed file system. " + + ": %s", localFileUri.toString())); + } + } + + } context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); @@ -496,7 +562,6 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, QueryInfo queryInfo; queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode); - if(queryInfo == null) { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); From 3fb9b35a497d2f33f63e51e7041c8acdbaf8e47d Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 16 Jul 2015 16:03:46 +0900 Subject: [PATCH 2/7] Add debug log for the Travis CI Build --- .../apache/tajo/master/exec/QueryExecutor.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index bec6182357..88812d5467 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -526,8 +526,11 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, if (masterAddress.getAddress().getHostName().equals("localhost") || masterAddress.getAddress().getHostAddress().equals("127.0.0.1")) { masterHostList.add(masterAddress.getAddress().getLocalHost().getHostAddress()); + LOG.info(">>> host:" + masterAddress.getAddress().getLocalHost().getHostAddress() + ", " + + "file:" + localFileUri.toString()); } else { masterHostList.add(masterAddress.getHostName()); + LOG.info(">>> host:" + masterAddress.getHostName() + ", file:" + localFileUri.toString()); } } else { @@ -548,13 +551,26 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, } if (distributedWorkerCount > 0) { + /// Debug codes + LOG.info(">>> start >>> file:" + localFileUri.toString()); + + for(String master: masterHostList) { + LOG.info(">>> master:" + master); + } + + for(Worker worker : workers.values()) { + WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); + LOG.info(">>> worker:" + connectionInfo.getHost()); + } + LOG.info(">>> end >>> file:" + localFileUri.toString()); + throw new VerifyException( String.format("The table data should be on all hosts to run TajoWorker or be on distributed file system. " + ": %s", localFileUri.toString())); } } - } + context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); From fb0cb985a3baae4968c2dc3a266ce4077e7d3990 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 16 Jul 2015 16:14:41 +0900 Subject: [PATCH 3/7] Update test log level --- .../java/org/apache/tajo/master/exec/QueryExecutor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 88812d5467..20c0990f91 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -552,17 +552,17 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, if (distributedWorkerCount > 0) { /// Debug codes - LOG.info(">>> start >>> file:" + localFileUri.toString()); + LOG.warn(">>> start >>> file:" + localFileUri.toString()); for(String master: masterHostList) { - LOG.info(">>> master:" + master); + LOG.warn(">>> master:" + master); } for(Worker worker : workers.values()) { WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); - LOG.info(">>> worker:" + connectionInfo.getHost()); + LOG.warn(">>> worker:" + connectionInfo.getHost()); } - LOG.info(">>> end >>> file:" + localFileUri.toString()); + LOG.warn(">>> end >>> file:" + localFileUri.toString()); throw new VerifyException( String.format("The table data should be on all hosts to run TajoWorker or be on distributed file system. " + From 545147dc6088c1836f49e67125ece0f7f671294e Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 16 Jul 2015 16:40:55 +0900 Subject: [PATCH 4/7] Skip checking local file existence on local mode. --- .../tajo/master/exec/QueryExecutor.java | 49 ++++++------------- 1 file changed, 15 insertions(+), 34 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 20c0990f91..7fdb24201e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -520,53 +520,34 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, List masterHostList = TUtil.newList(); if (!haService.isHighAvailable()) { InetSocketAddress masterAddress = context.getTajoMasterService().getBindAddress(); - // If users don't setup cluster mode in tajo-site.xml, default configuration would be used and host address - // would be set to localhost. But in this case, address for TajoMaster is different from address for - // TajoWorker on pseudo distributed mode. Thus, this need to find right host address for comparing TajoWorker. - if (masterAddress.getAddress().getHostName().equals("localhost") - || masterAddress.getAddress().getHostAddress().equals("127.0.0.1")) { - masterHostList.add(masterAddress.getAddress().getLocalHost().getHostAddress()); - LOG.info(">>> host:" + masterAddress.getAddress().getLocalHost().getHostAddress() + ", " + - "file:" + localFileUri.toString()); - } else { + // When specify localhost or 127.0.0.1 for TajoMaster in tajo-site.xml, skip checking local file existence. + if (!masterAddress.getAddress().getHostName().equals("localhost") + && !masterAddress.getAddress().getHostAddress().equals("127.0.0.1")) { masterHostList.add(masterAddress.getHostName()); - LOG.info(">>> host:" + masterAddress.getHostName() + ", file:" + localFileUri.toString()); } - } else { for(TajoMasterInfo masterInfo : context.getHAService().getMasters()) { masterHostList.add(masterInfo.getTajoClientAddress().getAddress().getHostAddress()); } } - int distributedWorkerCount = 0; - // Get hosts to run as TajoWorker. - Map workers = context.getResourceManager().getWorkers(); - for(Worker worker : workers.values()) { - WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); - // If TajoWorker run on host different from host to run TajoMaster, it couldn't get the table data. - if (!masterHostList.contains(connectionInfo.getHost())) { - distributedWorkerCount++; - } - } - - if (distributedWorkerCount > 0) { - /// Debug codes - LOG.warn(">>> start >>> file:" + localFileUri.toString()); - - for(String master: masterHostList) { - LOG.warn(">>> master:" + master); - } - + if (masterHostList.size() > 0) { + int distributedWorkerCount = 0; + // Get hosts to run as TajoWorker. + Map workers = context.getResourceManager().getWorkers(); for(Worker worker : workers.values()) { WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); - LOG.warn(">>> worker:" + connectionInfo.getHost()); + // If TajoWorker run on host different from host to run TajoMaster, it couldn't get the table data. + if (!masterHostList.contains(connectionInfo.getHost())) { + distributedWorkerCount++; + } } - LOG.warn(">>> end >>> file:" + localFileUri.toString()); - throw new VerifyException( - String.format("The table data should be on all hosts to run TajoWorker or be on distributed file system. " + + if (distributedWorkerCount > 0) { + throw new VerifyException( + String.format("The table data should be on all hosts to run TajoWorker or be on distributed file system. " + ": %s", localFileUri.toString())); + } } } } From a97990fe3af88c518a3273f9bf92ddcbfe658fcb Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 16 Jul 2015 17:09:03 +0900 Subject: [PATCH 5/7] Trigger for travis CI build --- .../java/org/apache/tajo/master/exec/QueryExecutor.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 7fdb24201e..263b4bb9fe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -499,11 +499,10 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, LogicalNode[] scans = PlannerUtil.findAllNodes(rootNode, NodeType.SCAN, NodeType.PARTITIONS_SCAN); - // Tajo allows that the location of table would be set the path of local file system, for example, - // “file:///home/tajo/xyz”. When querying above table data on pseudo distributed mode, - // the query would finished successfully. Pseudo distributed mode for tajo means that TajoMaster and TajoWorker - // just run on the same host. But when querying the data on fully distribute mode, - // the query would failed because the data was’t located on all hosts for running TajoWorker. This will throw an + // Tajo allows that the location of table would be set the path of local file system. When querying above table + // data on pseudo distributed mode, the query would finished successfully. Pseudo distributed mode for tajo means + // that TajoMaster and TajoWorker just run on the same host. But when querying the data on fully distribute mode, + // the query would failed because the data was’t located on all hosts for running TajoWorker. This will throw an // exception with a well-defined message for avoiding users confusion. if (scans.length > 0) { URI localFileUri = null; From 7be9b65725d9d56f037d50137778845d44587e2d Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 16 Jul 2015 17:52:17 +0900 Subject: [PATCH 6/7] Trigger for travis CI build --- .../main/java/org/apache/tajo/master/exec/QueryExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 263b4bb9fe..375d0bafab 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -502,7 +502,7 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, // Tajo allows that the location of table would be set the path of local file system. When querying above table // data on pseudo distributed mode, the query would finished successfully. Pseudo distributed mode for tajo means // that TajoMaster and TajoWorker just run on the same host. But when querying the data on fully distribute mode, - // the query would failed because the data was’t located on all hosts for running TajoWorker. This will throw an + // the query would failed because the data was’t located on all hosts for running TajoWorker. This will throw an // exception with a well-defined message for avoiding users confusion. if (scans.length > 0) { URI localFileUri = null; @@ -517,6 +517,7 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, if (localFileUri != null) { ServiceTracker haService = context.getHAService(); List masterHostList = TUtil.newList(); + if (!haService.isHighAvailable()) { InetSocketAddress masterAddress = context.getTajoMasterService().getBindAddress(); // When specify localhost or 127.0.0.1 for TajoMaster in tajo-site.xml, skip checking local file existence. From accaff199bd9cd394b82641ee10a418ce759361c Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 17 Jul 2015 15:04:01 +0900 Subject: [PATCH 7/7] Update the message for exception. --- .../main/java/org/apache/tajo/master/exec/QueryExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 375d0bafab..17fa4d1359 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -545,7 +545,7 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, if (distributedWorkerCount > 0) { throw new VerifyException( - String.format("The table data should be on all hosts to run TajoWorker or be on distributed file system. " + + String.format("The table data should be be on distributed file system. " + ": %s", localFileUri.toString())); } }