From 1a057fd89186a13f627d067949f4958c71b1cce0 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Wed, 15 Feb 2017 10:37:43 -0800 Subject: [PATCH 01/15] Adding s3a schema and s3a implem to hdfs storage module. --- extensions-core/hdfs-storage/pom.xml | 7 ++++++- .../src/main/java/io/druid/indexer/JobHelper.java | 1 + .../java/io/druid/indexing/common/config/TaskConfig.java | 2 +- pom.xml | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index c7d8429e2764..2eb18632276d 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -140,12 +140,17 @@ emitter provided + + org.apache.hadoop + hadoop-aws + ${hadoop.compile.version} + commons-io commons-io provided - + junit diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 06599dcb2722..928950349600 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -418,6 +418,7 @@ public long push() throws IOException case "hdfs": case "viewfs": case "maprfs": + case "s3a": loadSpec = ImmutableMap.of( "type", "hdfs", "path", indexOutURI.toString() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index e3fb7be6f9d7..eff0df46f384 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -31,7 +31,7 @@ public class TaskConfig { public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of( - "org.apache.hadoop:hadoop-client:2.3.0" + "org.apache.hadoop:hadoop-client:2.7.0" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); diff --git a/pom.xml b/pom.xml index 68a6fb245ad7..5613f7e21fd5 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ 4.1.6.Final 1.7.12 - 2.3.0 + 2.7.0 2.0.0 1.6.6 From cddcf7775c83dd9ed0b30bc5342261a90ff91c18 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 16 Feb 2017 13:29:59 -0800 Subject: [PATCH 02/15] use 2.7.3 --- .../main/java/io/druid/indexing/common/config/TaskConfig.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index eff0df46f384..6a9370324d0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -31,7 +31,7 @@ public class TaskConfig { public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of( - "org.apache.hadoop:hadoop-client:2.7.0" + "org.apache.hadoop:hadoop-client:2.7.3" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); diff --git a/pom.xml b/pom.xml index 5613f7e21fd5..90738445e68b 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ 4.1.6.Final 1.7.12 - 2.7.0 + 2.7.3 2.0.0 1.6.6 From 72b3d6d7d5adf41c8a9ba81cd957e74644792832 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 23 Feb 2017 08:28:50 -0800 Subject: [PATCH 03/15] use segment pusher to make loadspec --- .../segment/loading/DataSegmentPusher.java | 4 ++ 
.../storage/azure/AzureDataSegmentPusher.java | 14 ++++++ .../cassandra/CassandraDataSegmentPusher.java | 9 ++++ .../CloudFilesDataSegmentPusher.java | 17 ++++++++ .../google/GoogleDataSegmentPusher.java | 24 +++++++++-- .../storage/hdfs/HdfsDataSegmentPusher.java | 16 ++++--- .../druid/storage/s3/S3DataSegmentPusher.java | 12 ++++++ .../indexer/HadoopDruidIndexerConfig.java | 6 ++- .../io/druid/indexer/IndexGeneratorJob.java | 3 +- .../main/java/io/druid/indexer/JobHelper.java | 43 ++----------------- .../indexer/updater/HadoopConverterJob.java | 3 +- .../updater/HadoopDruidConverterConfig.java | 3 ++ .../indexing/common/task/IndexTaskTest.java | 7 +++ .../IngestSegmentFirehoseFactoryTest.java | 7 +++ .../indexing/overlord/TaskLifecycleTest.java | 13 ++++++ .../indexing/test/TestDataSegmentPusher.java | 8 ++++ .../loading/LocalDataSegmentPusher.java | 17 +++++--- .../appenderator/AppenderatorTester.java | 7 +++ .../java/io/druid/cli/CliRealtimeExample.java | 9 ++++ 19 files changed, 163 insertions(+), 59 deletions(-) diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java index f77aa198c8a6..4a235588c7e6 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java @@ -23,6 +23,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; public interface DataSegmentPusher { @@ -30,4 +32,6 @@ public interface DataSegmentPusher String getPathForHadoop(String dataSource); String getPathForHadoop(); DataSegment push(File file, DataSegment segment) throws IOException; + //use map instead of LoadSpec class to avoid dependency pollution. + Map makeLoadSpec(URI finalIndexZipFilePath); } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index 5e42b951779d..20bca6c124ea 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -35,6 +35,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.Map; import java.util.concurrent.Callable; @@ -174,4 +175,17 @@ public DataSegment call() throws Exception } } } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of( + "type", + AzureStorageDruidModule.SCHEME, + "containerName", + config.getContainer(), + "blobPath", + uri.toString() + ); + } } diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index 973de4297555..17e37a5a2e48 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -27,6 +27,7 @@ import com.netflix.astyanax.recipes.storage.ChunkedStorage; import io.druid.java.util.common.CompressionUtils; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; 
import io.druid.segment.loading.DataSegmentPusher; @@ -36,6 +37,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; /** * Cassandra Segment Pusher @@ -114,4 +117,10 @@ ImmutableMap. of("type", "c*", "key", key) compressedIndexFile.delete(); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException("not supported"); + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index fbef1bfe01f5..1a7eb05cff4f 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -34,6 +34,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.concurrent.Callable; public class CloudFilesDataSegmentPusher implements DataSegmentPusher @@ -146,4 +148,19 @@ public DataSegment call() throws Exception } } } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of( + "type", + CloudFilesStorageDruidModule.SCHEME, + "region", + objectApi.getRegion(), + "container", + objectApi.getContainer(), + "path", + uri.toString() + ); + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java index 5570cdd9d9ef..f455e5555774 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java @@ -37,6 +37,8 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; public class GoogleDataSegmentPusher implements DataSegmentPusher { @@ -84,7 +86,8 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen return descriptorFile; } - public void insert(final File file, final String contentType, final String path) throws IOException { + public void insert(final File file, final String contentType, final String path) throws IOException + { LOG.info("Inserting [%s] to [%s]", file, path); FileInputStream fileSteam = new FileInputStream(file); @@ -119,7 +122,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr "bucket", config.getBucket(), "path", indexPath ) - ) + ) .withBinaryVersion(version); descriptorFile = createDescriptorFile(jsonMapper, outSegment); @@ -131,7 +134,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr } catch (Exception e) { throw Throwables.propagate(e); - } finally { + } + finally { if (indexFile != null) { LOG.info("Deleting file [%s]", indexFile); indexFile.delete(); @@ -153,4 +157,18 @@ String buildPath(final String path) return path; } } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of( + "type", + GoogleStorageDruidModule.SCHEME, + "bucket", + config.getBucket(), + "path", + finalIndexZipFilePath.getPath().substring(1) // remove the leading 
"/" + ); + } + } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 6b80e3619c25..d9ba0f432f9f 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -40,6 +40,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.net.URI; +import java.util.Map; /** */ @@ -115,7 +117,6 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException storageDir, segment.getShardSpec().getPartitionNum() )); - final Path outDescriptorFile = new Path(String.format( "%s/%s/%d_descriptor.json", fullyQualifiedStorageDirectory, @@ -123,7 +124,7 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException segment.getShardSpec().getPartitionNum() )); - dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile)) + dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)); @@ -176,11 +177,6 @@ private void copyFilesWithChecks(final FileSystem fs, final Path from, final Pat } } - private ImmutableMap makeLoadSpec(Path outFile) - { - return ImmutableMap.of("type", "hdfs", "path", outFile.toUri().toString()); - } - private static class HdfsOutputStreamSupplier extends ByteSink { private final FileSystem fs; @@ -198,4 +194,10 @@ public OutputStream openStream() throws IOException return fs.create(descriptorFile); } } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of("type", "hdfs", "path", finalIndexZipFilePath.toString()); + } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index a035ff20c22f..f01790a63954 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -38,6 +38,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.concurrent.Callable; public class S3DataSegmentPusher implements DataSegmentPusher @@ -149,4 +151,14 @@ public DataSegment call() throws Exception throw Throwables.propagate(e); } } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of( + "type", "s3_zip", + "bucket", finalIndexZipFilePath.getHost(), + "key", finalIndexZipFilePath.getPath().substring(1) // remove the leading "/" + ); + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 77938c687021..6a082998e4ee 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -54,6 +54,7 @@ import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.ShardSpec; @@ -92,9 +93,11 @@ public class 
HadoopDruidIndexerConfig public static final IndexMerger INDEX_MERGER; public static final IndexMergerV9 INDEX_MERGER_V9; public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG; - + public static final DataSegmentPusher DATA_SEGMENT_PUSHER; private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing"; + + static { injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), @@ -118,6 +121,7 @@ public void configure(Binder binder) INDEX_MERGER = injector.getInstance(IndexMerger.class); INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class); HADOOP_KERBEROS_CONFIG = injector.getInstance(HadoopKerberosConfig.class); + DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class); } public static enum IndexJobCounters diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 68a49c744e5e..a96f95419f92 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -754,7 +754,8 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator outputFS, segmentTemplate, context.getTaskAttemptID() - ) + ), + config.DATA_SEGMENT_PUSHER ); Path descriptorPath = config.makeDescriptorInfoPath(segment); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 928950349600..63af45aafdc2 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -23,7 +23,6 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.common.io.OutputSupplier; @@ -35,6 +34,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.ProgressIndicator; import io.druid.segment.SegmentUtils; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import org.apache.hadoop.conf.Configuration; @@ -379,7 +379,8 @@ public static DataSegment serializeOutIndex( final File mergedBase, final Path finalIndexZipFilePath, final Path finalDescriptorPath, - final Path tmpPath + final Path tmpPath, + DataSegmentPusher dataSegmentPusher ) throws IOException { @@ -412,44 +413,8 @@ public long push() throws IOException log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri()); final URI indexOutURI = finalIndexZipFilePath.toUri(); - final ImmutableMap loadSpec; - // TODO: Make this a part of Pushers or Pullers - switch (outputFS.getScheme()) { - case "hdfs": - case "viewfs": - case "maprfs": - case "s3a": - loadSpec = ImmutableMap.of( - "type", "hdfs", - "path", indexOutURI.toString() - ); - break; - case "gs": - loadSpec = ImmutableMap.of( - "type", "google", - "bucket", indexOutURI.getHost(), - "path", indexOutURI.getPath().substring(1) // remove the leading "/" - ); - break; - case "s3": - case "s3n": - loadSpec = ImmutableMap.of( - "type", "s3_zip", - "bucket", indexOutURI.getHost(), - "key", indexOutURI.getPath().substring(1) // remove the leading "/" - ); - break; - case "file": - loadSpec = ImmutableMap.of( - "type", "local", - "path", indexOutURI.getPath() - ); - break; - default: 
- throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme()); - } final DataSegment finalSegment = segmentTemplate - .withLoadSpec(loadSpec) + .withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI)) .withSize(size.get()) .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase)); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index 3675b2a84e57..60159e2d62a7 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -569,7 +569,8 @@ protected void map( outputFS, finalSegmentTemplate, context.getTaskAttemptID() - ) + ), + config.DATA_SEGMENT_PUSHER ); context.progress(); context.setStatus("Finished PUSH"); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java index 57f2b8f8c51c..a3574f16cb25 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java @@ -37,6 +37,7 @@ import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -53,6 +54,7 @@ public class HadoopDruidConverterConfig public static final ObjectMapper jsonMapper; public static final IndexIO INDEX_IO; public static final IndexMerger INDEX_MERGER; + public static final DataSegmentPusher DATA_SEGMENT_PUSHER; private static final Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), @@ -75,6 +77,7 @@ public void configure(Binder binder) jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class); INDEX_IO = injector.getInstance(IndexIO.class); INDEX_MERGER = injector.getInstance(IndexMerger.class); + DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class); } private static final TypeReference> mapTypeReference = new TypeReference>() diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 94b3459f3921..7480ac26b551 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -66,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -550,6 +551,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException segments.add(segment); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(), indexMerger, indexIO, null, null, indexMergerV9 ) diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 88c2cebf9d6f..71b656e57b44 100644 --- 
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -93,6 +93,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -231,6 +232,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException { return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }, new DataSegmentKiller() { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 6fe8e11d95bf..e25444d11592 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -122,6 +122,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -477,6 +478,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException pushedSegments++; return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }; } @@ -1017,6 +1024,12 @@ public DataSegment push(File file, DataSegment dataSegment) throws IOException { throw new RuntimeException("FAILURE"); } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }; tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java index ad99827052d2..923a80299605 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java @@ -26,6 +26,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.Set; public class TestDataSegmentPusher implements DataSegmentPusher @@ -52,6 +54,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException return segment; } + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } + public Set getPushedSegments() { return ImmutableSet.copyOf(pushedSegments); diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index 096e7dd07987..e1592dcb3cbf 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -33,7 +33,9 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.FileAlreadyExistsException; +import java.util.Map; import java.util.UUID; /** @@ -86,7 +88,7 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce } return createDescriptorFile( - segment.withLoadSpec(makeLoadSpec(outDir)) + segment.withLoadSpec(makeLoadSpec(outDir.toURI())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)), outDir @@ -98,7 +100,7 @@ public DataSegment push(File 
dataSegmentFile, DataSegment segment) throws IOExce final long size = compressSegment(dataSegmentFile, tmpOutDir); final DataSegment dataSegment = createDescriptorFile( - segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip"))) + segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip").toURI())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)), tmpOutDir @@ -118,6 +120,12 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce return dataSegment; } + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of("type", "local", "path", finalIndexZipFilePath.getPath()); + } + private String intermediateDirFor(String storageDir) { return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString(); @@ -138,9 +146,4 @@ private DataSegment createDescriptorFile(DataSegment segment, File outDir) throw Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); return segment; } - - private ImmutableMap makeLoadSpec(File outFile) - { - return ImmutableMap.of("type", "local", "path", outFile.toString()); - } } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index a692c5b170ad..98399784c511 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -61,6 +61,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -186,6 +187,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException pushedSegments.add(segment); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }; appenderator = Appenderators.createRealtime( schema, diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 0f072cfbf635..b6516fb4c66d 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -20,6 +20,7 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Module; @@ -39,7 +40,9 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; @@ -152,6 +155,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException { return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of(); + } } private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer From 817b98552d7a4fe842e759f13cf4e35f4ea3ae73 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Sat, 25 Mar 2017 11:08:51 -0700 Subject: [PATCH 04/15] move getStorageDir and makeLoad spec under DataSegmentPusher --- .../segment/loading/DataSegmentPusher.java | 26 ++ .../loading/DataSegmentPusherUtil.java | 66 ---- .../loading/DataSegmentPusherUtilTest.java | 56 ---- .../storage/azure/AzureDataSegmentPusher.java | 14 +- 
.../azure/AzureDataSegmentPusherTest.java | 5 +- .../cassandra/CassandraDataSegmentPusher.java | 4 +- .../CloudFilesDataSegmentPusher.java | 15 +- .../storage/cloudfiles/CloudFilesUtils.java | 7 - .../google/GoogleDataSegmentPusher.java | 25 +- .../google/GoogleDataSegmentPusherTest.java | 3 +- extensions-core/hdfs-storage/pom.xml | 8 +- .../storage/hdfs/HdfsDataSegmentPusher.java | 37 ++- .../hdfs/HdfsDataSegmentPusherTest.java | 293 +++++++++++++++++- .../druid/storage/s3/S3DataSegmentMover.java | 3 +- .../druid/storage/s3/S3DataSegmentPusher.java | 33 +- .../storage/s3/S3DataSegmentPusherConfig.java | 13 + .../java/io/druid/storage/s3/S3Utils.java | 6 +- .../io/druid/indexer/IndexGeneratorJob.java | 9 +- .../main/java/io/druid/indexer/JobHelper.java | 71 +---- .../indexer/updater/HadoopConverterJob.java | 11 +- .../indexer/HadoopDruidIndexerConfigTest.java | 231 -------------- .../task/SameIntervalMergeTaskTest.java | 8 + .../loading/LocalDataSegmentPusher.java | 2 +- .../SegmentLoaderLocalCacheManager.java | 8 +- .../loading/LocalDataSegmentPusherTest.java | 6 +- 25 files changed, 449 insertions(+), 511 deletions(-) delete mode 100644 api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java delete mode 100644 api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java index 4a235588c7e6..9638ed48b15c 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java @@ -19,6 +19,7 @@ package io.druid.segment.loading; +import com.google.common.base.Joiner; import io.druid.timeline.DataSegment; import java.io.File; @@ -28,10 +29,35 @@ public interface DataSegmentPusher { + Joiner JOINER = Joiner.on("/").skipNulls(); + @Deprecated String getPathForHadoop(String dataSource); String getPathForHadoop(); DataSegment push(File file, DataSegment segment) throws IOException; //use map instead of LoadSpec class to avoid dependency pollution. Map makeLoadSpec(URI finalIndexZipFilePath); + default String getStorageDir(DataSegment dataSegment) { + return getDefaultStorageDir(dataSegment); + } + default String makeIndexPathName(DataSegment dataSegment, String indexName) { + return String.format("./%s/%s", getStorageDir(dataSegment),indexName); + } + + // Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/ + // If above format is ever changed, make sure to change it appropriately in other places + // e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories + // on segment deletion if segment being deleted was the only segment + static String getDefaultStorageDir(DataSegment segment) { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart(), + segment.getInterval().getEnd() + ), + segment.getVersion(), + segment.getShardSpec().getPartitionNum() + ); + } } diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java deleted file mode 100644 index 7daa125088d9..000000000000 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment.loading; - -import com.google.common.base.Joiner; -import io.druid.timeline.DataSegment; -import org.joda.time.format.ISODateTimeFormat; - -/** - */ -public class DataSegmentPusherUtil -{ - private static final Joiner JOINER = Joiner.on("/").skipNulls(); - - // Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/ - // If above format is ever changed, make sure to change it appropriately in other places - // e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories - // on segment deletion if segment being deleted was the only segment - public static String getStorageDir(DataSegment segment) - { - return JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart(), - segment.getInterval().getEnd() - ), - segment.getVersion(), - segment.getShardSpec().getPartitionNum() - ); - } - - /** - * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in - * path names. So we format paths differently for HDFS. - */ - public static String getHdfsStorageDir(DataSegment segment) - { - return JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), - segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) - ), - segment.getVersion().replaceAll(":", "_") - ); - } -} diff --git a/api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java b/api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java deleted file mode 100644 index d2f1c6eab400..000000000000 --- a/api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.segment.loading; - -import com.google.common.collect.ImmutableMap; -import io.druid.timeline.DataSegment; -import io.druid.timeline.partition.NoneShardSpec; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; - -public class DataSegmentPusherUtilTest -{ - @Test - public void shouldNotHaveColonsInHdfsStorageDir() throws Exception - { - - Interval interval = new Interval("2011-10-01/2011-10-02"); - ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); - - DataSegment segment = new DataSegment( - "something", - interval, - "brand:new:version", - loadSpec, - Arrays.asList("dim1", "dim2"), - Arrays.asList("met1", "met2"), - NoneShardSpec.instance(), - null, - 1 - ); - - String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); - Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir); - - } -} diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index 20bca6c124ea..0b9f7bea9afa 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -29,7 +29,6 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import java.io.File; @@ -86,7 +85,7 @@ public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final Dat public Map getAzurePaths(final DataSegment segment) { - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment); return ImmutableMap.of( "index", String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), @@ -110,16 +109,7 @@ public DataSegment uploadDataSegment( final DataSegment outSegment = segment .withSize(size) - .withLoadSpec( - ImmutableMap.of( - "type", - AzureStorageDruidModule.SCHEME, - "containerName", - config.getContainer(), - "blobPath", - azurePaths.get("index") - ) - ) + .withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index")))) .withBinaryVersion(version); log.info("Deleting file [%s]", compressedSegmentData); diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java index 6cb9680ecbb2..e5913c50f5e1 100644 --- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java @@ -27,7 +27,6 @@ import com.microsoft.azure.storage.StorageException; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.MapUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMockSupport; @@ -112,9 +111,9 @@ public void testPush() throws Exception @Test public void getAzurePathsTest() { - final String storageDir = DataSegmentPusherUtil.getStorageDir(dataSegment); - 
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper); + final String storageDir = pusher.getStorageDir(dataSegment); Map paths = pusher.getAzurePaths(dataSegment); assertEquals(String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index")); diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index 17e37a5a2e48..b26253e03ed1 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -27,11 +27,9 @@ import com.netflix.astyanax.recipes.storage.ChunkedStorage; import io.druid.java.util.common.CompressionUtils; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import java.io.File; @@ -80,7 +78,7 @@ public DataSegment push(final File indexFilesDir, DataSegment segment) throws IO log.info("Writing [%s] to C*", indexFilesDir); String key = JOINER.join( config.getKeyspace().isEmpty() ? null : config.getKeyspace(), - DataSegmentPusherUtil.getStorageDir(segment) + this.getStorageDir(segment) ); // Create index diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index 1a7eb05cff4f..a17a6872b586 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -77,7 +77,7 @@ public String getPathForHadoop(final String dataSource) @Override public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { - final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment); + final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment)); File descriptorFile = null; File zipOutFile = null; @@ -114,18 +114,7 @@ public DataSegment call() throws Exception final DataSegment outSegment = inSegment .withSize(indexSize) - .withLoadSpec( - ImmutableMap.of( - "type", - CloudFilesStorageDruidModule.SCHEME, - "region", - segmentData.getRegion(), - "container", - segmentData.getContainer(), - "path", - segmentData.getPath() - ) - ) + .withLoadSpec(makeLoadSpec(new URI(segmentData.getPath()))) .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); return outSegment; diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java index 31b4cafdf383..e409964de399 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java +++ 
b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java @@ -22,8 +22,6 @@ import com.google.common.base.Predicate; import io.druid.java.util.common.RetryUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; -import io.druid.timeline.DataSegment; import java.io.IOException; import java.util.concurrent.Callable; @@ -70,9 +68,4 @@ public static String buildCloudFilesPath(String basePath, final String fileName) return path; } - public static String buildCloudFilesPath(String basePath, final DataSegment segment) - { - return buildCloudFilesPath(basePath, DataSegmentPusherUtil.getStorageDir(segment)); - } - } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java index f455e5555774..d94f233c81f1 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java @@ -30,7 +30,6 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import java.io.File; @@ -110,19 +109,13 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr try { indexFile = File.createTempFile("index", ".zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile); - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment); final String indexPath = buildPath(storageDir + "/" + "index.zip"); final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json"); final DataSegment outSegment = segment .withSize(indexSize) - .withLoadSpec( - ImmutableMap.of( - "type", GoogleStorageDruidModule.SCHEME, - "bucket", config.getBucket(), - "path", indexPath - ) - ) + .withLoadSpec(makeLoadSpec(config.getBucket(), indexPath)) .withBinaryVersion(version); descriptorFile = createDescriptorFile(jsonMapper, outSegment); @@ -161,13 +154,15 @@ String buildPath(final String path) @Override public Map makeLoadSpec(URI finalIndexZipFilePath) { + // remove the leading "/" + return makeLoadSpec(config.getBucket(),finalIndexZipFilePath.getPath().substring(1)); + } + + private Map makeLoadSpec(String bucket, String path) { return ImmutableMap.of( - "type", - GoogleStorageDruidModule.SCHEME, - "bucket", - config.getBucket(), - "path", - finalIndexZipFilePath.getPath().substring(1) // remove the leading "/" + "type", GoogleStorageDruidModule.SCHEME, + "bucket", bucket, + "path", path ); } diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java index d85d57eb1017..83d35601ff18 100644 --- a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java +++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Maps; import com.google.common.io.Files; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.DataSegmentPusherUtil; import 
io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -106,7 +105,7 @@ public void testPush() throws Exception jsonMapper ).addMockedMethod("insert", File.class, String.class, String.class).createMock(); - final String storageDir = DataSegmentPusherUtil.getStorageDir(segmentToPush); + final String storageDir = pusher.getStorageDir(segmentToPush); final String indexPath = prefix + "/" + storageDir + "/" + "index.zip"; final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json"; diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index 2eb18632276d..8dd014629b99 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -183,6 +183,12 @@ ${hadoop.compile.version} test - + + io.druid + druid-indexing-hadoop + ${project.parent.version} + test + + diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index d9ba0f432f9f..337b6276bf45 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -20,6 +20,7 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; @@ -29,13 +30,13 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HadoopFsWrapper; import org.apache.hadoop.fs.Path; +import org.joda.time.format.ISODateTimeFormat; import java.io.File; import java.io.IOException; @@ -86,7 +87,7 @@ public String getPathForHadoop() @Override public DataSegment push(File inDir, DataSegment segment) throws IOException { - final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); + final String storageDir = this.getStorageDir(segment); log.info( "Copying segment[%s] to HDFS at location[%s/%s]", @@ -200,4 +201,36 @@ public Map makeLoadSpec(URI finalIndexZipFilePath) { return ImmutableMap.of("type", "hdfs", "path", finalIndexZipFilePath.toString()); } + + private static final Joiner JOINER = Joiner.on("/").skipNulls(); + + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. 
+ */ + + @Override + public String getStorageDir(DataSegment segment) + { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), + segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) + ), + segment.getVersion().replaceAll(":", "_") + ); + } + + @Override + public String makeIndexPathName(DataSegment dataSegment, String indexName) + { + return String.format( + "./%s/%d_%s", + this.getStorageDir(dataSegment), + dataSegment.getShardSpec().getPartitionNum(), + indexName + ); + } } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index f341cd4d65dd..c476343f7904 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -30,20 +31,33 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; +import io.druid.indexer.Bucket; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.JobHelper; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.jackson.GranularityModule; +import io.druid.segment.loading.LocalDataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPusherConfig; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -51,18 +65,33 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; /** */ public class HdfsDataSegmentPusherTest { + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @Rule public final ExpectedException expectedException = ExpectedException.none(); - TestObjectMapper objectMapper = new TestObjectMapper(); + static TestObjectMapper objectMapper = new TestObjectMapper(); + + private HdfsDataSegmentPusher hdfsDataSegmentPusher; + @Before + public void setUp() throws IOException + { + HdfsDataSegmentPusherConfig 
hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig(); + hdfsDataSegmentPusherConf.setStorageDirectory("path/to/"); + hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper); + } + static { + objectMapper = new TestObjectMapper(); + objectMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, objectMapper)); + } @Test public void testPushWithScheme() throws Exception @@ -133,7 +162,7 @@ private void testUsingScheme(final String scheme) throws Exception String indexUri = String.format( "%s/%s/%d_index.zip", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), - DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush), + pusher.getStorageDir(segmentToPush), segmentToPush.getShardSpec().getPartitionNum() ); @@ -146,7 +175,7 @@ private void testUsingScheme(final String scheme) throws Exception indexUri ), segment.getLoadSpec()); // rename directory after push - final String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment); + final String segmentPath = pusher.getStorageDir(segment); File indexFile = new File(String.format( "%s/%s/%d_index.zip", @@ -217,7 +246,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n String indexUri = String.format( "%s/%s/%d_index.zip", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), - DataSegmentPusherUtil.getHdfsStorageDir(segments[i]), + pusher.getStorageDir(segments[i]), segments[i].getShardSpec().getPartitionNum() ); @@ -230,7 +259,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n indexUri ), pushedSegment.getLoadSpec()); // rename directory after push - String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(pushedSegment); + String segmentPath = pusher.getStorageDir(pushedSegment); File indexFile = new File(String.format( "%s/%s/%d_index.zip", @@ -259,7 +288,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n indexUri ), fromDescriptorFileDataSegment.getLoadSpec()); // rename directory after push - segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(fromDescriptorFileDataSegment); + segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment); indexFile = new File(String.format( "%s/%s/%d_index.zip", @@ -282,7 +311,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n } } - public class TestObjectMapper extends ObjectMapper + public static class TestObjectMapper extends ObjectMapper { public TestObjectMapper() { @@ -292,7 +321,9 @@ public TestObjectMapper() configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false); configure(MapperFeature.AUTO_DETECT_SETTERS, false); configure(SerializationFeature.INDENT_OUTPUT, false); + configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); registerModule(new TestModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec"))); + registerModule(new GranularityModule()); } public class TestModule extends SimpleModule @@ -317,4 +348,250 @@ public Interval deserialize( } } + @Test + public void shouldNotHaveColonsInHdfsStorageDir() throws Exception + { + + Interval interval = new Interval("2011-10-01/2011-10-02"); + ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + + DataSegment segment = new DataSegment( + "something", + interval, + "brand:new:version", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), 
+ NoneShardSpec.instance(), + null, + 1 + ); + + String storageDir = hdfsDataSegmentPusher.getStorageDir(segment); + Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir); + + } + + + @Test + public void shouldMakeHDFSCompliantSegmentOutputPath() + { + HadoopIngestionSpec schema; + + try { + schema = objectMapper.readValue( + "{\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"source\",\n" + + " \"metricsSpec\": [],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"hour\",\n" + + " \"intervals\": [\"2012-07-10/P1D\"]\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"type\": \"hadoop\",\n" + + " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n" + + " }\n" + + "}", + HadoopIngestionSpec.class + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + //DataSchema dataSchema = new DataSchema("dataSource", null, null, Gra) + //schema = new HadoopIngestionSpec(dataSchema, ioConfig, HadoopTuningConfig.makeDefaultTuningConfig()); + HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( + schema.withTuningConfig( + schema.getTuningConfig() + .withVersion( + "some:brand:new:version" + ) + ) + ); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new DistributedFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.INDEX_ZIP, + hdfsDataSegmentPusher + ); + Assert.assertEquals( + "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" + + "/4712_index.zip", + path.toString() + ); + + path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new DistributedFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.DESCRIPTOR_JSON, + hdfsDataSegmentPusher + ); + Assert.assertEquals( + "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" + + "/4712_descriptor.json", + path.toString() + ); + + path = JobHelper.makeTmpPath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new DistributedFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0), + hdfsDataSegmentPusher + ); + Assert.assertEquals( + "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" + + "/4712_index.zip.0", + path.toString() + ); + + } + + @Test + public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() + { + final HadoopIngestionSpec schema; + + try { + schema = 
objectMapper.readValue( + "{\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"the:data:source\",\n" + + " \"metricsSpec\": [],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"hour\",\n" + + " \"intervals\": [\"2012-07-10/P1D\"]\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"type\": \"hadoop\",\n" + + " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n" + + " }\n" + + "}", + HadoopIngestionSpec.class + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( + schema.withTuningConfig( + schema.getTuningConfig() + .withVersion( + "some:brand:new:version" + ) + ) + ); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new LocalFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.INDEX_ZIP, + new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper) + ); + Assert.assertEquals( + "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" + + "version/4712/index.zip", + path.toString() + ); + + path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new LocalFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.DESCRIPTOR_JSON, + new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper) + ); + Assert.assertEquals( + "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" + + "version/4712/descriptor.json", + path.toString() + ); + + path = JobHelper.makeTmpPath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new LocalFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0), + new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper) + ); + Assert.assertEquals( + "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" + + "version/4712/index.zip.0", + path.toString() + ); + + } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 352d6cca15ac..23b331f63251 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -30,6 +30,7 @@ import io.druid.java.util.common.StringUtils; import 
io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import org.jets3t.service.ServiceException; @@ -69,7 +70,7 @@ public DataSegment move(DataSegment segment, Map targetLoadSpec) final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket"); final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey"); - final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment); + final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, DataSegmentPusher.getDefaultStorageDir(segment)); String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path); if (targetS3Bucket.isEmpty()) { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index f01790a63954..4d1caf3203f9 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -67,6 +67,9 @@ public S3DataSegmentPusher( @Override public String getPathForHadoop() { + if (config.isUseS3aSchema()) { + return String.format("s3a://%s/%s", config.getBucket(), config.getBaseKey()); + } return String.format("s3n://%s/%s", config.getBucket(), config.getBaseKey()); } @@ -80,7 +83,7 @@ public String getPathForHadoop(String dataSource) @Override public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { - final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment); + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment)); log.info("Copying segment[%s] to S3 at location[%s]", inSegment.getIdentifier(), s3Path); @@ -109,16 +112,7 @@ public DataSegment call() throws Exception s3Client.putObject(outputBucket, toPush); final DataSegment outSegment = inSegment.withSize(indexSize) - .withLoadSpec( - ImmutableMap.of( - "type", - "s3_zip", - "bucket", - outputBucket, - "key", - toPush.getKey() - ) - ) + .withLoadSpec(makeLoadSpec(outputBucket, toPush.getKey())) .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); File descriptorFile = File.createTempFile("druid", "descriptor.json"); @@ -154,11 +148,22 @@ public DataSegment call() throws Exception @Override public Map makeLoadSpec(URI finalIndexZipFilePath) + { + // remove the leading "/" + return makeLoadSpec(finalIndexZipFilePath.getHost(), finalIndexZipFilePath.getPath().substring(1)); + } + + private Map makeLoadSpec(String bucket, String key) { return ImmutableMap.of( - "type", "s3_zip", - "bucket", finalIndexZipFilePath.getHost(), - "key", finalIndexZipFilePath.getPath().substring(1) // remove the leading "/" + "type", + "s3_zip", + "bucket", + bucket, + "key", + key, + "S3Schema", + config.isUseS3aSchema() ? 
"s3a" : "s3n" ); } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java index 7937ea339d17..28c5327da6cf 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java @@ -39,6 +39,9 @@ public class S3DataSegmentPusherConfig @JsonProperty @Min(0) private int maxListingLength = 1000; + // use s3n by default for backward compatibility + @JsonProperty + private boolean useS3aSchema = false; public void setBucket(String bucket) { @@ -60,6 +63,16 @@ public void setMaxListingLength(int maxListingLength) this.maxListingLength = maxListingLength; } + public boolean isUseS3aSchema() + { + return useS3aSchema; + } + + public void setUseS3aSchema(boolean useS3aSchema) + { + this.useS3aSchema = useS3aSchema; + } + public String getBucket() { return bucket; diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 0fa86781d548..5d97e79ab25d 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -24,8 +24,6 @@ import com.google.common.base.Throwables; import io.druid.java.util.common.RetryUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; -import io.druid.timeline.DataSegment; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -187,11 +185,11 @@ public void remove() }; } - public static String constructSegmentPath(String baseKey, DataSegment segment) + public static String constructSegmentPath(String baseKey, String storageDir) { return JOINER.join( baseKey.isEmpty() ? 
null : baseKey, - DataSegmentPusherUtil.getStorageDir(segment) + storageDir ) + "/index.zip"; } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index a96f95419f92..f3bb6b63bddd 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -741,19 +741,22 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator new Path(config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, - JobHelper.INDEX_ZIP + JobHelper.INDEX_ZIP, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeFileNamePath( new Path(config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, - JobHelper.DESCRIPTOR_JSON + JobHelper.DESCRIPTOR_JSON, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeTmpPath( new Path(config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, - context.getTaskAttemptID() + context.getTaskAttemptID(), + config.DATA_SEGMENT_PUSHER ), config.DATA_SEGMENT_PUSHER ); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 63af45aafdc2..d9a99db3d7d5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -35,7 +35,6 @@ import io.druid.segment.ProgressIndicator; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -541,77 +540,33 @@ private static void createNewZipEntry(ZipOutputStream out, File file) throws IOE out.putNextEntry(new ZipEntry(file.getName())); } - public static boolean isHdfs(FileSystem fs) - { - return "hdfs".equals(fs.getScheme()) || "viewfs".equals(fs.getScheme()) || "maprfs".equals(fs.getScheme()); - } - public static Path makeFileNamePath( final Path basePath, final FileSystem fs, final DataSegment segmentTemplate, - final String baseFileName + final String baseFileName, + DataSegmentPusher dataSegmentPusher ) { - final Path finalIndexZipPath; - final String segmentDir; - if (isHdfs(fs)) { - segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate); - finalIndexZipPath = new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%d_%s", - segmentDir, - segmentTemplate.getShardSpec().getPartitionNum(), - baseFileName - ) - ); - } else { - segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate); - finalIndexZipPath = new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%s", - segmentDir, - baseFileName - ) - ); - } - return finalIndexZipPath; + return new Path(prependFSIfNullScheme(fs, basePath), + dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)); } public static Path makeTmpPath( final Path basePath, final FileSystem fs, final DataSegment segmentTemplate, - final TaskAttemptID taskAttemptID + final TaskAttemptID taskAttemptID, + DataSegmentPusher dataSegmentPusher ) { - final String segmentDir; - - if (isHdfs(fs)) { - segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate); - return new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%d_index.zip.%d", - segmentDir, - 
segmentTemplate.getShardSpec().getPartitionNum(), - taskAttemptID.getId() - ) - ); - } else { - segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate); - return new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%d_index.zip.%d", - segmentDir, - segmentTemplate.getShardSpec().getPartitionNum(), - taskAttemptID.getId() - ) - ); - } + return new Path( + prependFSIfNullScheme(fs, basePath), + String.format("./%s.%d", + dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), + taskAttemptID.getId() + ) + ); } /** diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index 60159e2d62a7..ab92c7032051 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -556,20 +556,23 @@ protected void map( baseOutputPath, outputFS, finalSegmentTemplate, - JobHelper.INDEX_ZIP + JobHelper.INDEX_ZIP, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeFileNamePath( baseOutputPath, outputFS, finalSegmentTemplate, - JobHelper.DESCRIPTOR_JSON + JobHelper.DESCRIPTOR_JSON, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeTmpPath( baseOutputPath, outputFS, finalSegmentTemplate, - context.getTaskAttemptID() - ), + context.getTaskAttemptID(), + config.DATA_SEGMENT_PUSHER + ), config.DATA_SEGMENT_PUSHER ); context.progress(); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 4b6cc7ce8a36..e815070092aa 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -31,15 +30,8 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.UniformGranularitySpec; -import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; -import io.druid.timeline.partition.NumberedShardSpec; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskType; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; @@ -58,229 +50,6 @@ public class HadoopDruidIndexerConfigTest jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); } - public static T jsonReadWriteRead(String s, Class klass) - { - try { - return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Test - public void shouldMakeHDFSCompliantSegmentOutputPath() - { - HadoopIngestionSpec schema; - - try { - schema = jsonReadWriteRead( - "{\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"source\",\n" - + " \"metricsSpec\": 
[],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"hour\",\n" - + " \"intervals\": [\"2012-07-10/P1D\"]\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"type\": \"hadoop\",\n" - + " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n" - + " }\n" - + "}", - HadoopIngestionSpec.class - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( - schema.withTuningConfig( - schema.getTuningConfig() - .withVersion( - "some:brand:new:version" - ) - ) - ); - - Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); - Path path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new DistributedFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.INDEX_ZIP - ); - Assert.assertEquals( - "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" - + "/4712_index.zip", - path.toString() - ); - - path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new DistributedFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.DESCRIPTOR_JSON - ); - Assert.assertEquals( - "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" - + "/4712_descriptor.json", - path.toString() - ); - - path = JobHelper.makeTmpPath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new DistributedFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0) - ); - Assert.assertEquals( - "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" - + "/4712_index.zip.0", - path.toString() - ); - - } - - @Test - public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() - { - final HadoopIngestionSpec schema; - - try { - schema = jsonReadWriteRead( - "{\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"the:data:source\",\n" - + " \"metricsSpec\": [],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"hour\",\n" - + " \"intervals\": [\"2012-07-10/P1D\"]\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"type\": \"hadoop\",\n" - + " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n" - + " }\n" - + "}", - HadoopIngestionSpec.class - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( - schema.withTuningConfig( - schema.getTuningConfig() - .withVersion( - "some:brand:new:version" - ) - ) - ); - - Bucket bucket = new 
Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); - Path path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new LocalFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.INDEX_ZIP - ); - Assert.assertEquals( - "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" - + "version/4712/index.zip", - path.toString() - ); - - path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new LocalFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.DESCRIPTOR_JSON - ); - Assert.assertEquals( - "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" - + "version/4712/descriptor.json", - path.toString() - ); - - path = JobHelper.makeTmpPath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new LocalFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0) - ); - Assert.assertEquals( - "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" - + "version/4712/4712_index.zip.0", - path.toString() - ); - - } @Test public void testHashedBucketSelection() diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 5bdb182c4b12..9f2bd7f78232 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -54,8 +54,10 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; public class SameIntervalMergeTaskTest @@ -200,6 +202,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException segments.add(segment); return segment; } + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return null; + } + }, null, null, null, null, null, null, null, null, null, new SegmentLoader() { @Override diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index e1592dcb3cbf..91bd6bf84570 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -75,7 +75,7 @@ public String getPathForHadoop(String dataSource) @Override public 
DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException { - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment); final File baseStorageDir = config.getStorageDirectory(); final File outDir = new File(baseStorageDir, storageDir); diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 4995c3230274..19a97f7816a2 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -91,7 +91,7 @@ public boolean isSegmentLoaded(final DataSegment segment) public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); if (localStorageDir.exists()) { return location; } @@ -124,7 +124,7 @@ public Segment getSegment(DataSegment segment) throws SegmentLoadingException public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { StorageLocation loc = findStorageLocationIfLoaded(segment); - String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + String storageDir = DataSegmentPusher.getDefaultStorageDir(segment); if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); @@ -233,11 +233,11 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. // So we should always clean all possible locations here for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); if (localStorageDir.exists()) { // Druid creates folders of the form dataSource/interval/version/partitionNum. // We need to clean up all these directories if they are all empty. 
- File cacheFile = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + File cacheFile = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); cleanupCacheFiles(location.getPath(), cacheFile); location.removeSegment(segment); } diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java index a91e6538d68c..447bfddfe622 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java @@ -88,14 +88,14 @@ public void testPush() throws IOException Assert.assertEquals(dataSegment2, returnSegment2); Assert.assertNotEquals( - DataSegmentPusherUtil.getStorageDir(dataSegment), - DataSegmentPusherUtil.getStorageDir(dataSegment2) + localDataSegmentPusher.getStorageDir(dataSegment), + localDataSegmentPusher.getStorageDir(dataSegment2) ); for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) { File outDir = new File( config.getStorageDirectory(), - DataSegmentPusherUtil.getStorageDir(returnSegment) + localDataSegmentPusher.getStorageDir(returnSegment) ); File versionFile = new File(outDir, "index.zip"); File descriptorJson = new File(outDir, "descriptor.json"); From 289c7279c140e473e058cac174f578562f947ca6 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Sat, 25 Mar 2017 12:22:22 -0700 Subject: [PATCH 05/15] fix uts --- .../io/druid/storage/s3/S3DataSegmentPusherConfigTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java index 44859c8a1ff3..8398909e8371 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java @@ -39,7 +39,7 @@ public class S3DataSegmentPusherConfigTest public void testSerialization() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - +"\"disableAcl\":false,\"maxListingLength\":2000}"; + +"\"disableAcl\":false,\"maxListingLength\":2000,\"useS3aSchema\":false}"; S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class); Assert.assertEquals(jsonConfig, jsonMapper.writeValueAsString(config)); @@ -50,7 +50,7 @@ public void testSerializationWithDefaults() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}"; String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - +"\"disableAcl\":false,\"maxListingLength\":1000}"; + +"\"disableAcl\":false,\"maxListingLength\":1000,\"useS3aSchema\":false}"; S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class); Assert.assertEquals(expectedJsonConfig, jsonMapper.writeValueAsString(config)); From e9144c491ff5b45d3cc231a6edb56293f74bce69 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Fri, 31 Mar 2017 12:11:05 -0700 Subject: [PATCH 06/15] fix comment part1 --- .../java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java | 3 --- .../main/java/io/druid/storage/s3/S3DataSegmentPusher.java | 5 +++++ .../src/main/java/io/druid/indexer/JobHelper.java | 7 ++++++- 3 files changed, 11 
insertions(+), 4 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 337b6276bf45..45030ed63a28 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -20,7 +20,6 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; @@ -202,8 +201,6 @@ public Map makeLoadSpec(URI finalIndexZipFilePath) return ImmutableMap.of("type", "hdfs", "path", finalIndexZipFilePath.toString()); } - private static final Joiner JOINER = Joiner.on("/").skipNulls(); - /** * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in * path names. So we format paths differently for HDFS. diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 4d1caf3203f9..8f9e99256df3 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -153,6 +153,11 @@ public Map makeLoadSpec(URI finalIndexZipFilePath) return makeLoadSpec(finalIndexZipFilePath.getHost(), finalIndexZipFilePath.getPath().substring(1)); } + /** + * Any change in loadSpec need to be reflected {@link io.druid.indexer.JobHelper#getURIFromSegment()} + * + */ + @SuppressWarnings("JavadocReference") private Map makeLoadSpec(String bucket, String key) { return ImmutableMap.of( diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index d9a99db3d7d5..d3214fa006ce 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -714,7 +714,12 @@ public static URI getURIFromSegment(DataSegment dataSegment) final String type = loadSpec.get("type").toString(); final URI segmentLocURI; if ("s3_zip".equals(type)) { - segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + if ("s3a".equals(loadSpec.get("S3Schema"))) { + segmentLocURI = URI.create(String.format("s3a://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + + } else { + segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + } } else if ("hdfs".equals(type)) { segmentLocURI = URI.create(loadSpec.get("path").toString()); } else if ("google".equals(type)) { From b4821cfa228ca59f377f85d931729cd3819ef6bb Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Fri, 31 Mar 2017 15:39:55 -0700 Subject: [PATCH 07/15] move to hadoop 2.8 --- .../main/java/io/druid/indexing/common/config/TaskConfig.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 6a9370324d0a..6fa94eca8bd8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -31,7 +31,7 @@ public class TaskConfig { public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of( - "org.apache.hadoop:hadoop-client:2.7.3" + "org.apache.hadoop:hadoop-client:2.8.0" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); diff --git a/pom.xml b/pom.xml index 90738445e68b..3cbe40361b4b 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ 4.1.6.Final 1.7.12 - 2.7.3 + 2.8.0 2.0.0 1.6.6 From 5218238e35804102f5fe7632fc012329bac9215f Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 18 May 2017 12:13:45 -0700 Subject: [PATCH 08/15] inject deep storage properties --- .../main/java/io/druid/indexer/IndexGeneratorJob.java | 2 ++ .../src/main/java/io/druid/indexer/JobHelper.java | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index f3bb6b63bddd..e40c49ab1669 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -166,6 +166,8 @@ public boolean run() JobHelper.injectSystemProperties(job); config.addJobProperties(job); + // inject druid properties like deep storage bindings + JobHelper.injectDruidProprties(job.getConfiguration()); job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index d3214fa006ce..275db5ad132c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -44,6 +44,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.UserGroupInformation; @@ -308,6 +309,16 @@ public static void injectSystemProperties(Job job) { injectSystemProperties(job.getConfiguration()); } + public static void injectDruidProprties(Configuration configuration) { + String mapJavaOpst = configuration.get(MRJobConfig.MAP_JAVA_OPTS); + String reduceJavaOpts = configuration.get(MRJobConfig.MAP_JAVA_OPTS); + for (String propName: System.getProperties().stringPropertyNames()){ + if (propName.startsWith("druid.storage.")) { + mapJavaOpst = String.format("%s -D%s=%s", mapJavaOpst, propName, System.getProperty(propName)); + reduceJavaOpts = String.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName)); + } + } + } public static Configuration injectSystemProperties(Configuration conf) { From 80f507e8071f7d318a7977ef6a077f896b4942b8 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 18 May 2017 12:48:17 -0700 Subject: [PATCH 09/15] set version to 2.7.3 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3cbe40361b4b..90738445e68b 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ 4.1.6.Final 1.7.12 - 2.8.0 + 2.7.3 2.0.0 1.6.6 From a46580f88d4765d47b0996485695a053ec2a64d2 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Mon, 22 May 2017 10:26:53 -0700 Subject: [PATCH 10/15] fix build issue about 
static class --- .../java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index c476343f7904..661499d5e355 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -326,7 +326,7 @@ public TestObjectMapper() registerModule(new GranularityModule()); } - public class TestModule extends SimpleModule + public static class TestModule extends SimpleModule { TestModule() { From df1e0ea35bead34a5e2883a11f4805be9558695b Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Tue, 30 May 2017 10:37:45 -0700 Subject: [PATCH 11/15] fix comments --- .../input/orc/OrcIndexGeneratorJobTest.java | 3 ++- .../indexer/HadoopDruidIndexerConfig.java | 7 +++++ .../io/druid/indexer/HadoopTuningConfig.java | 26 +++++++++++++++---- .../io/druid/indexer/IndexGeneratorJob.java | 2 +- .../main/java/io/druid/indexer/JobHelper.java | 22 +++++++++++----- .../indexer/BatchDeltaIngestionTest.java | 3 ++- .../DetermineHashedPartitionsJobTest.java | 3 ++- .../indexer/DeterminePartitionsJobTest.java | 3 ++- .../indexer/HadoopDruidIndexerConfigTest.java | 6 +++-- .../druid/indexer/HadoopTuningConfigTest.java | 3 ++- .../druid/indexer/IndexGeneratorJobTest.java | 3 ++- .../java/io/druid/indexer/JobHelperTest.java | 3 ++- .../indexer/path/GranularityPathSpecTest.java | 3 ++- .../updater/HadoopConverterJobTest.java | 3 ++- 14 files changed, 67 insertions(+), 23 deletions(-) diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index f56479cef5b6..949247d9a294 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -216,7 +216,8 @@ public void setUp() throws Exception true, null, false, - false + false, + null ) ) ); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 6a082998e4ee..29a01638a367 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -222,6 +222,7 @@ public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf) private final Map shardSpecLookups = Maps.newHashMap(); private final Map> hadoopShardSpecLookup = Maps.newHashMap(); private final Granularity rollupGran; + private final List allowedHadoopPrefix; @JsonCreator public HadoopDruidIndexerConfig( @@ -258,6 +259,7 @@ public ShardSpec apply(HadoopyShardSpec input) } this.rollupGran = spec.getDataSchema().getGranularitySpec().getQueryGranularity(); + this.allowedHadoopPrefix = spec.getTuningConfig().getAllowedHadoopPrefix(); } @JsonProperty(value = "spec") @@ -596,4 +598,9 @@ public void verify() Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath"); Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version"); } + + 
public List getAllowedHadoopPrefix() + { + return allowedHadoopPrefix; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 718dc6565c59..e64c0e7884df 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.druid.indexer.partitions.HashedPartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec; @@ -66,7 +67,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_BUILD_V9_DIRECTLY, DEFAULT_NUM_BACKGROUND_PERSIST_THREADS, false, - false + false, + null ); } @@ -87,6 +89,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final int numBackgroundPersistThreads; private final boolean forceExtendableShardSpecs; private final boolean useExplicitVersion; + private final List allowedHadoopPrefix; @JsonCreator public HadoopTuningConfig( @@ -108,7 +111,8 @@ public HadoopTuningConfig( final @JsonProperty("buildV9Directly") Boolean buildV9Directly, final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads, final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs, - final @JsonProperty("useExplicitVersion") boolean useExplicitVersion + final @JsonProperty("useExplicitVersion") boolean useExplicitVersion, + final @JsonProperty("allowedHadoopPrefix") List allowedHadoopPrefix ) { this.workingPath = workingPath; @@ -135,6 +139,9 @@ public HadoopTuningConfig( this.forceExtendableShardSpecs = forceExtendableShardSpecs; Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0"); this.useExplicitVersion = useExplicitVersion; + this.allowedHadoopPrefix = allowedHadoopPrefix == null + ? 
ImmutableList.of("druid.storage.", "druid.javascript.") + : allowedHadoopPrefix; } @JsonProperty @@ -259,7 +266,8 @@ public HadoopTuningConfig withWorkingPath(String path) buildV9Directly, numBackgroundPersistThreads, forceExtendableShardSpecs, - useExplicitVersion + useExplicitVersion, + null ); } @@ -283,7 +291,8 @@ public HadoopTuningConfig withVersion(String ver) buildV9Directly, numBackgroundPersistThreads, forceExtendableShardSpecs, - useExplicitVersion + useExplicitVersion, + null ); } @@ -307,7 +316,14 @@ public HadoopTuningConfig withShardSpecs(Map> specs buildV9Directly, numBackgroundPersistThreads, forceExtendableShardSpecs, - useExplicitVersion + useExplicitVersion, + null ); } + + @JsonProperty + public List getAllowedHadoopPrefix() + { + return allowedHadoopPrefix; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index e40c49ab1669..5edb1f69b6e6 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -167,7 +167,7 @@ public boolean run() JobHelper.injectSystemProperties(job); config.addJobProperties(job); // inject druid properties like deep storage bindings - JobHelper.injectDruidProprties(job.getConfiguration()); + JobHelper.injectDruidProperties(job.getConfiguration(), config.getAllowedHadoopPrefix()); job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 275db5ad132c..998cce63edf8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -309,14 +309,24 @@ public static void injectSystemProperties(Job job) { injectSystemProperties(job.getConfiguration()); } - public static void injectDruidProprties(Configuration configuration) { - String mapJavaOpst = configuration.get(MRJobConfig.MAP_JAVA_OPTS); - String reduceJavaOpts = configuration.get(MRJobConfig.MAP_JAVA_OPTS); + public static void injectDruidProperties(Configuration configuration, List listOfAllowedPrefix) { + String mapJavaOpts = configuration.get(MRJobConfig.MAP_JAVA_OPTS); + String reduceJavaOpts = configuration.get(MRJobConfig.REDUCE_JAVA_OPTS); + for (String propName: System.getProperties().stringPropertyNames()){ - if (propName.startsWith("druid.storage.")) { - mapJavaOpst = String.format("%s -D%s=%s", mapJavaOpst, propName, System.getProperty(propName)); - reduceJavaOpts = String.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName)); + for (String prefix : listOfAllowedPrefix) { + if (propName.startsWith(prefix)) { + mapJavaOpts = String.format("%s -D%s=%s", mapJavaOpts, propName, System.getProperty(propName)); + reduceJavaOpts = String.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName)); + } } + + } + if (!Strings.isNullOrEmpty(mapJavaOpts)) { + configuration.set(MRJobConfig.MAP_JAVA_OPTS, mapJavaOpts); + } + if (!Strings.isNullOrEmpty(reduceJavaOpts)) { + configuration.set(MRJobConfig.REDUCE_JAVA_OPTS, reduceJavaOpts); } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 2746dd9c004e..954910470246 100644 --- 
a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -386,7 +386,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map Date: Wed, 31 May 2017 13:11:44 -0700 Subject: [PATCH 12/15] fix default hadoop default coordinate --- .../conf-quickstart/druid/middleManager/runtime.properties | 2 +- examples/conf/druid/middleManager/runtime.properties | 2 +- .../src/main/java/io/druid/indexer/JobHelper.java | 7 +++++-- .../java/io/druid/indexing/common/config/TaskConfig.java | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/conf-quickstart/druid/middleManager/runtime.properties b/examples/conf-quickstart/druid/middleManager/runtime.properties index 422b94387ffc..dc1f6aceddb3 100644 --- a/examples/conf-quickstart/druid/middleManager/runtime.properties +++ b/examples/conf-quickstart/druid/middleManager/runtime.properties @@ -17,4 +17,4 @@ druid.processing.numThreads=2 # Hadoop indexing druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp -druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"] +druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"] diff --git a/examples/conf/druid/middleManager/runtime.properties b/examples/conf/druid/middleManager/runtime.properties index 84945838d6ef..ce857a0d09d8 100644 --- a/examples/conf/druid/middleManager/runtime.properties +++ b/examples/conf/druid/middleManager/runtime.properties @@ -17,4 +17,4 @@ druid.processing.numThreads=2 # Hadoop indexing druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp -druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"] +druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"] diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 998cce63edf8..34b19245138f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -309,15 +309,18 @@ public static void injectSystemProperties(Job job) { injectSystemProperties(job.getConfiguration()); } - public static void injectDruidProperties(Configuration configuration, List listOfAllowedPrefix) { + + public static void injectDruidProperties(Configuration configuration, List listOfAllowedPrefix) + { String mapJavaOpts = configuration.get(MRJobConfig.MAP_JAVA_OPTS); String reduceJavaOpts = configuration.get(MRJobConfig.REDUCE_JAVA_OPTS); - for (String propName: System.getProperties().stringPropertyNames()){ + for (String propName : System.getProperties().stringPropertyNames()) { for (String prefix : listOfAllowedPrefix) { if (propName.startsWith(prefix)) { mapJavaOpts = String.format("%s -D%s=%s", mapJavaOpts, propName, System.getProperty(propName)); reduceJavaOpts = String.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName)); + break; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 6fa94eca8bd8..6a9370324d0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -31,7 +31,7 @@ public class TaskConfig { public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES = 
ImmutableList.of( - "org.apache.hadoop:hadoop-client:2.8.0" + "org.apache.hadoop:hadoop-client:2.7.3" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); From 70f9f54638326eac14b026cc24a18ba4d5a62090 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 1 Jun 2017 10:23:28 -0700 Subject: [PATCH 13/15] fix create filesystem --- .../java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java | 7 +++++-- .../io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 45030ed63a28..25dcf4fb4f9b 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -64,8 +64,11 @@ public HdfsDataSegmentPusher( this.config = config; this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; - this.fullyQualifiedStorageDirectory = FileSystem.newInstance(hadoopConfig).makeQualified(new Path(config.getStorageDirectory())) - .toUri().toString(); + Path storageDir = new Path(config.getStorageDirectory()); + this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) + .makeQualified(storageDir) + .toUri() + .toString(); log.info("Configured HDFS as deep storage"); } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index 661499d5e355..8191e60df2fc 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -102,8 +102,8 @@ public void testPushWithScheme() throws Exception @Test public void testPushWithBadScheme() throws Exception { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Wrong FS"); + expectedException.expect(IOException.class); + expectedException.expectMessage("No FileSystem for scheme"); testUsingScheme("xyzzy"); // Not reached From f62d617fefc75db1b118406cfc866c343339018a Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 1 Jun 2017 12:38:37 -0700 Subject: [PATCH 14/15] downgrade aws sdk --- pom.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 90738445e68b..cf7e346f8782 100644 --- a/pom.xml +++ b/pom.xml @@ -180,7 +180,9 @@ com.amazonaws aws-java-sdk - 1.10.21 + + 1.7.4 javax.mail From 55204638dedd2b4db7571fa1b71ca902b2ec6993 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Fri, 2 Jun 2017 12:29:33 -0700 Subject: [PATCH 15/15] bump the version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cf7e346f8782..47f2865bcac0 100644 --- a/pom.xml +++ b/pom.xml @@ -182,7 +182,7 @@ aws-java-sdk - 1.7.4 + 1.10.56 javax.mail
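For reference, a minimal standalone Java sketch of how the new "S3Schema" entry in the S3 loadSpec is meant to drive the segment URI that the Hadoop indexer resolves, mirroring the s3_zip branch added to JobHelper.getURIFromSegment in this series. The class name and the bucket/key values below are illustrative only and are not part of the patches; only the "type"/"bucket"/"key"/"S3Schema" keys and the s3a/s3n schemes come from the diffs above.

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class S3aLoadSpecSketch
{
  // Resolve an s3_zip loadSpec to a URI: use s3a when the pusher was configured
  // with useS3aSchema=true (it then writes "S3Schema": "s3a" into the loadSpec),
  // otherwise fall back to the legacy s3n scheme for backward compatibility.
  static URI uriFromS3LoadSpec(Map<String, Object> loadSpec)
  {
    final String scheme = "s3a".equals(loadSpec.get("S3Schema")) ? "s3a" : "s3n";
    return URI.create(String.format("%s://%s/%s", scheme, loadSpec.get("bucket"), loadSpec.get("key")));
  }

  public static void main(String[] args)
  {
    final Map<String, Object> loadSpec = new HashMap<>();
    loadSpec.put("type", "s3_zip");
    loadSpec.put("bucket", "example-bucket");               // illustrative bucket name
    loadSpec.put("key", "druid/segments/demo/0/index.zip"); // illustrative key
    loadSpec.put("S3Schema", "s3a");

    // Prints: s3a://example-bucket/druid/segments/demo/0/index.zip
    System.out.println(uriFromS3LoadSpec(loadSpec));
  }
}

In a deployment this behavior would be toggled through the pusher configuration's useS3aSchema flag (e.g. druid.storage.useS3aSchema=true under the usual S3 extension binding), which defaults to false so existing s3n-based load specs keep working.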