From 0f48003e4d8464d5da45dce3869aea0bfe3c1606 Mon Sep 17 00:00:00 2001
From: JaeHwa Jung
Date: Thu, 20 Nov 2014 02:02:44 +0900
Subject: [PATCH 1/9] Initial Commit
---
tajo-core/pom.xml | 1 +
.../apache/tajo/master/ContainerProxy.java | 10 +-
.../tajo/master/DefaultTaskScheduler.java | 18 +-
.../tajo/master/LaunchTaskRunnersEvent.java | 5 +-
.../apache/tajo/master/LazyTaskScheduler.java | 9 +-
.../tajo/master/TajoContainerProxy.java | 22 +-
.../apache/tajo/master/TajoMasterService.java | 5 +-
.../tajo/master/TaskRunnerGroupEvent.java | 8 +-
.../tajo/master/container/TajoContainer.java | 170 ++++++++++
.../master/container/TajoContainerId.java | 169 ++++++++++
.../master/container/TajoConverterUtils.java | 260 +++++++++++++++
.../master/container/TajoRecordFactory.java | 26 ++
.../container/TajoRecordFactoryPBImpl.java | 103 ++++++
.../container/TajoRecordFactoryProvider.java | 69 ++++
.../tajo/master/container/TajoRecords.java | 36 ++
.../impl/pb/TajoContainerIdPBImpl.java | 95 ++++++
.../tajo/master/event/LocalTaskEvent.java | 9 +-
.../event/QueryUnitAttemptScheduleEvent.java | 10 +-
.../SubQueryContainerAllocationEvent.java | 8 +-
.../event/TaskAttemptAssignedEvent.java | 8 +-
.../tajo/master/event/TaskRequestEvent.java | 8 +-
.../master/querymaster/QueryInProgress.java | 3 +-
.../QueryMasterManagerService.java | 4 +-
.../master/querymaster/QueryUnitAttempt.java | 6 +-
.../tajo/master/querymaster/SubQuery.java | 12 +-
.../apache/tajo/master/rm/TajoRMContext.java | 8 +-
.../tajo/master/rm/TajoWorkerContainer.java | 12 +-
.../tajo/master/rm/TajoWorkerContainerId.java | 53 +--
.../master/rm/TajoWorkerResourceManager.java | 63 ++--
.../tajo/master/rm/WorkerResourceManager.java | 4 +-
.../worker/AbstractResourceAllocator.java | 14 +-
.../apache/tajo/worker/ResourceAllocator.java | 6 +-
.../tajo/worker/TajoResourceAllocator.java | 101 +++---
.../org/apache/tajo/worker/TaskRunner.java | 16 +-
.../apache/tajo/worker/TaskRunnerHistory.java | 14 +-
.../src/main/proto/ContainerProtocol.proto | 48 +++
.../src/main/proto/QueryMasterProtocol.proto | 3 +
.../main/proto/ResourceTrackerProtocol.proto | 3 +
.../src/main/proto/TajoMasterProtocol.proto | 7 +-
.../src/main/proto/TajoWorkerProtocol.proto | 5 +-
.../master/rm/TestTajoResourceManager.java | 16 +-
.../apache/tajo/storage/TestFileSystems.java | 16 +-
.../org/apache/tajo/storage/s3/INode.java | 124 -------
.../storage/s3/InMemoryFileSystemStore.java | 175 ----------
.../tajo/storage/s3/S3OutputStream.java | 234 -------------
.../storage/s3/SmallBlockS3FileSystem.java | 314 ------------------
46 files changed, 1234 insertions(+), 1076 deletions(-)
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java
create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java
create mode 100644 tajo-core/src/main/proto/ContainerProtocol.proto
delete mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
delete mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
delete mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
delete mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index fce96e4b60..060ac1bb61 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -162,6 +162,7 @@
--proto_path=../tajo-client/src/main/proto--proto_path=../tajo-plan/src/main/proto--java_out=target/generated-sources/proto
+ src/main/proto/ContainerProtocol.protosrc/main/proto/ResourceTrackerProtocol.protosrc/main/proto/QueryMasterProtocol.protosrc/main/proto/TajoMasterProtocol.proto
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 59b071a890..462de91ac4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -22,11 +22,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
public abstract class ContainerProxy {
protected static final Log LOG = LogFactory.getLog(ContainerProxy.class);
@@ -45,8 +45,8 @@ protected static enum ContainerState {
protected ContainerState state;
// store enough information to be able to cleanup the container
- protected Container container;
- protected ContainerId containerID;
+ protected TajoContainer container;
+ protected TajoContainerId containerID;
protected String hostName;
protected int port = -1;
@@ -54,7 +54,7 @@ protected static enum ContainerState {
public abstract void stopContainer();
public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf,
- ExecutionBlockId executionBlockId, Container container) {
+ ExecutionBlockId executionBlockId, TajoContainer container) {
this.context = context;
this.conf = conf;
this.state = ContainerState.PREP;
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 62d4892a4e..77e32576c7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -41,6 +41,7 @@
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.QueryUnitAttempt;
import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
@@ -338,7 +339,8 @@ public class HostVolumeMapping {
private Map> unassignedTaskForEachVolume =
Collections.synchronizedMap(new HashMap>());
/** A value is last assigned volume id for each task runner */
- private HashMap lastAssignedVolumeId = new HashMap();
+ private HashMap lastAssignedVolumeId = new HashMap();
/**
* A key is disk volume id, and a value is the load of this volume.
* This load is measured by counting how many number of tasks are running.
@@ -378,7 +380,7 @@ public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt atte
* 2. unknown block or Non-splittable task in host
* 3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null
*/
- public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) {
+ public synchronized QueryUnitAttemptId getLocalTask(TajoContainerId containerId) {
int volumeId;
QueryUnitAttemptId queryUnitAttemptId = null;
@@ -489,7 +491,7 @@ private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt
* @param volumeId Volume identifier
* @return the volume load (i.e., how many running tasks use this volume)
*/
- private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) {
+ private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) {
int concurrency = 1;
if (diskVolumeLoads.containsKey(volumeId)) {
@@ -514,7 +516,7 @@ private synchronized int increaseConcurrency(ContainerId containerId, int volume
/**
* Decrease the count of running tasks of a certain task runner
*/
- private synchronized void decreaseConcurrency(ContainerId containerId){
+ private synchronized void decreaseConcurrency(TajoContainerId containerId){
Integer volumeId = lastAssignedVolumeId.get(containerId);
if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
Integer concurrency = diskVolumeLoads.get(volumeId);
@@ -552,11 +554,11 @@ public int getLowestVolumeId(){
}
}
- public boolean isAssigned(ContainerId containerId){
+ public boolean isAssigned(TajoContainerId containerId){
return lastAssignedVolumeId.containsKey(containerId);
}
- public boolean isRemote(ContainerId containerId){
+ public boolean isRemote(TajoContainerId containerId){
Integer volumeId = lastAssignedVolumeId.get(containerId);
if(volumeId == null || volumeId > REMOTE){
return false;
@@ -647,7 +649,7 @@ public int nonLeafTaskNum() {
public Set assignedRequest = new HashSet();
- private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){
+ private QueryUnitAttemptId allocateLocalTask(String host, TajoContainerId containerId){
HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
@@ -778,7 +780,7 @@ public void assignToLeafTasks(LinkedList taskRequests) {
}
}
- ContainerId containerId = taskRequest.getContainerId();
+ TajoContainerId containerId = taskRequest.getContainerId();
LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
"containerId=" + containerId);
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
index 9a4a01d922..e620afa161 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
@@ -18,9 +18,9 @@
package org.apache.tajo.master;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.container.TajoContainer;
import java.util.Collection;
@@ -29,7 +29,8 @@ public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent {
private final String planJson;
public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId,
- Collection containers, QueryContext queryContext, String planJson) {
+ Collection containers, QueryContext queryContext,
+ String planJson) {
super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers);
this.queryContext = queryContext;
this.planJson = planJson;
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index f7953e02c1..b2883cc123 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -21,7 +21,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
@@ -38,6 +37,7 @@
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.QueryUnitAttempt;
import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.FetchImpl;
@@ -246,7 +246,8 @@ private void initDiskBalancer(String[] hosts, int[] diskIds) {
}
private static class DiskBalancer {
- private HashMap containerDiskMap = new HashMap();
+ private HashMap containerDiskMap = new HashMap();
private HashMap diskReferMap = new HashMap();
private String host;
@@ -260,7 +261,7 @@ public void addDiskId(Integer diskId) {
}
}
- public Integer getDiskId(ContainerId containerId) {
+ public Integer getDiskId(TajoContainerId containerId) {
if (!containerDiskMap.containsKey(containerId)) {
assignVolumeId(containerId);
}
@@ -268,7 +269,7 @@ public Integer getDiskId(ContainerId containerId) {
return containerDiskMap.get(containerId);
}
- public void assignVolumeId(ContainerId containerId){
+ public void assignVolumeId(TajoContainerId containerId){
Map.Entry volumeEntry = null;
for (Map.Entry entry : diskReferMap.entrySet()) {
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index c236c206ea..158316e7d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -20,17 +20,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.rm.TajoWorkerContainer;
import org.apache.tajo.master.rm.TajoWorkerContainerId;
import org.apache.tajo.rpc.NettyClientBase;
@@ -47,7 +47,7 @@ public class TajoContainerProxy extends ContainerProxy {
private final String planJson;
public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
- Configuration conf, Container container,
+ Configuration conf, TajoContainer container,
QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
super(context, conf, executionBlockId, container);
this.queryContext = queryContext;
@@ -89,7 +89,7 @@ public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) {
}
}
- private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
+ private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) {
NettyClientBase tajoWorkerRpc = null;
try {
InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
@@ -149,8 +149,8 @@ public synchronized void stopContainer() {
public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
ExecutionBlockId executionBlockId,
- ContainerId containerId) throws Exception {
- List containerIds = new ArrayList();
+ TajoContainerId containerId) throws Exception {
+ List containerIds = new ArrayList();
containerIds.add(containerId);
releaseWorkerResource(context, executionBlockId, containerIds);
@@ -158,11 +158,11 @@ public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext
public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
ExecutionBlockId executionBlockId,
- List containerIds) throws Exception {
- List containerIdProtos =
- new ArrayList();
+ List containerIds) throws Exception {
+ List containerIdProtos =
+ new ArrayList();
- for(ContainerId eachContainerId: containerIds) {
+ for(TajoContainerId eachContainerId: containerIds) {
containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index ddf24d3d65..1e3501c80d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -27,6 +27,7 @@
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
@@ -128,9 +129,9 @@ public void allocateWorkerResources(
public void releaseWorkerResource(RpcController controller,
TajoMasterProtocol.WorkerResourceReleaseRequest request,
RpcCallback done) {
- List containerIds = request.getContainerIdsList();
+ List containerIds = request.getContainerIdsList();
- for(YarnProtos.ContainerIdProto eachContainer: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
context.getResourceManager().releaseWorkerResource(eachContainer);
}
done.run(BOOL_TRUE);
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
index 1e6655c81b..c1c6522e68 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
@@ -18,10 +18,10 @@
package org.apache.tajo.master;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.container.TajoContainer;
import java.util.Collection;
@@ -32,16 +32,16 @@ public enum EventType {
}
protected final ExecutionBlockId executionBlockId;
- protected final Collection containers;
+ protected final Collection containers;
public TaskRunnerGroupEvent(EventType eventType,
ExecutionBlockId executionBlockId,
- Collection containers) {
+ Collection containers) {
super(eventType);
this.executionBlockId = executionBlockId;
this.containers = containers;
}
- public Collection getContainers() {
+ public Collection getContainers() {
return containers;
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
new file mode 100644
index 0000000000..e7d851a0a2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
TajoContainer represents an allocated resource in the cluster.
+ *
+ *
+ *
The ResourceManager is the sole authority to allocate any
+ * TajoContainer to applications. The allocated TajoContainer
+ * is always on a single node and has a unique {@link org.apache.hadoop.yarn.api.records.TajoContainerId}. It has
+ * a specific amount of {@link org.apache.hadoop.yarn.api.records.Resource} allocated.
+ *
+ *
It includes details such as:
+ *
+ *
{@link org.apache.hadoop.yarn.api.records.TajoContainerId} for the container, which is globally unique.
+ *
+ * {@link org.apache.hadoop.yarn.api.records.NodeId} of the node on which it is allocated.
+ *
+ *
HTTP uri of the node.
+ *
{@link org.apache.hadoop.yarn.api.records.Resource} allocated to the container.
+ *
{@link org.apache.hadoop.yarn.api.records.Priority} at which the container was allocated.
+ *
+ * TajoContainer {@link org.apache.hadoop.yarn.api.records.Token} of the container, used to securely verify
+ * authenticity of the allocation.
+ *
+ *
+ *
+ *
+ *
Typically, an ApplicationMaster receives the
+ * TajoContainer from the ResourceManager during
+ * resource-negotiation and then talks to the NodeManager to
+ * start/stop containers.
+ *
+ * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
+ * @see TajoContainerManagementProtocol#startTajoContainers(org.apache.hadoop.yarn.api.protocolrecords.StartTajoContainersRequest)
+ * @see TajoContainerManagementProtocol#stopTajoContainers(org.apache.hadoop.yarn.api.protocolrecords.StopTajoContainersRequest)
+ */
+@Public
+@Stable
+public abstract class TajoContainer implements Comparable {
+
+ @Private
+ @Unstable
+ public static TajoContainer newInstance(TajoContainerId containerId, NodeId nodeId,
+ String nodeHttpAddress, Resource resource, Priority priority,
+ Token containerToken) {
+ TajoContainer container = Records.newRecord(TajoContainer.class);
+ container.setId(containerId);
+ container.setNodeId(nodeId);
+ container.setNodeHttpAddress(nodeHttpAddress);
+ container.setResource(resource);
+ container.setPriority(priority);
+ container.setContainerToken(containerToken);
+ return container;
+ }
+
+ /**
+ * Get the globally unique identifier for the container.
+ * @return globally unique identifier for the container
+ */
+ @Public
+ @Stable
+ public abstract TajoContainerId getId();
+
+ @Private
+ @Unstable
+ public abstract void setId(TajoContainerId id);
+
+ /**
+ * Get the identifier of the node on which the container is allocated.
+ * @return identifier of the node on which the container is allocated
+ */
+ @Public
+ @Stable
+ public abstract NodeId getNodeId();
+
+ @Private
+ @Unstable
+ public abstract void setNodeId(NodeId nodeId);
+
+ /**
+ * Get the http uri of the node on which the container is allocated.
+ * @return http uri of the node on which the container is allocated
+ */
+ @Public
+ @Stable
+ public abstract String getNodeHttpAddress();
+
+ @Private
+ @Unstable
+ public abstract void setNodeHttpAddress(String nodeHttpAddress);
+
+ /**
+ * Get the Resource allocated to the container.
+ * @return Resource allocated to the container
+ */
+ @Public
+ @Stable
+ public abstract Resource getResource();
+
+ @Private
+ @Unstable
+ public abstract void setResource(Resource resource);
+
+ /**
+ * Get the Priority at which the TajoContainer was
+ * allocated.
+ * @return Priority at which the TajoContainer was
+ * allocated
+ */
+ @Public
+ @Stable
+ public abstract Priority getPriority();
+
+ @Private
+ @Unstable
+ public abstract void setPriority(Priority priority);
+
+ /**
+ * Get the TajoContainerToken for the container.
+ *
TajoContainerToken is the security token used by the framework
+ * to verify authenticity of any TajoContainer.
+ *
+ *
The ResourceManager, on container allocation provides a
+ * secure token which is verified by the NodeManager on
+ * container launch.
+ *
+ *
Applications do not need to care about TajoContainerToken, they
+ * are transparently handled by the framework - the allocated
+ * TajoContainer includes the TajoContainerToken.
+ *
+ * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
+ * @see TajoContainerManagementProtocol#startTajoContainers(org.apache.hadoop.yarn.api.protocolrecords.StartTajoContainersRequest)
+ *
+ * @return TajoContainerToken for the container
+ */
+ @Public
+ @Stable
+ public abstract Token getContainerToken();
+
+ @Private
+ @Unstable
+ public abstract void setContainerToken(Token containerToken);
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
new file mode 100644
index 0000000000..cd2ca7a3a6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+import java.text.NumberFormat;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
TajoContainerId represents a globally unique identifier
+ * for a {@link org.apache.hadoop.yarn.api.records.Container} in the cluster.
+ */
+@Public
+@Stable
+public abstract class TajoContainerId implements Comparable{
+
+ @Private
+ @Unstable
+ public static TajoContainerId newInstance(ApplicationAttemptId appAttemptId,
+ int containerId) {
+ TajoContainerId id = TajoRecords.newRecord(TajoContainerId.class);
+ id.setId(containerId);
+ id.setApplicationAttemptId(appAttemptId);
+ id.build();
+ return id;
+ }
+
+ /**
+ * Get the ApplicationAttemptId of the application to which the
+ * Container was assigned.
+ *
+ * Note: If containers are kept alive across application attempts via
+ * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
+ * the TajoContainerId does not necessarily contain the current
+ * running application attempt's ApplicationAttemptId This
+ * container can be allocated by previously exited application attempt and
+ * managed by the current running attempt thus have the previous application
+ * attempt's ApplicationAttemptId.
+ *
+ *
+ * @return ApplicationAttemptId of the application to which the
+ * Container was assigned
+ */
+ @Public
+ @Stable
+ public abstract ApplicationAttemptId getApplicationAttemptId();
+
+ @Private
+ @Unstable
+ protected abstract void setApplicationAttemptId(ApplicationAttemptId atId);
+
+ /**
+ * Get the identifier of the TajoContainerId.
+ * @return identifier of the TajoContainerId
+ */
+ @Public
+ @Stable
+ public abstract int getId();
+
+ @Private
+ @Unstable
+ protected abstract void setId(int id);
+
+
+ // TODO: fail the app submission if attempts are more than 10 or something
+ private static final ThreadLocal appAttemptIdFormat =
+ new ThreadLocal() {
+ @Override
+ public NumberFormat initialValue() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(2);
+ return fmt;
+ }
+ };
+ // TODO: Why thread local?
+ // ^ NumberFormat instances are not threadsafe
+ private static final ThreadLocal containerIdFormat =
+ new ThreadLocal() {
+ @Override
+ public NumberFormat initialValue() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(6);
+ return fmt;
+ }
+ };
+
+ @Override
+ public int hashCode() {
+ // Generated by eclipse.
+ final int prime = 435569;
+ int result = 7507;
+ result = prime * result + getId();
+ result = prime * result + getApplicationAttemptId().hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TajoContainerId other = (TajoContainerId) obj;
+ if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId()))
+ return false;
+ if (this.getId() != other.getId())
+ return false;
+ return true;
+ }
+
+ @Override
+ public int compareTo(TajoContainerId other) {
+ if (this.getApplicationAttemptId().compareTo(
+ other.getApplicationAttemptId()) == 0) {
+ return this.getId() - other.getId();
+ } else {
+ return this.getApplicationAttemptId().compareTo(
+ other.getApplicationAttemptId());
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(4);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("container_");
+ ApplicationId appId = getApplicationAttemptId().getApplicationId();
+ sb.append(appId.getClusterTimestamp()).append("_");
+ sb.append(fmt.format(appId.getId()))
+ .append("_");
+ sb.append(
+ appAttemptIdFormat.get().format(
+ getApplicationAttemptId().getAttemptId())).append("_");
+ sb.append(containerIdFormat.get().format(getId()));
+ return sb.toString();
+ }
+
+ protected abstract void build();
+}
\ No newline at end of file
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
new file mode 100644
index 0000000000..1c58658a37
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.master.container;
+
+
+import static org.apache.hadoop.yarn.util.StringHelper._split;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+
+/**
+ * This class contains a set of utilities which help converting data structures
+ * from/to 'serializableFormat' to/from hadoop/nativejava data structures.
+ *
+ */
+@Private
+public class TajoConverterUtils {
+
+ public static final String APPLICATION_PREFIX = "application";
+ public static final String CONTAINER_PREFIX = "container";
+ public static final String APPLICATION_ATTEMPT_PREFIX = "appattempt";
+
+ /**
+ * return a hadoop path from a given url
+ *
+ * @param url
+ * url to convert
+ * @return path from {@link URL}
+ * @throws URISyntaxException
+ */
+ public static Path getPathFromYarnURL(URL url) throws URISyntaxException {
+ String scheme = url.getScheme() == null ? "" : url.getScheme();
+
+ String authority = "";
+ if (url.getHost() != null) {
+ authority = url.getHost();
+ if (url.getUserInfo() != null) {
+ authority = url.getUserInfo() + "@" + authority;
+ }
+ if (url.getPort() > 0) {
+ authority += ":" + url.getPort();
+ }
+ }
+
+ return new Path(
+ (new URI(scheme, authority, url.getFile(), null, null)).normalize());
+ }
+
+ /**
+ * change from CharSequence to string for map key and value
+ * @param env map for converting
+ * @return string,string map
+ */
+ public static Map convertToString(
+ Map env) {
+
+ Map stringMap = new HashMap();
+ for (Entry entry: env.entrySet()) {
+ stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return stringMap;
+ }
+
+ public static URL getYarnUrlFromPath(Path path) {
+ return getYarnUrlFromURI(path.toUri());
+ }
+
+ public static URL getYarnUrlFromURI(URI uri) {
+ URL url = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(URL.class);
+ if (uri.getHost() != null) {
+ url.setHost(uri.getHost());
+ }
+ if (uri.getUserInfo() != null) {
+ url.setUserInfo(uri.getUserInfo());
+ }
+ url.setPort(uri.getPort());
+ url.setScheme(uri.getScheme());
+ url.setFile(uri.getPath());
+ return url;
+ }
+
+ public static String toString(ApplicationId appId) {
+ return appId.toString();
+ }
+
+ public static ApplicationId toApplicationId(RecordFactory recordFactory,
+ String appIdStr) {
+ Iterator it = _split(appIdStr).iterator();
+ it.next(); // prefix. TODO: Validate application prefix
+ return toApplicationId(recordFactory, it);
+ }
+
+ private static ApplicationId toApplicationId(RecordFactory recordFactory,
+ Iterator it) {
+ ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
+ Integer.parseInt(it.next()));
+ return appId;
+ }
+
+ private static ApplicationAttemptId toApplicationAttemptId(
+ Iterator it) throws NumberFormatException {
+ ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
+ Integer.parseInt(it.next()));
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next()));
+ return appAttemptId;
+ }
+
+ private static ApplicationId toApplicationId(
+ Iterator it) throws NumberFormatException {
+ ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
+ Integer.parseInt(it.next()));
+ return appId;
+ }
+
+ public static String toString(TajoContainerId cId) {
+ return cId == null ? null : cId.toString();
+ }
+
+ public static NodeId toNodeId(String nodeIdStr) {
+ String[] parts = nodeIdStr.split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid NodeId [" + nodeIdStr
+ + "]. Expected host:port");
+ }
+ try {
+ NodeId nodeId =
+ NodeId.newInstance(parts[0], Integer.parseInt(parts[1]));
+ return nodeId;
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid port: " + parts[1], e);
+ }
+ }
+
+ public static TajoContainerId toTajoContainerId(String containerIdStr) {
+ Iterator it = _split(containerIdStr).iterator();
+ if (!it.next().equals(CONTAINER_PREFIX)) {
+ throw new IllegalArgumentException("Invalid TajoContainerId prefix: "
+ + containerIdStr);
+ }
+ try {
+ ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
+ TajoContainerId containerId =
+ TajoContainerId.newInstance(appAttemptID, Integer.parseInt(it.next()));
+ return containerId;
+ } catch (NumberFormatException n) {
+ throw new IllegalArgumentException("Invalid TajoContainerId: "
+ + containerIdStr, n);
+ }
+ }
+
+ public static ApplicationAttemptId toApplicationAttemptId(
+ String applicationAttmeptIdStr) {
+ Iterator it = _split(applicationAttmeptIdStr).iterator();
+ if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) {
+ throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
+ + applicationAttmeptIdStr);
+ }
+ try {
+ return toApplicationAttemptId(it);
+ } catch (NumberFormatException n) {
+ throw new IllegalArgumentException("Invalid AppAttemptId: "
+ + applicationAttmeptIdStr, n);
+ }
+ }
+
+ public static ApplicationId toApplicationId(
+ String appIdStr) {
+ Iterator it = _split(appIdStr).iterator();
+ if (!it.next().equals(APPLICATION_PREFIX)) {
+ throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+ + appIdStr + ". The valid ApplicationId should start with prefix "
+ + APPLICATION_PREFIX);
+ }
+ try {
+ return toApplicationId(it);
+ } catch (NumberFormatException n) {
+ throw new IllegalArgumentException("Invalid AppAttemptId: "
+ + appIdStr, n);
+ }
+ }
+
+ /**
+ * Convert a protobuf token into a rpc token and set its service. Supposed
+ * to be used for tokens other than RMDelegationToken. For
+ * RMDelegationToken, use
+ * {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token,
+ * org.apache.hadoop.io.Text)} instead.
+ *
+ * @param protoToken the yarn token
+ * @param serviceAddr the connect address for the service
+ * @return rpc token
+ */
+ public static Token convertFromYarn(
+ org.apache.hadoop.yarn.api.records.Token protoToken,
+ InetSocketAddress serviceAddr) {
+ Token token = new Token(protoToken.getIdentifier().array(),
+ protoToken.getPassword().array(),
+ new Text(protoToken.getKind()),
+ new Text(protoToken.getService()));
+ if (serviceAddr != null) {
+ SecurityUtil.setTokenService(token, serviceAddr);
+ }
+ return token;
+ }
+
+ /**
+ * Convert a protobuf token into a rpc token and set its service.
+ *
+ * @param protoToken the yarn token
+ * @param service the service for the token
+ */
+ public static Token convertFromYarn(
+ org.apache.hadoop.yarn.api.records.Token protoToken,
+ Text service) {
+ Token token = new Token(protoToken.getIdentifier().array(),
+ protoToken.getPassword().array(),
+ new Text(protoToken.getKind()),
+ new Text(protoToken.getService()));
+
+ if (service != null) {
+ token.setService(service);
+ }
+ return token;
+ }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java
new file mode 100644
index 0000000000..e965691367
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+@Unstable
+public interface TajoRecordFactory {
+ public T newRecordInstance(Class clazz);
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java
new file mode 100644
index 0000000000..40fef81d18
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tajo.master.container;
+
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.tajo.master.container.TajoRecordFactory;
+
+@Private
+public class TajoRecordFactoryPBImpl implements TajoRecordFactory {
+
+ private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
+ private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
+
+ private static final TajoRecordFactoryPBImpl self = new TajoRecordFactoryPBImpl();
+ private Configuration localConf = new Configuration();
+ private ConcurrentMap, Constructor>> cache = new ConcurrentHashMap, Constructor>>();
+
+ private TajoRecordFactoryPBImpl() {
+ }
+
+ public static TajoRecordFactory get() {
+ return self;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T newRecordInstance(Class clazz) {
+
+ Constructor> constructor = cache.get(clazz);
+ if (constructor == null) {
+ Class> pbClazz = null;
+ try {
+ System.out.println("### 100 ### " + getPBImplClassName(clazz));
+ pbClazz = localConf.getClassByName(getPBImplClassName(clazz));
+ System.out.println("### 200 ### " + pbClazz);
+// pbClazz = TajoRecordFactoryPBImpl.class;
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException("Failed to load class: ["
+ + getPBImplClassName(clazz) + "]", e);
+ }
+ try {
+ constructor = pbClazz.getConstructor();
+ constructor.setAccessible(true);
+ cache.putIfAbsent(clazz, constructor);
+ } catch (NoSuchMethodException e) {
+ throw new YarnRuntimeException("Could not find 0 argument constructor", e);
+ }
+ }
+ try {
+ Object retObject = constructor.newInstance();
+ return (T)retObject;
+ } catch (InvocationTargetException e) {
+ throw new YarnRuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new YarnRuntimeException(e);
+ } catch (InstantiationException e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ private String getPBImplClassName(Class> clazz) {
+ String srcPackagePart = getPackageName(clazz);
+ String srcClassName = getClassName(clazz);
+ String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
+ String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
+ return destPackagePart + "." + destClassPart;
+ }
+
+ private String getClassName(Class> clazz) {
+ String fqName = clazz.getName();
+ return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
+ }
+
+ private String getPackageName(Class> clazz) {
+ return clazz.getPackage().getName();
+ }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java
new file mode 100644
index 0000000000..4ac2021f44
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+@Unstable
+public class TajoRecordFactoryProvider {
+ private static Configuration defaultConf;
+
+ static {
+ defaultConf = new Configuration();
+ }
+
+ private TajoRecordFactoryProvider() {
+ }
+
+ public static TajoRecordFactory getRecordFactory(Configuration conf) {
+ if (conf == null) {
+ //Assuming the default configuration has the correct factories set.
+ //Users can specify a particular factory by providing a configuration.
+ conf = defaultConf;
+ }
+// String recordFactoryClassName = conf.get(
+// YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
+// YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
+ return (TajoRecordFactory) getFactoryClassInstance(TajoRecordFactoryPBImpl.class.getCanonicalName());
+ }
+
+ private static Object getFactoryClassInstance(String factoryClassName) {
+ try {
+ Class> clazz = Class.forName(factoryClassName);
+ Method method = clazz.getMethod("get", null);
+ method.setAccessible(true);
+ return method.invoke(null, null);
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ throw new YarnRuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new YarnRuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java
new file mode 100644
index 0000000000..f78fb693c1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Convenient API record utils
+ */
+@Unstable
+public class TajoRecords {
+ // The default record factory
+ private static final TajoRecordFactory factory =
+ TajoRecordFactoryProvider.getRecordFactory(null);
+
+ public static T newRecord(Class cls) {
+ return factory.newRecordInstance(cls);
+ }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java
new file mode 100644
index 0000000000..d42883fe5f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container.impl.pb;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
+
+@Private
+@Unstable
+public class TajoContainerIdPBImpl extends TajoContainerId {
+ ContainerProtocol.TajoContainerIdProto proto = null;
+ ContainerProtocol.TajoContainerIdProto.Builder builder = null;
+ private ApplicationAttemptId applicationAttemptId = null;
+
+ public TajoContainerIdPBImpl() {
+ builder = ContainerProtocol.TajoContainerIdProto.newBuilder();
+ }
+
+ public TajoContainerIdPBImpl(ContainerProtocol.TajoContainerIdProto proto) {
+ this.proto = proto;
+ this.applicationAttemptId = convertFromProtoFormat(proto.getAppAttemptId());
+ }
+
+ public ContainerProtocol.TajoContainerIdProto getProto() {
+ return proto;
+ }
+
+ @Override
+ public int getId() {
+ Preconditions.checkNotNull(proto);
+ return proto.getId();
+ }
+
+ @Override
+ protected void setId(int id) {
+ Preconditions.checkNotNull(builder);
+ builder.setId((id));
+ }
+
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.applicationAttemptId;
+ }
+
+ @Override
+ protected void setApplicationAttemptId(ApplicationAttemptId atId) {
+ if (atId != null) {
+ Preconditions.checkNotNull(builder);
+ builder.setAppAttemptId(convertToProtoFormat(atId));
+ }
+ this.applicationAttemptId = atId;
+ }
+
+ private ApplicationAttemptIdPBImpl convertFromProtoFormat(
+ ApplicationAttemptIdProto p) {
+ return new ApplicationAttemptIdPBImpl(p);
+ }
+
+ private ApplicationAttemptIdProto convertToProtoFormat(
+ ApplicationAttemptId t) {
+ return ((ApplicationAttemptIdPBImpl)t).getProto();
+ }
+
+ @Override
+ protected void build() {
+ proto = builder.build();
+ builder = null;
+ }
+}
+
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
index 92e6695dee..cab220213f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -18,18 +18,19 @@
package org.apache.tajo.master.event;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.container.TajoContainerId;
/**
* This event is sent to a running TaskAttempt on a worker.
*/
public class LocalTaskEvent extends AbstractEvent {
private final QueryUnitAttemptId taskAttemptId;
- private final ContainerId containerId;
+ private final TajoContainerId containerId;
- public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, ContainerId containerId, LocalTaskEventType eventType) {
+ public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, TajoContainerId containerId,
+ LocalTaskEventType eventType) {
super(eventType);
this.taskAttemptId = taskAttemptId;
this.containerId = containerId;
@@ -39,7 +40,7 @@ public QueryUnitAttemptId getTaskAttemptId() {
return taskAttemptId;
}
- public ContainerId getContainerId() {
+ public TajoContainerId getContainerId() {
return containerId;
}
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
index a2acc7e818..6e0d9fd6bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
@@ -19,10 +19,10 @@
package org.apache.tajo.master.event;
import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.container.TajoContainerId;
public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
private final QueryUnitAttemptScheduleContext context;
@@ -44,7 +44,7 @@ public QueryUnitAttemptScheduleContext getContext() {
}
public static class QueryUnitAttemptScheduleContext {
- private ContainerId containerId;
+ private TajoContainerId containerId;
private String host;
private RpcCallback callback;
@@ -52,7 +52,7 @@ public QueryUnitAttemptScheduleContext() {
}
- public QueryUnitAttemptScheduleContext(ContainerId containerId,
+ public QueryUnitAttemptScheduleContext(TajoContainerId containerId,
String host,
RpcCallback callback) {
this.containerId = containerId;
@@ -60,11 +60,11 @@ public QueryUnitAttemptScheduleContext(ContainerId containerId,
this.callback = callback;
}
- public ContainerId getContainerId() {
+ public TajoContainerId getContainerId() {
return containerId;
}
- public void setContainerId(ContainerId containerId) {
+ public void setContainerId(TajoContainerId containerId) {
this.containerId = containerId;
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
index a8f4800fb2..e617d53139 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
@@ -18,21 +18,21 @@
package org.apache.tajo.master.event;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.container.TajoContainer;
import java.util.List;
public class SubQueryContainerAllocationEvent extends SubQueryEvent {
- private List allocatedContainer;
+ private List allocatedContainer;
public SubQueryContainerAllocationEvent(final ExecutionBlockId id,
- List allocatedContainer) {
+ List allocatedContainer) {
super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED);
this.allocatedContainer = allocatedContainer;
}
- public List getAllocatedContainer() {
+ public List getAllocatedContainer() {
return this.allocatedContainer;
}
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index e0928c5ce1..3b9edcb633 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -18,22 +18,22 @@
package org.apache.tajo.master.event;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
- private final ContainerId cId;
+ private final TajoContainerId cId;
private final WorkerConnectionInfo workerConnectionInfo;
- public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
+ public TaskAttemptAssignedEvent(QueryUnitAttemptId id, TajoContainerId cId,
WorkerConnectionInfo connectionInfo) {
super(id, TaskAttemptEventType.TA_ASSIGNED);
this.cId = cId;
this.workerConnectionInfo = connectionInfo;
}
- public ContainerId getContainerId() {
+ public TajoContainerId getContainerId() {
return cId;
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 2197c33384..9e8e3ddaef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -19,11 +19,11 @@
package org.apache.tajo.master.event;
import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
+import org.apache.tajo.master.container.TajoContainerId;
public class TaskRequestEvent extends AbstractEvent {
@@ -32,13 +32,13 @@ public enum TaskRequestEventType {
}
private final int workerId;
- private final ContainerId containerId;
+ private final TajoContainerId containerId;
private final ExecutionBlockId executionBlockId;
private final RpcCallback callback;
public TaskRequestEvent(int workerId,
- ContainerId containerId,
+ TajoContainerId containerId,
ExecutionBlockId executionBlockId,
RpcCallback callback) {
super(TaskRequestEventType.TASK_REQ);
@@ -52,7 +52,7 @@ public int getWorkerId() {
return this.workerId;
}
- public ContainerId getContainerId() {
+ public TajoContainerId getContainerId() {
return this.containerId;
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index d949ca4d60..e361c7f572 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -28,6 +28,7 @@
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -74,7 +75,7 @@ public class QueryInProgress extends CompositeService {
private QueryMasterProtocolService queryMasterRpcClient;
- private YarnProtos.ContainerIdProto qmContainerId;
+ private ContainerProtocol.TajoContainerIdProto qmContainerId;
public QueryInProgress(
TajoMaster.MasterContext masterContext,
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index f9539950d0..f4bd8a3650 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -25,7 +25,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
@@ -33,6 +32,7 @@
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.LazyTaskScheduler;
import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -130,7 +130,7 @@ public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestP
if(queryMasterTask == null || queryMasterTask.isStopped()) {
done.run(LazyTaskScheduler.stopTaskRunnerReq);
} else {
- ContainerId cid =
+ TajoContainerId cid =
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
LOG.debug("getTask:" + cid + ", ebId:" + ebId);
queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index db6f130c4f..d88173f973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -21,7 +21,6 @@
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.QueryUnitAttemptId;
@@ -35,6 +34,7 @@
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.master.querymaster.QueryUnit.PullHost;
+import org.apache.tajo.master.container.TajoContainerId;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -55,7 +55,7 @@ public class QueryUnitAttempt implements EventHandler {
private final QueryUnit queryUnit;
final EventHandler eventHandler;
- private ContainerId containerId;
+ private TajoContainerId containerId;
private WorkerConnectionInfo workerConnectionInfo;
private int expire;
@@ -214,7 +214,7 @@ public WorkerConnectionInfo getWorkerConnectionInfo() {
return this.workerConnectionInfo;
}
- public void setContainerId(ContainerId containerId) {
+ public void setContainerId(TajoContainerId containerId) {
this.containerId = containerId;
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 96534dfc06..fba76a5288 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -24,7 +24,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,6 +57,8 @@
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.StorageManager;
@@ -105,7 +106,8 @@ public class SubQuery implements EventHandler {
private long finishTime;
volatile Map tasks = new ConcurrentHashMap();
- volatile Map containers = new ConcurrentHashMap();
+ volatile Map containers = new ConcurrentHashMap();
private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
@@ -665,7 +667,7 @@ private void releaseContainers() {
public void releaseContainer(ContainerId containerId) {
// try to kill the container.
- ArrayList list = new ArrayList();
+ ArrayList list = new ArrayList();
list.add(containers.get(containerId));
eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list));
}
@@ -1129,8 +1131,8 @@ public void transition(SubQuery subQuery, SubQueryEvent event) {
try {
SubQueryContainerAllocationEvent allocationEvent =
(SubQueryContainerAllocationEvent) event;
- for (Container container : allocationEvent.getAllocatedContainer()) {
- ContainerId cId = container.getId();
+ for (TajoContainer container : allocationEvent.getAllocatedContainer()) {
+ TajoContainerId cId = container.getId();
if (subQuery.containers.containsKey(cId)) {
subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
"Duplicated containers are allocated: " + cId.toString()));
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index 5d07ff230d..bb8cc126b7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -21,14 +21,13 @@
import com.google.common.collect.Maps;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.ContainerProtocol;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-
/**
* It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager.
*/
@@ -43,7 +42,8 @@ public class TajoRMContext {
private final ConcurrentMap inactiveWorkers = Maps.newConcurrentMap();
/** map between queryIds and query master ContainerId */
- private final ConcurrentMap qmContainerMap = Maps.newConcurrentMap();
+ private final ConcurrentMap qmContainerMap = Maps
+ .newConcurrentMap();
private final Set liveQueryMasterWorkerResources =
Collections.newSetFromMap(new ConcurrentHashMap());
@@ -77,7 +77,7 @@ public ConcurrentMap getInactiveWorkers() {
*
* @return The Map for query master containers
*/
- public ConcurrentMap getQueryMasterContainer() {
+ public ConcurrentMap getQueryMasterContainer() {
return qmContainerMap;
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
index 4d6cbd2123..1297284a75 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
@@ -19,9 +19,11 @@
package org.apache.tajo.master.rm;
import org.apache.hadoop.yarn.api.records.*;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
-public class TajoWorkerContainer extends Container {
- ContainerId id;
+public class TajoWorkerContainer extends TajoContainer {
+ TajoContainerId id;
NodeId nodeId;
Worker worker;
@@ -34,12 +36,12 @@ public void setWorkerResource(Worker workerResource) {
}
@Override
- public ContainerId getId() {
+ public TajoContainerId getId() {
return id;
}
@Override
- public void setId(ContainerId id) {
+ public void setId(TajoContainerId id) {
this.id = id;
}
@@ -94,7 +96,7 @@ public void setContainerToken(Token containerToken) {
}
@Override
- public int compareTo(Container container) {
+ public int compareTo(TajoContainer container) {
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
index 634ad2b8ee..184de71fdd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
@@ -19,10 +19,11 @@
package org.apache.tajo.master.rm;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
-public class TajoWorkerContainerId extends ContainerId {
+public class TajoWorkerContainerId extends TajoContainerId {
ApplicationAttemptId applicationAttemptId;
int id;
@@ -46,43 +47,43 @@ public void setId(int id) {
this.id = id;
}
- public YarnProtos.ContainerIdProto getProto() {
+ public ContainerProtocol.TajoContainerIdProto getProto() {
YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
- .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
- .setId(applicationAttemptId.getApplicationId().getId())
- .build();
+ .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
+ .setId(applicationAttemptId.getApplicationId().getId())
+ .build();
YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
- .setAttemptId(applicationAttemptId.getAttemptId())
- .setApplicationId(appIdProto)
- .build();
+ .setAttemptId(applicationAttemptId.getAttemptId())
+ .setApplicationId(appIdProto)
+ .build();
- return YarnProtos.ContainerIdProto.newBuilder()
- .setAppAttemptId(attemptIdProto)
- .setAppId(appIdProto)
- .setId(id)
- .build();
+ return ContainerProtocol.TajoContainerIdProto.newBuilder()
+ .setAppAttemptId(attemptIdProto)
+ .setAppId(appIdProto)
+ .setId(id)
+ .build();
}
- public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) {
+ public static ContainerProtocol.TajoContainerIdProto getContainerIdProto(TajoContainerId containerId) {
if(containerId instanceof TajoWorkerContainerId) {
return ((TajoWorkerContainerId)containerId).getProto();
} else {
YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
- .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
- .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
- .build();
+ .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
+ .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
+ .build();
YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
- .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
- .setApplicationId(appIdProto)
- .build();
+ .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
+ .setApplicationId(appIdProto)
+ .build();
- return YarnProtos.ContainerIdProto.newBuilder()
- .setAppAttemptId(attemptIdProto)
- .setAppId(appIdProto)
- .setId(containerId.getId())
- .build();
+ return ContainerProtocol.TajoContainerIdProto.newBuilder()
+ .setAppAttemptId(attemptIdProto)
+ .setAppId(appIdProto)
+ .setId(containerId.getId())
+ .build();
}
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 0e3ccadf9b..0cc87fc8b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -48,7 +49,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import static org.apache.tajo.ipc.TajoMasterProtocol.*;
@@ -80,7 +80,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
private TajoConf systemConf;
- private ConcurrentMap allocatedResourceMap = Maps.newConcurrentMap();
+ private ConcurrentMap allocatedResourceMap = Maps
+ .newConcurrentMap();
/** It receives status messages from workers and their resources. */
private TajoResourceTracker resourceTracker;
@@ -194,7 +195,7 @@ TajoResourceTracker getResourceTracker() {
private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) {
float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar(
- TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+ TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder();
@@ -235,7 +236,7 @@ public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgre
return resource;
}
- private void registerQueryMaster(QueryId queryId, ContainerIdProto containerId) {
+ private void registerQueryMaster(QueryId queryId, ContainerProtocol.TajoContainerIdProto containerId) {
rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId);
}
@@ -256,9 +257,9 @@ static class WorkerResourceRequest {
WorkerResourceAllocationRequest request;
RpcCallback callBack;
WorkerResourceRequest(
- QueryId queryId,
- boolean queryMasterRequest, WorkerResourceAllocationRequest request,
- RpcCallback callBack) {
+ QueryId queryId,
+ boolean queryMasterRequest, WorkerResourceAllocationRequest request,
+ RpcCallback callBack) {
this.queryId = queryId;
this.queryMasterRequest = queryMasterRequest;
this.request = request;
@@ -282,14 +283,14 @@ public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("allocateWorkerResources:" +
- (new QueryId(resourceRequest.request.getQueryId())) +
- ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
- "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
- ", requiredContainers:" + resourceRequest.request.getNumContainers() +
- ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
- "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
- ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
- ", liveWorkers=" + rmContext.getWorkers().size());
+ (new QueryId(resourceRequest.request.getQueryId())) +
+ ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+ "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+ ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+ ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+ "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
+ ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+ ", liveWorkers=" + rmContext.getWorkers().size());
}
// TajoWorkerResourceManager can't return allocated disk slots occasionally.
@@ -300,25 +301,25 @@ public void run() {
if(allocatedWorkerResources.size() > 0) {
List allocatedResources =
- new ArrayList();
+ new ArrayList();
for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
- allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
+ allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
containerId.setApplicationAttemptId(
- ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+ ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
containerId.setId(containerIdSeq.incrementAndGet());
- ContainerIdProto containerIdProto = containerId.getProto();
+ ContainerProtocol.TajoContainerIdProto containerIdProto = containerId.getProto();
allocatedResources.add(WorkerAllocatedResource.newBuilder()
- .setContainerId(containerIdProto)
- .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
- .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
- .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
- .build());
+ .setContainerId(containerIdProto)
+ .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
+ .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
+ .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
+ .build());
allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
@@ -358,7 +359,7 @@ private List chooseWorkers(WorkerResourceRequest resour
int allocatedResources = 0;
TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
- = resourceRequest.request.getResourceRequestPriority();
+ = resourceRequest.request.getResourceRequestPriority();
if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
synchronized(rmContext) {
@@ -369,7 +370,7 @@ private List chooseWorkers(WorkerResourceRequest resour
int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
- resourceRequest.request.getMinDiskSlotPerContainer());
+ resourceRequest.request.getMinDiskSlotPerContainer());
int liveWorkerSize = randomWorkers.size();
Set insufficientWorkers = new HashSet();
@@ -418,7 +419,7 @@ private List chooseWorkers(WorkerResourceRequest resour
}
workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
- allocatedWorkerResource.allocatedMemoryMB);
+ allocatedWorkerResource.allocatedMemoryMB);
selectedWorkers.add(allocatedWorkerResource);
@@ -438,7 +439,7 @@ private List chooseWorkers(WorkerResourceRequest resour
float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
- resourceRequest.request.getMinMemoryMBPerContainer());
+ resourceRequest.request.getMinMemoryMBPerContainer());
int liveWorkerSize = randomWorkers.size();
Set insufficientWorkers = new HashSet();
@@ -487,7 +488,7 @@ private List chooseWorkers(WorkerResourceRequest resour
allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
}
workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
- allocatedWorkerResource.allocatedMemoryMB);
+ allocatedWorkerResource.allocatedMemoryMB);
selectedWorkers.add(allocatedWorkerResource);
@@ -508,7 +509,7 @@ private List chooseWorkers(WorkerResourceRequest resour
* @param containerId ContainerIdProto to be released
*/
@Override
- public void releaseWorkerResource(ContainerIdProto containerId) {
+ public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId) {
AllocatedWorkerResource allocated = allocatedResourceMap.get(containerId);
if(allocated != null) {
LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB);
@@ -530,7 +531,7 @@ public void stopQueryMaster(QueryId queryId) {
LOG.warn("No QueryMaster resource info for " + queryId);
return;
} else {
- ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
+ ContainerProtocol.TajoContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
releaseWorkerResource(containerId);
rmContext.getStoppedQueryIds().add(queryId);
LOG.info(String.format("Released QueryMaster (%s) resource." , queryId.toString()));
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 8e8ac51c45..9c2b71b898 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -20,8 +20,8 @@
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -63,7 +63,7 @@ public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationR
*
* @param containerId ContainerIdProto to be released
*/
- public void releaseWorkerResource(ContainerIdProto containerId);
+ public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId);
public String getSeedQueryId() throws IOException;
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index ca71c53580..68c57f2603 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -20,9 +20,9 @@
import com.google.common.collect.Maps;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.master.ContainerProxy;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -41,29 +41,29 @@ public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) {
workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo);
}
- private Map containers = Maps.newConcurrentMap();
+ private Map containers = Maps.newConcurrentMap();
public AbstractResourceAllocator() {
super(AbstractResourceAllocator.class.getName());
}
- public void addContainer(ContainerId cId, ContainerProxy container) {
+ public void addContainer(TajoContainerId cId, ContainerProxy container) {
containers.put(cId, container);
}
- public void removeContainer(ContainerId cId) {
+ public void removeContainer(TajoContainerId cId) {
containers.remove(cId);
}
- public boolean containsContainer(ContainerId cId) {
+ public boolean containsContainer(TajoContainerId cId) {
return containers.containsKey(cId);
}
- public ContainerProxy getContainer(ContainerId cId) {
+ public ContainerProxy getContainer(TajoContainerId cId) {
return containers.get(cId);
}
- public Map getContainers() {
+ public Map getContainers() {
return containers;
}
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index 8b9219c60a..b713e70904 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -18,12 +18,12 @@
package org.apache.tajo.worker;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
public interface ResourceAllocator {
public void allocateTaskWorker();
- public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+ public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerId);
public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
int numTasks, int memoryMBPerTask);
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 2220089e7c..9345885acf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -23,28 +23,25 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.master.querymaster.SubQueryState;
-import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
@@ -72,11 +69,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
this.queryTaskContext = queryTaskContext;
executorService = Executors.newFixedThreadPool(
- queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
@Override
- public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) {
+ public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerIdProto) {
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
ApplicationAttemptId appAttemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId());
containerId.setApplicationAttemptId(appAttemptId);
@@ -98,7 +95,7 @@ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
- ", Number of Cluster Slots=" + clusterSlots);
+ ", Number of Cluster Slots=" + clusterSlots);
return Math.min(numTasks, clusterSlots);
}
@@ -121,7 +118,8 @@ public synchronized void stop() {
executorService.shutdownNow();
- Map containers = queryTaskContext.getResourceAllocator().getContainers();
+ Map containers = queryTaskContext.getResourceAllocator()
+ .getContainers();
List list = new ArrayList(containers.values());
for(ContainerProxy eachProxy: list) {
try {
@@ -156,16 +154,17 @@ public void handle(TaskRunnerGroupEvent event) {
private void launchTaskRunners(LaunchTaskRunnersEvent event) {
// Query in standby mode doesn't need launch Worker.
// But, Assign ExecutionBlock to assigned tajo worker
- for(Container eachContainer: event.getContainers()) {
+ for(TajoContainer eachContainer: event.getContainers()) {
TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
- eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
+ eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
}
}
- public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection containers) {
+ public void stopExecutionBlock(final ExecutionBlockId executionBlockId,
+ Collection containers) {
Set workers = Sets.newHashSet();
- for (Container container : containers){
+ for (TajoContainer container : containers){
workers.add(container.getNodeId());
}
@@ -196,8 +195,8 @@ private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker
protected static class LaunchRunner implements Runnable {
private final ContainerProxy proxy;
- private final ContainerId id;
- public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+ private final TajoContainerId id;
+ public LaunchRunner(TajoContainerId id, ContainerProxy proxy) {
this.proxy = proxy;
this.id = id;
}
@@ -210,8 +209,8 @@ public void run() {
}
}
- private void stopContainers(Collection containers) {
- for (Container container : containers) {
+ private void stopContainers(Collection containers) {
+ for (TajoContainer container : containers) {
final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId());
executorService.submit(new StopContainerRunner(container.getId(), proxy));
}
@@ -219,8 +218,8 @@ private void stopContainers(Collection containers) {
private static class StopContainerRunner implements Runnable {
private final ContainerProxy proxy;
- private final ContainerId id;
- public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+ private final TajoContainerId id;
+ public StopContainerRunner(TajoContainerId id, ContainerProxy proxy) {
this.id = id;
this.proxy = proxy;
}
@@ -251,23 +250,23 @@ class TajoWorkerAllocationThread extends Thread {
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
CallFuture callBack =
- new CallFuture();
+ new CallFuture();
//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
TajoMasterProtocol.WorkerResourceAllocationRequest request =
- TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
- .setMinMemoryMBPerContainer(requiredMemoryMB)
- .setMaxMemoryMBPerContainer(requiredMemoryMB)
- .setNumContainers(event.getRequiredNum())
- .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
- : TajoMasterProtocol.ResourceRequestPriority.DISK)
- .setMinDiskSlotPerContainer(requiredDiskSlots)
- .setMaxDiskSlotPerContainer(requiredDiskSlots)
- .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
- .build();
+ TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+ .setMinMemoryMBPerContainer(requiredMemoryMB)
+ .setMaxMemoryMBPerContainer(requiredMemoryMB)
+ .setNumContainers(event.getRequiredNum())
+ .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+ : TajoMasterProtocol.ResourceRequestPriority.DISK)
+ .setMinDiskSlotPerContainer(requiredDiskSlots)
+ .setMaxDiskSlotPerContainer(requiredDiskSlots)
+ .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
+ .build();
RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
NettyClientBase tmClient = null;
@@ -280,21 +279,21 @@ public void run() {
if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
} catch (Exception e) {
queryTaskContext.getQueryMasterContext().getWorkerContext().
- setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
+ setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
queryTaskContext.getQueryMasterContext().getWorkerContext().
- setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
+ setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
}
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
@@ -325,17 +324,17 @@ public void run() {
List allocatedResources = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
- List containers = new ArrayList();
+ List containers = new ArrayList();
for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
- eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
+ eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
containerId.setApplicationAttemptId(
- ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
- eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+ ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+ eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
containerId.setId(eachAllocatedResource.getContainerId().getId());
container.setId(containerId);
@@ -347,7 +346,7 @@ public void run() {
workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
Worker worker = new Worker(null, workerResource,
- new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
+ new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
container.setWorkerResource(worker);
addWorkerConnectionInfo(worker.getConnectionInfo());
containers.add(container);
@@ -356,8 +355,8 @@ public void run() {
SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState();
if (!SubQuery.isRunningState(state)) {
try {
- List containerIds = new ArrayList();
- for(Container eachContainer: containers) {
+ List containerIds = new ArrayList();
+ for(TajoContainer eachContainer: containers) {
containerIds.add(eachContainer.getId());
}
TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
@@ -378,10 +377,10 @@ public void run() {
}
if(event.getRequiredNum() > numAllocatedContainers) {
ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
- event.getType(), event.getExecutionBlockId(), event.getPriority(),
- event.getResource(),
- event.getRequiredNum() - numAllocatedContainers,
- event.isLeafQuery(), event.getProgress()
+ event.getType(), event.getExecutionBlockId(), event.getPriority(),
+ event.getResource(),
+ event.getRequiredNum() - numAllocatedContainers,
+ event.isLeafQuery(), event.getProgress()
);
queryTaskContext.getEventHandler().handle(shortRequestEvent);
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 191057584d..4e9860bd3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -24,15 +24,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.container.impl.pb.TajoContainerIdPBImpl;
+import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NullCallback;
import org.jboss.netty.channel.ConnectTimeoutException;
@@ -53,7 +53,7 @@ public class TaskRunner extends AbstractService {
private volatile boolean stopped = false;
private Path baseDirPath;
- private ContainerId containerId;
+ private TajoContainerId containerId;
// for Fetcher
private ExecutorService fetchLauncher;
@@ -77,7 +77,7 @@ public TaskRunner(ExecutionBlockContext executionBlockContext, String containerI
this.fetchLauncher = Executors.newFixedThreadPool(
systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
try {
- this.containerId = ConverterUtils.toContainerId(containerId);
+ this.containerId = TajoConverterUtils.toTajoContainerId(containerId);
this.executionBlockContext = executionBlockContext;
this.history = executionBlockContext.createTaskRunnerHistory(this);
this.history.setState(getServiceState());
@@ -91,11 +91,11 @@ public String getId() {
return getId(getContext().getExecutionBlockId(), containerId);
}
- public ContainerId getContainerId(){
+ public TajoContainerId getContainerId(){
return containerId;
}
- public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
+ public static String getId(ExecutionBlockId executionBlockId, TajoContainerId containerId) {
return executionBlockId + "," + containerId;
}
@@ -211,7 +211,7 @@ public void run() {
LOG.info("Request GetTask: " + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(getExecutionBlockId().getProto())
- .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+ .setContainerId(((TajoContainerIdPBImpl) containerId).getProto())
.setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
.build();
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index a8a11c1b46..364348f169 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -21,11 +21,11 @@
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.container.TajoConverterUtils;
import java.util.Collections;
import java.util.Map;
@@ -39,13 +39,13 @@
public class TaskRunnerHistory implements ProtoObject {
private Service.STATE state;
- private ContainerId containerId;
+ private TajoContainerId containerId;
private long startTime;
private long finishTime;
private ExecutionBlockId executionBlockId;
private Map taskHistoryMap = null;
- public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBlockId) {
+ public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) {
init();
this.containerId = containerId;
this.executionBlockId = executionBlockId;
@@ -53,7 +53,7 @@ public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBloc
public TaskRunnerHistory(TaskRunnerHistoryProto proto) {
this.state = Service.STATE.valueOf(proto.getState());
- this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+ this.containerId = TajoConverterUtils.toTajoContainerId(proto.getContainerId());
this.startTime = proto.getStartTime();
this.finishTime = proto.getFinishTime();
this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId());
@@ -129,11 +129,11 @@ public void setState(Service.STATE state) {
this.state = state;
}
- public ContainerId getContainerId() {
+ public TajoContainerId getContainerId() {
return containerId;
}
- public void setContainerId(ContainerId containerId) {
+ public void setContainerId(TajoContainerId containerId) {
this.containerId = containerId;
}
diff --git a/tajo-core/src/main/proto/ContainerProtocol.proto b/tajo-core/src/main/proto/ContainerProtocol.proto
new file mode 100644
index 0000000000..df7a450b5f
--- /dev/null
+++ b/tajo-core/src/main/proto/ContainerProtocol.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are public and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "ContainerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+package hadoop.yarn;
+
+import "Security.proto";
+import "yarn_protos.proto";
+
+message TajoContainerIdProto {
+ optional ApplicationIdProto app_id = 1;
+ optional ApplicationAttemptIdProto app_attempt_id = 2;
+ optional int32 id = 3;
+}
+
+message TajoContainerProto {
+ optional TajoContainerIdProto id = 1;
+ optional NodeIdProto nodeId = 2;
+ optional string node_http_address = 3;
+ optional ResourceProto resource = 4;
+ optional PriorityProto priority = 5;
+ optional hadoop.common.TokenProto container_token = 6;
+}
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 06d2a4281d..494d29661a 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -27,6 +27,9 @@ import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
import "TajoWorkerProtocol.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
service QueryMasterProtocolService {
//from Worker
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b117cac17d..b2db46aed4 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -23,8 +23,11 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "TajoMasterProtocol.proto";
+import "ContainerProtocol.proto";
import "tajo_protos.proto";
+package hadoop.yarn;
+
message NodeHeartbeat {
required WorkerConnectionInfoProto connectionInfo = 1;
optional ServerStatusProto serverStatus = 2;
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index 7283543ddf..e5eab4f690 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -28,6 +28,9 @@ import "tajo_protos.proto";
import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
message ServerStatusProto {
message System {
@@ -119,11 +122,11 @@ message WorkerResourcesRequest {
message WorkerResourceReleaseRequest {
required ExecutionBlockIdProto executionBlockId = 1;
- repeated hadoop.yarn.ContainerIdProto containerIds = 2;
+ repeated TajoContainerIdProto containerIds = 2;
}
message WorkerAllocatedResource {
- required hadoop.yarn.ContainerIdProto containerId = 1;
+ required TajoContainerIdProto containerId = 1;
required WorkerConnectionInfoProto connectionInfo = 2;
required int32 allocatedMemoryMB = 3;
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index e515438a10..989b0e3267 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -28,6 +28,9 @@ import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
import "Plan.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
message SessionProto {
required string session_id = 1;
@@ -170,7 +173,7 @@ message QueryExecutionRequestProto {
message GetTaskRequestProto {
required int32 workerId = 1;
- required hadoop.yarn.ContainerIdProto containerId = 2;
+ required TajoContainerIdProto containerId = 2;
required ExecutionBlockIdProto executionBlockId = 3;
}
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 04238949c9..b8fbd673f1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -23,6 +23,7 @@
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
@@ -150,7 +151,8 @@ public void testMemoryResource() throws Exception {
.build();
final CountDownLatch barrier = new CountDownLatch(1);
- final List containerIds = new ArrayList();
+ final List containerIds = new
+ ArrayList();
RpcCallback callBack = new RpcCallback() {
@@ -190,7 +192,7 @@ public void run(WorkerResourceAllocationResponse response) {
containerIds.add(eachResource.getContainerId());
}
- for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
}
@@ -318,7 +320,8 @@ public void testDiskResource() throws Exception {
.build();
final CountDownLatch barrier = new CountDownLatch(1);
- final List containerIds = new ArrayList();
+ final List containerIds = new
+ ArrayList();
RpcCallback callBack = new RpcCallback() {
@@ -356,7 +359,7 @@ public void run(WorkerResourceAllocationResponse response) {
assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
- for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
}
@@ -399,7 +402,8 @@ public void testDiskResourceWithStoppedQuery() throws Exception {
.build();
final CountDownLatch barrier = new CountDownLatch(1);
- final List containerIds = new ArrayList();
+ final List containerIds = new
+ ArrayList();
RpcCallback callBack = new RpcCallback() {
@@ -431,7 +435,7 @@ public void run(WorkerResourceAllocationResponse response) {
assertEquals(0, totalUsedDisks, 0);
- for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
}
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a355a94877..584a04f71d 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -20,8 +20,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.S3FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
@@ -32,8 +32,6 @@
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
-import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -60,11 +58,10 @@ public class TestFileSystems {
public TestFileSystems(FileSystem fs) throws IOException {
conf = new TajoConf();
+ conf.set("fs.local.block.size", "10");
+
+ fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
- if(fs instanceof S3FileSystem){
- conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
- fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
- }
this.fs = fs;
sm = StorageManager.getStorageManager(conf);
testDir = getTestDir(this.fs, TEST_PATH);
@@ -83,7 +80,7 @@ public Path getTestDir(FileSystem fs, String dir) throws IOException {
@Parameterized.Parameters
public static Collection