Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

package org.apache.tajo.engine.query;

import org.apache.tajo.IntegrationTest;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf.ConfVars;
Expand Down Expand Up @@ -374,4 +371,18 @@ public final void testSortWithConstKeys() throws Exception {
public final void testSubQuerySortAfterGroupMultiBlocks() throws Exception {
runSimpleTests();
}

/**
 * Runs this test's query against the table created by
 * {@code create_table_with_unique_small_dataset.sql} while {@code $TEST_MIN_TASK_NUM} is
 * set to "5", and compares the output with the expected result.
 *
 * <p>The result set is released and the configuration value is reset to "0" even when the
 * result assertion fails, so a failure here does not leak the result set or leave the
 * modified setting in place for later tests.
 *
 * @throws Exception if the DDL, the query execution, or the result comparison fails
 */
@Test
public final void testOutOfScope() throws Exception {
  executeDDL("create_table_with_unique_small_dataset.sql", "table3");
  // table has 5 files
  // NOTE(review): presumably "5" is chosen to match the file count above - confirm.
  testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "5");
  try {
    ResultSet res = executeQuery();
    try {
      assertResultSet(res);
    } finally {
      // Release the result set even when the assertion above throws.
      cleanupQuery(res);
    }
  } finally {
    // Always restore the value so other tests see the setting this test started from.
    testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
A,1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
C,3
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
B,2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
D,4
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
E,5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create external table testOutOfScope (col1 text, col2 int4) using text with ('text.delimiter'=',') location ${table.path};
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select col1, col2 from testOutOfScope order by col1 desc, col2 desc;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
col1,col2
-------------------------------
E,5
D,4
C,3
B,2
A,1
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.tajo.engine.planner.physical;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
Expand Down Expand Up @@ -410,7 +409,8 @@ private Scanner externalMergeAndSort(List<FileFragment> chunks)
if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
localFS.delete(frag.getPath(), true);
numDeletedFiles++;
LOG.info("Delete merged intermediate file: " + frag);

if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + frag);
}
}
info(LOG, numDeletedFiles + " merged intermediate files deleted");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;

Expand Down Expand Up @@ -213,21 +213,12 @@ public Path createBaseDir() throws IOException {
}

/**
 * Returns the base output directory path for the given execution block.
 *
 * <p>This hunk shows two forms of the body. The inline form passes three segments to
 * {@code StorageUtil.concatPath}: the query id string, the literal {@code "output"}, and
 * {@code String.valueOf(executionBlockId.getId())}. The delegating form hands the same
 * query id string and block id string to {@code TajoPullServerService.getBaseOutputDir}.
 *
 * @param executionBlockId the execution block whose output directory is requested
 * @return the output directory path for that execution block
 */
public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
Path workDir =
StorageUtil.concatPath(
executionBlockId.getQueryId().toString(),
"output",
String.valueOf(executionBlockId.getId()));
return workDir;
// Delegating form: path construction is left to TajoPullServerService.
return TajoPullServerService.getBaseOutputDir(
executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
}

/**
 * Returns the base input directory path for the given execution block.
 *
 * <p>This hunk shows two forms of the body. The inline form passes three segments to
 * {@code StorageUtil.concatPath}: the query id string, the literal {@code "in"}, and
 * {@code executionBlockId.toString()}. The delegating form hands the same query id string
 * and execution block string to {@code TajoPullServerService.getBaseInputDir}.
 *
 * @param executionBlockId the execution block whose input directory is requested
 * @return the input directory path for that execution block
 */
public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
Path workDir =
StorageUtil.concatPath(
executionBlockId.getQueryId().toString(),
"in",
executionBlockId.toString());
return workDir;
// Delegating form: path construction is left to TajoPullServerService.
return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
}

public ExecutionBlockId getExecutionBlockId() {
Expand Down
2 changes: 0 additions & 2 deletions tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,12 @@ public int getMessageReceiveCount() {

public FileChunk get() throws IOException {
if (useLocalFile) {
LOG.info("Get pseudo fetch from local host");
startTime = System.currentTimeMillis();
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FINISHED;
return fileChunk;
}

LOG.info("Get real fetch from remote host");
this.startTime = System.currentTimeMillis();
this.state = TajoProtos.FetcherState.FETCH_FETCHING;
ChannelFuture future = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ public TaskAttemptState getState() {

/**
 * Stores the given state on this object and logs the change together with the value of
 * {@code getTaskId()}.
 *
 * <p>This hunk shows two forms of the log statement: an unconditional INFO message and a
 * DEBUG message with the same text.
 *
 * @param state the new state to record
 */
public void setState(TaskAttemptState state) {
this.state = state;
LOG.info("Query status of " + getTaskId() + " is changed to " + state);

// Guarded so the message string is only concatenated when debug logging is enabled.
if (LOG.isDebugEnabled()) {
LOG.debug("Query status of " + getTaskId() + " is changed to " + state);
}
}

public void setDataChannel(DataChannel dataChannel) {
Expand Down
111 changes: 44 additions & 67 deletions tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -45,12 +44,15 @@
import org.apache.tajo.engine.query.TaskRequest;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.logical.SortNode;
import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
Expand Down Expand Up @@ -569,7 +571,6 @@ private FileFragment[] localizeFetchedData(File file, String name, TableMeta met
if (name.equals(chunk.getEbId())) {
tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
listTablets.add(tablet);
LOG.info("One local chunk is added to listTablets");
}
}
}
Expand Down Expand Up @@ -670,6 +671,7 @@ private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);

int i = 0;
int localStoreChunkCount = 0;
File storeDir;
File defaultStoreFile;
FileChunk storeChunk = null;
Expand All @@ -687,23 +689,18 @@ private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,

WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
boolean hasError = false;
try {
LOG.info("Try to get local file chunk at local host");
storeChunk = getLocalStoredFileChunk(uri, systemConf);
} catch (Throwable t) {
hasError = true;
}

storeChunk = getLocalStoredFileChunk(uri, systemConf);

// When a range request is out of range, storeChunk will be null. This is a normal state,
// so we simply skip it; there is no need to create a storeChunk.
if (storeChunk == null && !hasError) {
if (storeChunk == null || storeChunk.length() == 0) {
continue;
}

if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
&& hasError == false) {
if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
storeChunk.setFromRemote(false);
localStoreChunkCount++;
} else {
storeChunk = new FileChunk(defaultStoreFile, 0, -1);
storeChunk.setFromRemote(true);
Expand All @@ -717,12 +714,16 @@ private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
// represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
storeChunk.setEbId(f.getName());
Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
runnerList.add(fetcher);
i++;
if (LOG.isDebugEnabled()) {
LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
}
}
}
ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
LOG.info("Create shuffle Fetchers local:" + localStoreChunkCount +
", remote:" + (runnerList.size() - localStoreChunkCount));
return runnerList;
} else {
return Lists.newArrayList();
Expand All @@ -731,56 +732,42 @@ private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,

private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
// Parse the URI
LOG.info("getLocalStoredFileChunk starts");
final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
final List<String> types = params.get("type");
final List<String> qids = params.get("qid");
final List<String> taskIdList = params.get("ta");
final List<String> stageIds = params.get("sid");
final List<String> partIds = params.get("p");
final List<String> offsetList = params.get("offset");
final List<String> lengthList = params.get("length");

if (types == null || stageIds == null || qids == null || partIds == null) {
LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
return null;
}

if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
return null;
}
// Parsing the URL into key-values
final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());

String queryId = qids.get(0);
String shuffleType = types.get(0);
String sid = stageIds.get(0);
String partId = partIds.get(0);
String partId = params.get("p").get(0);
String queryId = params.get("qid").get(0);
String shuffleType = params.get("type").get(0);
String sid = params.get("sid").get(0);

if (shuffleType.equals("r") && taskIdList == null) {
LOG.error("Invalid URI - For range shuffle, taskId is required");
return null;
}
List<String> taskIds = splitMaps(taskIdList);
final List<String> taskIdList = params.get("ta");
final List<String> offsetList = params.get("offset");
final List<String> lengthList = params.get("length");

FileChunk chunk;
long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;

LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ ", taskIds=" + taskIdList);
if (LOG.isDebugEnabled()) {
LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ ", taskIds=" + taskIdList);
}

// The working directory of Tajo worker for each query, including stage
String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);

FileChunk chunk;
// If the stage requires a range shuffle
if (shuffleType.equals("r")) {
String ta = taskIds.get(0);
if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
LOG.warn("Range shuffle - file not exist");

Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
LOG.warn("Range shuffle - file not exist. " + outputPath);
return null;
}
Path path = executionBlockContext.getLocalFS().makeQualified(
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
String startKey = params.get("start").get(0);
String endKey = params.get("end").get(0);
boolean last = params.get("final") != null;
Expand All @@ -794,14 +781,15 @@ private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IO

// If the stage requires a hash shuffle or a scattered hash shuffle
} else if (shuffleType.equals("h") || shuffleType.equals("s")) {
int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);

if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
return null;
}
Path path = executionBlockContext.getLocalFS().makeQualified(
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
File file = new File(path.toUri());
long startPos = (offset >= 0 && length >= 0) ? offset : 0;
long readLen = (offset >= 0 && length >= 0) ? length : file.length();
Expand All @@ -820,17 +808,6 @@ private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IO
return chunk;
}

/**
 * Flattens a list of comma-separated strings into one list of individual tokens.
 *
 * <p>Each element is split with {@code String.split(",")}, so empty tokens between two
 * commas are kept while trailing empty tokens are dropped. The tokens are appended in
 * encounter order.
 *
 * @param mapq the comma-separated strings to split; may be {@code null}
 * @return a new list holding every token, or {@code null} when {@code mapq} is {@code null}
 */
private List<String> splitMaps(List<String> mapq) {
  if (mapq == null) {
    return null;
  }

  final List<String> tokens = new ArrayList<String>();
  for (String joined : mapq) {
    for (String token : joined.split(",")) {
      tokens.add(token);
    }
  }
  return tokens;
}

public static Path getTaskAttemptDir(TaskAttemptId quid) {
Path workDir =
StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
Expand Down
Loading