Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,16 @@ public MiniDFSCluster startMiniDFSCluster(int servers,
final String hosts[])
throws IOException {

conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.toString());
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.getAbsolutePath());
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY, 0);

MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
builder.hosts(hosts);
builder.numDataNodes(servers);
builder.format(true);
builder.manageNameDfsDirs(true);
builder.manageDataDfsDirs(true);
builder.storagesPerDatanode(1);
builder.waitSafeMode(true);
this.dfsCluster = builder.build();

Expand Down
2 changes: 1 addition & 1 deletion tajo-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
<artifactId>jsp-2.1-jetty</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
Expand Down
18 changes: 16 additions & 2 deletions tajo-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<hive.version>1.1.0</hive.version>
<netty.version>4.0.33.Final</netty.version>
<jersey.version>2.6</jersey.version>
<jetty.version>6.1.14</jetty.version>
<jetty.version>6.1.26</jetty.version>
<tajo.root>${project.parent.relativePath}/..</tajo.root>
<extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
</properties>
Expand Down Expand Up @@ -485,6 +485,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<trimStackTrace>false</trimStackTrace>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down Expand Up @@ -1190,7 +1193,7 @@
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
<artifactId>jsp-2.1-jetty</artifactId>
<version>${jetty.version}</version>
<exclusions>
<exclusion>
Expand All @@ -1199,6 +1202,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1-glassfish</artifactId>
<version>2.1.v20091210</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jdt.core.compiler</groupId>
<artifactId>ecj</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

private final Log LOG = LogFactory.getLog(HttpFileServerHandler.class);
Expand Down Expand Up @@ -84,11 +83,12 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) thr
ChannelFuture lastContentFuture;
if (ctx.pipeline().get(SslHandler.class) != null) {
// Cannot use zero-copy with HTTPS.
lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)));
lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
ctx.newProgressivePromise());
} else {
// No encryption - use zero-copy.
final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
writeFuture = ctx.write(region);
writeFuture = ctx.write(region, ctx.newProgressivePromise());
lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
writeFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
Expand All @@ -99,7 +99,7 @@ public void operationProgressed(ChannelProgressiveFuture future, long progress,

@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
region.release();
LOG.trace(future.channel() + " Transfer complete.");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.text.ByteBufLineReader;
Expand All @@ -35,7 +33,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.UUID;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -85,16 +82,12 @@ public void testReaderWithLocalFS() throws Exception {
fs.close();
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testReaderWithDFS() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final Configuration conf = TestFileTablespace.getTestHdfsConfiguration();

final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).waitSafeMode(true).build();
.numDataNodes(2).storagesPerDatanode(1).format(true).build();

TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
Expand All @@ -48,7 +49,7 @@

public class TestFileTablespace {
private TajoConf conf;
private static String TEST_PATH = "target/test-data/TestFileTablespace";
private static String TEST_PATH = "target/test-data/hdfs";
private Path testDir;
private FileSystem localFs;

Expand All @@ -63,6 +64,21 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
}

public static HdfsConfiguration getTestHdfsConfiguration() {
HdfsConfiguration conf = new HdfsConfiguration();
String testDataPath = new File(TEST_PATH + "/" + UUID.randomUUID().toString()).getAbsolutePath();

String namenodeDir = new File(testDataPath, "name").getAbsolutePath();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, namenodeDir);
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeDir);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY, 0);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

return conf;
}

@Test
public final void testGetScannerAndAppender() throws IOException {
Schema schema = new Schema();
Expand Down Expand Up @@ -102,19 +118,11 @@ public final void testGetScannerAndAppender() throws IOException {
localFs.delete(path, true);
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testGetSplit() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);

final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
final Configuration hdfsConf = getTestHdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(1).storagesPerDatanode(1).format(true).build();

int testCount = 10;
Path tablePath = new Path("/testGetSplit");
Expand All @@ -131,7 +139,9 @@ public void testGetSplit() throws Exception {

assertTrue(fs.exists(tablePath));
FileTablespace space = new FileTablespace("testGetSplit", fs.getUri(), null);
space.init(new TajoConf(conf));
space.init(conf);

TablespaceManager.addTableSpaceForTest(space);
assertEquals(fs.getUri(), space.getUri());

Schema schema = new Schema();
Expand All @@ -153,24 +163,18 @@ public void testGetSplit() throws Exception {
assertEquals(testCount / 2, splits.size());
assertEquals(1, splits.get(0).getHosts().length);
assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);

fs.close();
} finally {
cluster.shutdown();
}
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testZeroLengthSplit() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);

final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
final Configuration hdfsConf = getTestHdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(1).storagesPerDatanode(1).format(true).build();

int testCount = 10;
Path tablePath = new Path("/testZeroLengthSplit");
Expand All @@ -188,40 +192,37 @@ public void testZeroLengthSplit() throws Exception {
}

assertTrue(fs.exists(tablePath));

FileTablespace space = new FileTablespace("testZeroLengthSplit", fs.getUri(), new JSONObject());
space.init(new TajoConf(conf));
space.init(conf);
TablespaceManager.addTableSpaceForTest(space);
assertEquals(fs.getUri(), space.getUri());

Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("age",Type.INT4);
schema.addColumn("name",Type.TEXT);
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT);

List<Fragment> splits = Lists.newArrayList();
// Get FileFragments in partition batch
splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
assertEquals(0, splits.size());

fs.close();
} finally {
cluster.shutdown();
}
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);

final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
final Configuration hdfsConf = getTestHdfsConfiguration();

TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
hdfsConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(2).storagesPerDatanode(1).format(true).build();

int testCount = 10;
Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
Expand All @@ -236,15 +237,15 @@ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
assertTrue(fs.exists(tablePath));

FileTablespace sm = new FileTablespace("testGetSplitWithBlockStorageLocationsBatching", fs.getUri(), null);
sm.init(new TajoConf(conf));

sm.init(new TajoConf(hdfsConf));
TablespaceManager.addTableSpaceForTest(sm);
assertEquals(fs.getUri(), sm.getUri());

Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("age", Type.INT4);
schema.addColumn("name", Type.TEXT);
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT);

List<Fragment> splits = Lists.newArrayList();
splits.addAll(sm.getSplits("data", meta, schema, tablePath));
Expand All @@ -253,23 +254,19 @@ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
assertEquals(2, splits.get(0).getHosts().length);
assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);

fs.close();
} finally {
cluster.shutdown();
}
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testGetFileTablespace() throws Exception {
final Configuration hdfsConf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
hdfsConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final Configuration hdfsConf = getTestHdfsConfiguration();

final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(hdfsConf).numDataNodes(1).build();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(1).storagesPerDatanode(1).format(true).build();
URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");

try {
Expand Down
Loading