diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java index f77aa198c8a6..9638ed48b15c 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java @@ -19,15 +19,45 @@ package io.druid.segment.loading; +import com.google.common.base.Joiner; import io.druid.timeline.DataSegment; import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; public interface DataSegmentPusher { + Joiner JOINER = Joiner.on("/").skipNulls(); + @Deprecated String getPathForHadoop(String dataSource); String getPathForHadoop(); DataSegment push(File file, DataSegment segment) throws IOException; + //use map instead of LoadSpec class to avoid dependency pollution. + Map makeLoadSpec(URI finalIndexZipFilePath); + default String getStorageDir(DataSegment dataSegment) { + return getDefaultStorageDir(dataSegment); + } + default String makeIndexPathName(DataSegment dataSegment, String indexName) { + return String.format("./%s/%s", getStorageDir(dataSegment),indexName); + } + + // Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/ + // If above format is ever changed, make sure to change it appropriately in other places + // e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories + // on segment deletion if segment being deleted was the only segment + static String getDefaultStorageDir(DataSegment segment) { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart(), + segment.getInterval().getEnd() + ), + segment.getVersion(), + segment.getShardSpec().getPartitionNum() + ); + } } diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java deleted file mode 100644 index 7daa125088d9..000000000000 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusherUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment.loading; - -import com.google.common.base.Joiner; -import io.druid.timeline.DataSegment; -import org.joda.time.format.ISODateTimeFormat; - -/** - */ -public class DataSegmentPusherUtil -{ - private static final Joiner JOINER = Joiner.on("/").skipNulls(); - - // Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/ - // If above format is ever changed, make sure to change it appropriately in other places - // e.g. 
HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories - // on segment deletion if segment being deleted was the only segment - public static String getStorageDir(DataSegment segment) - { - return JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart(), - segment.getInterval().getEnd() - ), - segment.getVersion(), - segment.getShardSpec().getPartitionNum() - ); - } - - /** - * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in - * path names. So we format paths differently for HDFS. - */ - public static String getHdfsStorageDir(DataSegment segment) - { - return JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), - segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) - ), - segment.getVersion().replaceAll(":", "_") - ); - } -} diff --git a/api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java b/api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java deleted file mode 100644 index d2f1c6eab400..000000000000 --- a/api/src/test/java/io/druid/segment/loading/DataSegmentPusherUtilTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
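For orientation, a minimal sketch (not part of the patch) of the directory that the new static DataSegmentPusher.getDefaultStorageDir() produces, mirroring the deleted DataSegmentPusherUtil.getStorageDir(); the segment values below are hypothetical and a UTC JVM time zone is assumed:

    // Hypothetical segment, built the same way as in the deleted test above.
    DataSegment segment = new DataSegment(
        "wikipedia",
        new Interval("2011-10-01/2011-10-02"),
        "v1",
        ImmutableMap.<String, Object>of("something", "or_other"),
        Arrays.asList("dim1", "dim2"),
        Arrays.asList("met1", "met2"),
        NoneShardSpec.instance(),
        null,
        1
    );
    // dataSource/interval/version/partitionNumber, joined with "/":
    String dir = DataSegmentPusher.getDefaultStorageDir(segment);
    // -> "wikipedia/2011-10-01T00:00:00.000Z_2011-10-02T00:00:00.000Z/v1/0"
    // Pushers that previously called DataSegmentPusherUtil.getStorageDir(segment)
    // now call this.getStorageDir(segment), which delegates to the default above
    // unless the pusher overrides it (as HdfsDataSegmentPusher does further down).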
- */ - -package io.druid.segment.loading; - -import com.google.common.collect.ImmutableMap; -import io.druid.timeline.DataSegment; -import io.druid.timeline.partition.NoneShardSpec; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; - -public class DataSegmentPusherUtilTest -{ - @Test - public void shouldNotHaveColonsInHdfsStorageDir() throws Exception - { - - Interval interval = new Interval("2011-10-01/2011-10-02"); - ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); - - DataSegment segment = new DataSegment( - "something", - interval, - "brand:new:version", - loadSpec, - Arrays.asList("dim1", "dim2"), - Arrays.asList("met1", "met2"), - NoneShardSpec.instance(), - null, - 1 - ); - - String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); - Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir); - - } -} diff --git a/examples/conf-quickstart/druid/middleManager/runtime.properties b/examples/conf-quickstart/druid/middleManager/runtime.properties index 422b94387ffc..dc1f6aceddb3 100644 --- a/examples/conf-quickstart/druid/middleManager/runtime.properties +++ b/examples/conf-quickstart/druid/middleManager/runtime.properties @@ -17,4 +17,4 @@ druid.processing.numThreads=2 # Hadoop indexing druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp -druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"] +druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"] diff --git a/examples/conf/druid/middleManager/runtime.properties b/examples/conf/druid/middleManager/runtime.properties index 84945838d6ef..ce857a0d09d8 100644 --- a/examples/conf/druid/middleManager/runtime.properties +++ b/examples/conf/druid/middleManager/runtime.properties @@ -17,4 +17,4 @@ druid.processing.numThreads=2 # Hadoop indexing druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp -druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"] +druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"] diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index 5e42b951779d..0b9f7bea9afa 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -29,12 +29,12 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.Map; import java.util.concurrent.Callable; @@ -85,7 +85,7 @@ public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final Dat public Map getAzurePaths(final DataSegment segment) { - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment); return ImmutableMap.of( "index", String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), @@ -109,16 +109,7 @@ public DataSegment uploadDataSegment( final DataSegment 
outSegment = segment .withSize(size) - .withLoadSpec( - ImmutableMap.of( - "type", - AzureStorageDruidModule.SCHEME, - "containerName", - config.getContainer(), - "blobPath", - azurePaths.get("index") - ) - ) + .withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index")))) .withBinaryVersion(version); log.info("Deleting file [%s]", compressedSegmentData); @@ -174,4 +165,17 @@ public DataSegment call() throws Exception } } } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of( + "type", + AzureStorageDruidModule.SCHEME, + "containerName", + config.getContainer(), + "blobPath", + uri.toString() + ); + } } diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java index 6cb9680ecbb2..e5913c50f5e1 100644 --- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java @@ -27,7 +27,6 @@ import com.microsoft.azure.storage.StorageException; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.MapUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMockSupport; @@ -112,9 +111,9 @@ public void testPush() throws Exception @Test public void getAzurePathsTest() { - final String storageDir = DataSegmentPusherUtil.getStorageDir(dataSegment); - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper); + final String storageDir = pusher.getStorageDir(dataSegment); Map paths = pusher.getAzurePaths(dataSegment); assertEquals(String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index")); diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index 973de4297555..b26253e03ed1 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -30,12 +30,13 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; /** * Cassandra Segment Pusher @@ -77,7 +78,7 @@ public DataSegment push(final File indexFilesDir, DataSegment segment) throws IO log.info("Writing [%s] to C*", indexFilesDir); String key = JOINER.join( config.getKeyspace().isEmpty() ? null : config.getKeyspace(), - DataSegmentPusherUtil.getStorageDir(segment) + this.getStorageDir(segment) ); // Create index @@ -114,4 +115,10 @@ ImmutableMap. 
of("type", "c*", "key", key) compressedIndexFile.delete(); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException("not supported"); + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index fbef1bfe01f5..a17a6872b586 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -34,6 +34,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.concurrent.Callable; public class CloudFilesDataSegmentPusher implements DataSegmentPusher @@ -75,7 +77,7 @@ public String getPathForHadoop(final String dataSource) @Override public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { - final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment); + final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment)); File descriptorFile = null; File zipOutFile = null; @@ -112,18 +114,7 @@ public DataSegment call() throws Exception final DataSegment outSegment = inSegment .withSize(indexSize) - .withLoadSpec( - ImmutableMap.of( - "type", - CloudFilesStorageDruidModule.SCHEME, - "region", - segmentData.getRegion(), - "container", - segmentData.getContainer(), - "path", - segmentData.getPath() - ) - ) + .withLoadSpec(makeLoadSpec(new URI(segmentData.getPath()))) .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); return outSegment; @@ -146,4 +137,19 @@ public DataSegment call() throws Exception } } } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of( + "type", + CloudFilesStorageDruidModule.SCHEME, + "region", + objectApi.getRegion(), + "container", + objectApi.getContainer(), + "path", + uri.toString() + ); + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java index 31b4cafdf383..e409964de399 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java @@ -22,8 +22,6 @@ import com.google.common.base.Predicate; import io.druid.java.util.common.RetryUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; -import io.druid.timeline.DataSegment; import java.io.IOException; import java.util.concurrent.Callable; @@ -70,9 +68,4 @@ public static String buildCloudFilesPath(String basePath, final String fileName) return path; } - public static String buildCloudFilesPath(String basePath, final DataSegment segment) - { - return buildCloudFilesPath(basePath, DataSegmentPusherUtil.getStorageDir(segment)); - } - } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java index 5570cdd9d9ef..d94f233c81f1 100644 --- 
a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java @@ -30,13 +30,14 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; public class GoogleDataSegmentPusher implements DataSegmentPusher { @@ -84,7 +85,8 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen return descriptorFile; } - public void insert(final File file, final String contentType, final String path) throws IOException { + public void insert(final File file, final String contentType, final String path) throws IOException + { LOG.info("Inserting [%s] to [%s]", file, path); FileInputStream fileSteam = new FileInputStream(file); @@ -107,19 +109,13 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr try { indexFile = File.createTempFile("index", ".zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile); - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment); final String indexPath = buildPath(storageDir + "/" + "index.zip"); final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json"); final DataSegment outSegment = segment .withSize(indexSize) - .withLoadSpec( - ImmutableMap.of( - "type", GoogleStorageDruidModule.SCHEME, - "bucket", config.getBucket(), - "path", indexPath - ) - ) + .withLoadSpec(makeLoadSpec(config.getBucket(), indexPath)) .withBinaryVersion(version); descriptorFile = createDescriptorFile(jsonMapper, outSegment); @@ -131,7 +127,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr } catch (Exception e) { throw Throwables.propagate(e); - } finally { + } + finally { if (indexFile != null) { LOG.info("Deleting file [%s]", indexFile); indexFile.delete(); @@ -153,4 +150,20 @@ String buildPath(final String path) return path; } } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + // remove the leading "/" + return makeLoadSpec(config.getBucket(),finalIndexZipFilePath.getPath().substring(1)); + } + + private Map makeLoadSpec(String bucket, String path) { + return ImmutableMap.of( + "type", GoogleStorageDruidModule.SCHEME, + "bucket", bucket, + "path", path + ); + } + } diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java index d85d57eb1017..83d35601ff18 100644 --- a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java +++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Maps; import com.google.common.io.Files; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -106,7 +105,7 @@ public void 
testPush() throws Exception jsonMapper ).addMockedMethod("insert", File.class, String.class, String.class).createMock(); - final String storageDir = DataSegmentPusherUtil.getStorageDir(segmentToPush); + final String storageDir = pusher.getStorageDir(segmentToPush); final String indexPath = prefix + "/" + storageDir + "/" + "index.zip"; final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json"; diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index f56479cef5b6..949247d9a294 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -216,7 +216,8 @@ public void setUp() throws Exception true, null, false, - false + false, + null ) ) ); diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index c7d8429e2764..8dd014629b99 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -140,12 +140,17 @@ emitter provided + + org.apache.hadoop + hadoop-aws + ${hadoop.compile.version} + commons-io commons-io provided - + junit @@ -178,6 +183,12 @@ ${hadoop.compile.version} test - + + io.druid + druid-indexing-hadoop + ${project.parent.version} + test + + diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 6b80e3619c25..25dcf4fb4f9b 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -29,17 +29,19 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HadoopFsWrapper; import org.apache.hadoop.fs.Path; +import org.joda.time.format.ISODateTimeFormat; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.net.URI; +import java.util.Map; /** */ @@ -62,8 +64,11 @@ public HdfsDataSegmentPusher( this.config = config; this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; - this.fullyQualifiedStorageDirectory = FileSystem.newInstance(hadoopConfig).makeQualified(new Path(config.getStorageDirectory())) - .toUri().toString(); + Path storageDir = new Path(config.getStorageDirectory()); + this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) + .makeQualified(storageDir) + .toUri() + .toString(); log.info("Configured HDFS as deep storage"); } @@ -84,7 +89,7 @@ public String getPathForHadoop() @Override public DataSegment push(File inDir, DataSegment segment) throws IOException { - final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); + final String storageDir = this.getStorageDir(segment); log.info( "Copying segment[%s] to HDFS at location[%s/%s]", @@ -115,7 +120,6 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException 
storageDir, segment.getShardSpec().getPartitionNum() )); - final Path outDescriptorFile = new Path(String.format( "%s/%s/%d_descriptor.json", fullyQualifiedStorageDirectory, @@ -123,7 +127,7 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException segment.getShardSpec().getPartitionNum() )); - dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile)) + dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)); @@ -176,11 +180,6 @@ private void copyFilesWithChecks(final FileSystem fs, final Path from, final Pat } } - private ImmutableMap makeLoadSpec(Path outFile) - { - return ImmutableMap.of("type", "hdfs", "path", outFile.toUri().toString()); - } - private static class HdfsOutputStreamSupplier extends ByteSink { private final FileSystem fs; @@ -198,4 +197,40 @@ public OutputStream openStream() throws IOException return fs.create(descriptorFile); } } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of("type", "hdfs", "path", finalIndexZipFilePath.toString()); + } + + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. + */ + + @Override + public String getStorageDir(DataSegment segment) + { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), + segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) + ), + segment.getVersion().replaceAll(":", "_") + ); + } + + @Override + public String makeIndexPathName(DataSegment dataSegment, String indexName) + { + return String.format( + "./%s/%d_%s", + this.getStorageDir(dataSegment), + dataSegment.getShardSpec().getPartitionNum(), + indexName + ); + } } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index f341cd4d65dd..8191e60df2fc 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -30,20 +31,33 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; +import io.druid.indexer.Bucket; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.JobHelper; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.jackson.GranularityModule; +import io.druid.segment.loading.LocalDataSegmentPusher; +import 
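A layout sketch (not part of the patch) of how the default methods and the HDFS-specific overrides differ for the same hypothetical segment as above; the concrete names are illustrative only:

    // Default pushers (getDefaultStorageDir + default makeIndexPathName):
    //   ./wikipedia/2011-10-01T00:00:00.000Z_2011-10-02T00:00:00.000Z/v1/0/index.zip
    // HdfsDataSegmentPusher overrides (":" replaced per HDFS-13, partition number
    // moved from a directory level into the file name):
    //   ./wikipedia/20111001T000000.000Z_20111002T000000.000Z/v1/0_index.zip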
io.druid.segment.loading.LocalDataSegmentPusherConfig; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -51,18 +65,33 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; /** */ public class HdfsDataSegmentPusherTest { + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @Rule public final ExpectedException expectedException = ExpectedException.none(); - TestObjectMapper objectMapper = new TestObjectMapper(); + static TestObjectMapper objectMapper = new TestObjectMapper(); + + private HdfsDataSegmentPusher hdfsDataSegmentPusher; + @Before + public void setUp() throws IOException + { + HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig(); + hdfsDataSegmentPusherConf.setStorageDirectory("path/to/"); + hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper); + } + static { + objectMapper = new TestObjectMapper(); + objectMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, objectMapper)); + } @Test public void testPushWithScheme() throws Exception @@ -73,8 +102,8 @@ public void testPushWithScheme() throws Exception @Test public void testPushWithBadScheme() throws Exception { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Wrong FS"); + expectedException.expect(IOException.class); + expectedException.expectMessage("No FileSystem for scheme"); testUsingScheme("xyzzy"); // Not reached @@ -133,7 +162,7 @@ private void testUsingScheme(final String scheme) throws Exception String indexUri = String.format( "%s/%s/%d_index.zip", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), - DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush), + pusher.getStorageDir(segmentToPush), segmentToPush.getShardSpec().getPartitionNum() ); @@ -146,7 +175,7 @@ private void testUsingScheme(final String scheme) throws Exception indexUri ), segment.getLoadSpec()); // rename directory after push - final String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment); + final String segmentPath = pusher.getStorageDir(segment); File indexFile = new File(String.format( "%s/%s/%d_index.zip", @@ -217,7 +246,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n String indexUri = String.format( "%s/%s/%d_index.zip", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), - DataSegmentPusherUtil.getHdfsStorageDir(segments[i]), + pusher.getStorageDir(segments[i]), segments[i].getShardSpec().getPartitionNum() ); @@ -230,7 +259,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n indexUri ), pushedSegment.getLoadSpec()); // rename directory after push - String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(pushedSegment); + String segmentPath 
= pusher.getStorageDir(pushedSegment); File indexFile = new File(String.format( "%s/%s/%d_index.zip", @@ -259,7 +288,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n indexUri ), fromDescriptorFileDataSegment.getLoadSpec()); // rename directory after push - segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(fromDescriptorFileDataSegment); + segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment); indexFile = new File(String.format( "%s/%s/%d_index.zip", @@ -282,7 +311,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n } } - public class TestObjectMapper extends ObjectMapper + public static class TestObjectMapper extends ObjectMapper { public TestObjectMapper() { @@ -292,10 +321,12 @@ public TestObjectMapper() configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false); configure(MapperFeature.AUTO_DETECT_SETTERS, false); configure(SerializationFeature.INDENT_OUTPUT, false); + configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); registerModule(new TestModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec"))); + registerModule(new GranularityModule()); } - public class TestModule extends SimpleModule + public static class TestModule extends SimpleModule { TestModule() { @@ -317,4 +348,250 @@ public Interval deserialize( } } + @Test + public void shouldNotHaveColonsInHdfsStorageDir() throws Exception + { + + Interval interval = new Interval("2011-10-01/2011-10-02"); + ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + + DataSegment segment = new DataSegment( + "something", + interval, + "brand:new:version", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), + NoneShardSpec.instance(), + null, + 1 + ); + + String storageDir = hdfsDataSegmentPusher.getStorageDir(segment); + Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir); + + } + + + @Test + public void shouldMakeHDFSCompliantSegmentOutputPath() + { + HadoopIngestionSpec schema; + + try { + schema = objectMapper.readValue( + "{\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"source\",\n" + + " \"metricsSpec\": [],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"hour\",\n" + + " \"intervals\": [\"2012-07-10/P1D\"]\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"type\": \"hadoop\",\n" + + " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n" + + " }\n" + + "}", + HadoopIngestionSpec.class + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + //DataSchema dataSchema = new DataSchema("dataSource", null, null, Gra) + //schema = new HadoopIngestionSpec(dataSchema, ioConfig, HadoopTuningConfig.makeDefaultTuningConfig()); + HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( + schema.withTuningConfig( + schema.getTuningConfig() + .withVersion( + "some:brand:new:version" + ) + ) + ); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new DistributedFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.INDEX_ZIP, + 
hdfsDataSegmentPusher + ); + Assert.assertEquals( + "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" + + "/4712_index.zip", + path.toString() + ); + + path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new DistributedFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.DESCRIPTOR_JSON, + hdfsDataSegmentPusher + ); + Assert.assertEquals( + "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" + + "/4712_descriptor.json", + path.toString() + ); + + path = JobHelper.makeTmpPath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new DistributedFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0), + hdfsDataSegmentPusher + ); + Assert.assertEquals( + "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" + + "/4712_index.zip.0", + path.toString() + ); + + } + + @Test + public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() + { + final HadoopIngestionSpec schema; + + try { + schema = objectMapper.readValue( + "{\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"the:data:source\",\n" + + " \"metricsSpec\": [],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"hour\",\n" + + " \"intervals\": [\"2012-07-10/P1D\"]\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"type\": \"hadoop\",\n" + + " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n" + + " }\n" + + "}", + HadoopIngestionSpec.class + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( + schema.withTuningConfig( + schema.getTuningConfig() + .withVersion( + "some:brand:new:version" + ) + ) + ); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new LocalFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.INDEX_ZIP, + new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper) + ); + Assert.assertEquals( + "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" + + "version/4712/index.zip", + path.toString() + ); + + path = JobHelper.makeFileNamePath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new LocalFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + 
cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + JobHelper.DESCRIPTOR_JSON, + new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper) + ); + Assert.assertEquals( + "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" + + "version/4712/descriptor.json", + path.toString() + ); + + path = JobHelper.makeTmpPath( + new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), + new LocalFileSystem(), + new DataSegment( + cfg.getSchema().getDataSchema().getDataSource(), + cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), + cfg.getSchema().getTuningConfig().getVersion(), + null, + null, + null, + new NumberedShardSpec(bucket.partitionNum, 5000), + -1, + -1 + ), + new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0), + new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper) + ); + Assert.assertEquals( + "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" + + "version/4712/index.zip.0", + path.toString() + ); + + } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 352d6cca15ac..23b331f63251 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -30,6 +30,7 @@ import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import org.jets3t.service.ServiceException; @@ -69,7 +70,7 @@ public DataSegment move(DataSegment segment, Map targetLoadSpec) final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket"); final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey"); - final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment); + final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, DataSegmentPusher.getDefaultStorageDir(segment)); String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path); if (targetS3Bucket.isEmpty()) { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index a035ff20c22f..8f9e99256df3 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -38,6 +38,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.concurrent.Callable; public class S3DataSegmentPusher implements DataSegmentPusher @@ -65,6 +67,9 @@ public S3DataSegmentPusher( @Override public String getPathForHadoop() { + if (config.isUseS3aSchema()) { + return String.format("s3a://%s/%s", config.getBucket(), config.getBaseKey()); + } return String.format("s3n://%s/%s", config.getBucket(), config.getBaseKey()); } @@ -78,7 +83,7 @@ public String getPathForHadoop(String dataSource) @Override 
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { - final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment); + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment)); log.info("Copying segment[%s] to S3 at location[%s]", inSegment.getIdentifier(), s3Path); @@ -107,16 +112,7 @@ public DataSegment call() throws Exception s3Client.putObject(outputBucket, toPush); final DataSegment outSegment = inSegment.withSize(indexSize) - .withLoadSpec( - ImmutableMap.of( - "type", - "s3_zip", - "bucket", - outputBucket, - "key", - toPush.getKey() - ) - ) + .withLoadSpec(makeLoadSpec(outputBucket, toPush.getKey())) .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); File descriptorFile = File.createTempFile("druid", "descriptor.json"); @@ -149,4 +145,30 @@ public DataSegment call() throws Exception throw Throwables.propagate(e); } } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + // remove the leading "/" + return makeLoadSpec(finalIndexZipFilePath.getHost(), finalIndexZipFilePath.getPath().substring(1)); + } + + /** + * Any change in loadSpec need to be reflected {@link io.druid.indexer.JobHelper#getURIFromSegment()} + * + */ + @SuppressWarnings("JavadocReference") + private Map makeLoadSpec(String bucket, String key) + { + return ImmutableMap.of( + "type", + "s3_zip", + "bucket", + bucket, + "key", + key, + "S3Schema", + config.isUseS3aSchema() ? "s3a" : "s3n" + ); + } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java index 7937ea339d17..28c5327da6cf 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusherConfig.java @@ -39,6 +39,9 @@ public class S3DataSegmentPusherConfig @JsonProperty @Min(0) private int maxListingLength = 1000; + // use s3n by default for backward compatibility + @JsonProperty + private boolean useS3aSchema = false; public void setBucket(String bucket) { @@ -60,6 +63,16 @@ public void setMaxListingLength(int maxListingLength) this.maxListingLength = maxListingLength; } + public boolean isUseS3aSchema() + { + return useS3aSchema; + } + + public void setUseS3aSchema(boolean useS3aSchema) + { + this.useS3aSchema = useS3aSchema; + } + public String getBucket() { return bucket; diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 0fa86781d548..5d97e79ab25d 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -24,8 +24,6 @@ import com.google.common.base.Throwables; import io.druid.java.util.common.RetryUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; -import io.druid.timeline.DataSegment; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -187,11 +185,11 @@ public void remove() }; } - public static String constructSegmentPath(String baseKey, DataSegment segment) + public static String constructSegmentPath(String baseKey, String storageDir) { return JOINER.join( baseKey.isEmpty() ? 
null : baseKey, - DataSegmentPusherUtil.getStorageDir(segment) + storageDir ) + "/index.zip"; } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java index 44859c8a1ff3..8398909e8371 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherConfigTest.java @@ -39,7 +39,7 @@ public class S3DataSegmentPusherConfigTest public void testSerialization() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - +"\"disableAcl\":false,\"maxListingLength\":2000}"; + +"\"disableAcl\":false,\"maxListingLength\":2000,\"useS3aSchema\":false}"; S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class); Assert.assertEquals(jsonConfig, jsonMapper.writeValueAsString(config)); @@ -50,7 +50,7 @@ public void testSerializationWithDefaults() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}"; String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - +"\"disableAcl\":false,\"maxListingLength\":1000}"; + +"\"disableAcl\":false,\"maxListingLength\":1000,\"useS3aSchema\":false}"; S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class); Assert.assertEquals(expectedJsonConfig, jsonMapper.writeValueAsString(config)); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 77938c687021..29a01638a367 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -54,6 +54,7 @@ import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.ShardSpec; @@ -92,9 +93,11 @@ public class HadoopDruidIndexerConfig public static final IndexMerger INDEX_MERGER; public static final IndexMergerV9 INDEX_MERGER_V9; public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG; - + public static final DataSegmentPusher DATA_SEGMENT_PUSHER; private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing"; + + static { injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), @@ -118,6 +121,7 @@ public void configure(Binder binder) INDEX_MERGER = injector.getInstance(IndexMerger.class); INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class); HADOOP_KERBEROS_CONFIG = injector.getInstance(HadoopKerberosConfig.class); + DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class); } public static enum IndexJobCounters @@ -218,6 +222,7 @@ public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf) private final Map shardSpecLookups = Maps.newHashMap(); private final Map> hadoopShardSpecLookup = Maps.newHashMap(); private final Granularity rollupGran; + private final List allowedHadoopPrefix; @JsonCreator public HadoopDruidIndexerConfig( @@ -254,6 +259,7 @@ public ShardSpec apply(HadoopyShardSpec input) } this.rollupGran = 
spec.getDataSchema().getGranularitySpec().getQueryGranularity(); + this.allowedHadoopPrefix = spec.getTuningConfig().getAllowedHadoopPrefix(); } @JsonProperty(value = "spec") @@ -592,4 +598,9 @@ public void verify() Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath"); Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version"); } + + public List getAllowedHadoopPrefix() + { + return allowedHadoopPrefix; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 718dc6565c59..e64c0e7884df 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.druid.indexer.partitions.HashedPartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec; @@ -66,7 +67,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_BUILD_V9_DIRECTLY, DEFAULT_NUM_BACKGROUND_PERSIST_THREADS, false, - false + false, + null ); } @@ -87,6 +89,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final int numBackgroundPersistThreads; private final boolean forceExtendableShardSpecs; private final boolean useExplicitVersion; + private final List allowedHadoopPrefix; @JsonCreator public HadoopTuningConfig( @@ -108,7 +111,8 @@ public HadoopTuningConfig( final @JsonProperty("buildV9Directly") Boolean buildV9Directly, final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads, final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs, - final @JsonProperty("useExplicitVersion") boolean useExplicitVersion + final @JsonProperty("useExplicitVersion") boolean useExplicitVersion, + final @JsonProperty("allowedHadoopPrefix") List allowedHadoopPrefix ) { this.workingPath = workingPath; @@ -135,6 +139,9 @@ public HadoopTuningConfig( this.forceExtendableShardSpecs = forceExtendableShardSpecs; Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0"); this.useExplicitVersion = useExplicitVersion; + this.allowedHadoopPrefix = allowedHadoopPrefix == null + ? 
ImmutableList.of("druid.storage.", "druid.javascript.") + : allowedHadoopPrefix; } @JsonProperty @@ -259,7 +266,8 @@ public HadoopTuningConfig withWorkingPath(String path) buildV9Directly, numBackgroundPersistThreads, forceExtendableShardSpecs, - useExplicitVersion + useExplicitVersion, + null ); } @@ -283,7 +291,8 @@ public HadoopTuningConfig withVersion(String ver) buildV9Directly, numBackgroundPersistThreads, forceExtendableShardSpecs, - useExplicitVersion + useExplicitVersion, + null ); } @@ -307,7 +316,14 @@ public HadoopTuningConfig withShardSpecs(Map> specs buildV9Directly, numBackgroundPersistThreads, forceExtendableShardSpecs, - useExplicitVersion + useExplicitVersion, + null ); } + + @JsonProperty + public List getAllowedHadoopPrefix() + { + return allowedHadoopPrefix; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 68a49c744e5e..5edb1f69b6e6 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -166,6 +166,8 @@ public boolean run() JobHelper.injectSystemProperties(job); config.addJobProperties(job); + // inject druid properties like deep storage bindings + JobHelper.injectDruidProperties(job.getConfiguration(), config.getAllowedHadoopPrefix()); job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); @@ -741,20 +743,24 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator new Path(config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, - JobHelper.INDEX_ZIP + JobHelper.INDEX_ZIP, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeFileNamePath( new Path(config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, - JobHelper.DESCRIPTOR_JSON + JobHelper.DESCRIPTOR_JSON, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeTmpPath( new Path(config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, - context.getTaskAttemptID() - ) + context.getTaskAttemptID(), + config.DATA_SEGMENT_PUSHER + ), + config.DATA_SEGMENT_PUSHER ); Path descriptorPath = config.makeDescriptorInfoPath(segment); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 06599dcb2722..34b19245138f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -23,7 +23,6 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.common.io.OutputSupplier; @@ -35,7 +34,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.ProgressIndicator; import io.druid.segment.SegmentUtils; -import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.timeline.DataSegment; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -45,6 +44,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import 
org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.UserGroupInformation; @@ -310,6 +310,29 @@ public static void injectSystemProperties(Job job) injectSystemProperties(job.getConfiguration()); } + public static void injectDruidProperties(Configuration configuration, List listOfAllowedPrefix) + { + String mapJavaOpts = configuration.get(MRJobConfig.MAP_JAVA_OPTS); + String reduceJavaOpts = configuration.get(MRJobConfig.REDUCE_JAVA_OPTS); + + for (String propName : System.getProperties().stringPropertyNames()) { + for (String prefix : listOfAllowedPrefix) { + if (propName.startsWith(prefix)) { + mapJavaOpts = String.format("%s -D%s=%s", mapJavaOpts, propName, System.getProperty(propName)); + reduceJavaOpts = String.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName)); + break; + } + } + + } + if (!Strings.isNullOrEmpty(mapJavaOpts)) { + configuration.set(MRJobConfig.MAP_JAVA_OPTS, mapJavaOpts); + } + if (!Strings.isNullOrEmpty(reduceJavaOpts)) { + configuration.set(MRJobConfig.REDUCE_JAVA_OPTS, reduceJavaOpts); + } + } + public static Configuration injectSystemProperties(Configuration conf) { for (String propName : System.getProperties().stringPropertyNames()) { @@ -379,7 +402,8 @@ public static DataSegment serializeOutIndex( final File mergedBase, final Path finalIndexZipFilePath, final Path finalDescriptorPath, - final Path tmpPath + final Path tmpPath, + DataSegmentPusher dataSegmentPusher ) throws IOException { @@ -412,43 +436,8 @@ public long push() throws IOException log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri()); final URI indexOutURI = finalIndexZipFilePath.toUri(); - final ImmutableMap loadSpec; - // TODO: Make this a part of Pushers or Pullers - switch (outputFS.getScheme()) { - case "hdfs": - case "viewfs": - case "maprfs": - loadSpec = ImmutableMap.of( - "type", "hdfs", - "path", indexOutURI.toString() - ); - break; - case "gs": - loadSpec = ImmutableMap.of( - "type", "google", - "bucket", indexOutURI.getHost(), - "path", indexOutURI.getPath().substring(1) // remove the leading "/" - ); - break; - case "s3": - case "s3n": - loadSpec = ImmutableMap.of( - "type", "s3_zip", - "bucket", indexOutURI.getHost(), - "key", indexOutURI.getPath().substring(1) // remove the leading "/" - ); - break; - case "file": - loadSpec = ImmutableMap.of( - "type", "local", - "path", indexOutURI.getPath() - ); - break; - default: - throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme()); - } final DataSegment finalSegment = segmentTemplate - .withLoadSpec(loadSpec) + .withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI)) .withSize(size.get()) .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase)); @@ -575,77 +564,33 @@ private static void createNewZipEntry(ZipOutputStream out, File file) throws IOE out.putNextEntry(new ZipEntry(file.getName())); } - public static boolean isHdfs(FileSystem fs) - { - return "hdfs".equals(fs.getScheme()) || "viewfs".equals(fs.getScheme()) || "maprfs".equals(fs.getScheme()); - } - public static Path makeFileNamePath( final Path basePath, final FileSystem fs, final DataSegment segmentTemplate, - final String baseFileName + final String baseFileName, + DataSegmentPusher dataSegmentPusher ) { - final Path finalIndexZipPath; - final String segmentDir; - if (isHdfs(fs)) { - segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate); - finalIndexZipPath = new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%d_%s", - segmentDir, - 
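A sketch (not part of the patch) of what JobHelper.injectDruidProperties() does with the default allowedHadoopPrefix of ["druid.storage.", "druid.javascript."]; the property values are illustrative and mapreduce.map.java.opts is assumed to be non-empty already:

    // Indexing JVM started with, for example:
    //   -Ddruid.storage.type=hdfs -Ddruid.storage.storageDirectory=/druid/segments
    // injectDruidProperties() appends
    //   " -Ddruid.storage.type=hdfs -Ddruid.storage.storageDirectory=/druid/segments"
    // to both mapreduce.map.java.opts and mapreduce.reduce.java.opts, so the
    // DataSegmentPusher binding behind HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
    // resolves to the same deep-storage implementation inside the MR tasks.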
segmentTemplate.getShardSpec().getPartitionNum(), - baseFileName - ) - ); - } else { - segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate); - finalIndexZipPath = new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%s", - segmentDir, - baseFileName - ) - ); - } - return finalIndexZipPath; + return new Path(prependFSIfNullScheme(fs, basePath), + dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)); } public static Path makeTmpPath( final Path basePath, final FileSystem fs, final DataSegment segmentTemplate, - final TaskAttemptID taskAttemptID + final TaskAttemptID taskAttemptID, + DataSegmentPusher dataSegmentPusher ) { - final String segmentDir; - - if (isHdfs(fs)) { - segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate); - return new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%d_index.zip.%d", - segmentDir, - segmentTemplate.getShardSpec().getPartitionNum(), - taskAttemptID.getId() - ) - ); - } else { - segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate); - return new Path( - prependFSIfNullScheme(fs, basePath), - String.format( - "./%s/%d_index.zip.%d", - segmentDir, - segmentTemplate.getShardSpec().getPartitionNum(), - taskAttemptID.getId() - ) - ); - } + return new Path( + prependFSIfNullScheme(fs, basePath), + String.format("./%s.%d", + dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), + taskAttemptID.getId() + ) + ); } /** @@ -793,7 +738,12 @@ public static URI getURIFromSegment(DataSegment dataSegment) final String type = loadSpec.get("type").toString(); final URI segmentLocURI; if ("s3_zip".equals(type)) { - segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + if ("s3a".equals(loadSpec.get("S3Schema"))) { + segmentLocURI = URI.create(String.format("s3a://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + + } else { + segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + } } else if ("hdfs".equals(type)) { segmentLocURI = URI.create(loadSpec.get("path").toString()); } else if ("google".equals(type)) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index 3675b2a84e57..ab92c7032051 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -556,20 +556,24 @@ protected void map( baseOutputPath, outputFS, finalSegmentTemplate, - JobHelper.INDEX_ZIP + JobHelper.INDEX_ZIP, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeFileNamePath( baseOutputPath, outputFS, finalSegmentTemplate, - JobHelper.DESCRIPTOR_JSON + JobHelper.DESCRIPTOR_JSON, + config.DATA_SEGMENT_PUSHER ), JobHelper.makeTmpPath( baseOutputPath, outputFS, finalSegmentTemplate, - context.getTaskAttemptID() - ) + context.getTaskAttemptID(), + config.DATA_SEGMENT_PUSHER + ), + config.DATA_SEGMENT_PUSHER ); context.progress(); context.setStatus("Finished PUSH"); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java index 57f2b8f8c51c..a3574f16cb25 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java +++ 
b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java @@ -37,6 +37,7 @@ import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -53,6 +54,7 @@ public class HadoopDruidConverterConfig public static final ObjectMapper jsonMapper; public static final IndexIO INDEX_IO; public static final IndexMerger INDEX_MERGER; + public static final DataSegmentPusher DATA_SEGMENT_PUSHER; private static final Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), @@ -75,6 +77,7 @@ public void configure(Binder binder) jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class); INDEX_IO = injector.getInstance(IndexIO.class); INDEX_MERGER = injector.getInstance(IndexMerger.class); + DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class); } private static final TypeReference> mapTypeReference = new TypeReference>() diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 2746dd9c004e..954910470246 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -386,7 +386,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map T jsonReadWriteRead(String s, Class klass) - { - try { - return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Test - public void shouldMakeHDFSCompliantSegmentOutputPath() - { - HadoopIngestionSpec schema; - - try { - schema = jsonReadWriteRead( - "{\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"source\",\n" - + " \"metricsSpec\": [],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"hour\",\n" - + " \"intervals\": [\"2012-07-10/P1D\"]\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"type\": \"hadoop\",\n" - + " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n" - + " }\n" - + "}", - HadoopIngestionSpec.class - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( - schema.withTuningConfig( - schema.getTuningConfig() - .withVersion( - "some:brand:new:version" - ) - ) - ); - - Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); - Path path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new DistributedFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.INDEX_ZIP - ); - Assert.assertEquals( - "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" - + "/4712_index.zip", - path.toString() - ); - - path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new DistributedFileSystem(), - new DataSegment( - 
cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.DESCRIPTOR_JSON - ); - Assert.assertEquals( - "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" - + "/4712_descriptor.json", - path.toString() - ); - - path = JobHelper.makeTmpPath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new DistributedFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0) - ); - Assert.assertEquals( - "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version" - + "/4712_index.zip.0", - path.toString() - ); - - } - - @Test - public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() - { - final HadoopIngestionSpec schema; - - try { - schema = jsonReadWriteRead( - "{\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"the:data:source\",\n" - + " \"metricsSpec\": [],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"hour\",\n" - + " \"intervals\": [\"2012-07-10/P1D\"]\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"type\": \"hadoop\",\n" - + " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n" - + " }\n" - + "}", - HadoopIngestionSpec.class - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig( - schema.withTuningConfig( - schema.getTuningConfig() - .withVersion( - "some:brand:new:version" - ) - ) - ); - - Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); - Path path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new LocalFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.INDEX_ZIP - ); - Assert.assertEquals( - "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" - + "version/4712/index.zip", - path.toString() - ); - - path = JobHelper.makeFileNamePath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new LocalFileSystem(), - new DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - JobHelper.DESCRIPTOR_JSON - ); - Assert.assertEquals( - "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" - + "version/4712/descriptor.json", - path.toString() - ); - - path = JobHelper.makeTmpPath( - new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()), - new LocalFileSystem(), - new 
DataSegment( - cfg.getSchema().getDataSchema().getDataSource(), - cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(), - cfg.getSchema().getTuningConfig().getVersion(), - null, - null, - null, - new NumberedShardSpec(bucket.partitionNum, 5000), - -1, - -1 - ), - new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0) - ); - Assert.assertEquals( - "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:" - + "version/4712/4712_index.zip.0", - path.toString() - ); - - } @Test public void testHashedBucketSelection() @@ -325,7 +94,8 @@ public void testHashedBucketSelection() null, null, false, - false + false, + null ) ); HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec); @@ -397,7 +167,8 @@ public void testNoneShardSpecBucketSelection() null, null, false, - false + false, + null ) ); HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java index c36af52fc950..753379ba709b 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -57,7 +57,8 @@ public void testSerde() throws Exception null, null, true, - true + true, + null ); HadoopTuningConfig actual = jsonReadWriteRead(jsonMapper.writeValueAsString(expected), HadoopTuningConfig.class); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 07edcd9e32a4..190744358573 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -525,7 +525,8 @@ public void setUp() throws Exception buildV9Directly, null, forceExtendableShardSpecs, - false + false, + null ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 17bda8f4bcb7..83f4449f20c6 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -125,7 +125,8 @@ public void setup() throws Exception null, null, false, - false + false, + null ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index ba86ce2708d4..154e03399e58 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -71,7 +71,8 @@ public class GranularityPathSpecTest null, null, false, - false + false, + null ); private GranularityPathSpec granularityPathSpec; diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index f7ff27c9c3a2..4a3eceedc56f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -209,7 +209,8 @@ public InputStream openStream() throws IOException null, null, false, - false + false, + null ) 
) ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index e3fb7be6f9d7..6a9370324d0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -31,7 +31,7 @@ public class TaskConfig { public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of( - "org.apache.hadoop:hadoop-client:2.3.0" + "org.apache.hadoop:hadoop-client:2.7.3" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 94b3459f3921..7480ac26b551 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -66,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -550,6 +551,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException segments.add(segment); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(), indexMerger, indexIO, null, null, indexMergerV9 ) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 5bdb182c4b12..9f2bd7f78232 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -54,8 +54,10 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; public class SameIntervalMergeTaskTest @@ -200,6 +202,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException segments.add(segment); return segment; } + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return null; + } + }, null, null, null, null, null, null, null, null, null, new SegmentLoader() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 88c2cebf9d6f..71b656e57b44 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -93,6 +93,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -231,6 +232,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException { return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }, new DataSegmentKiller() { diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 6fe8e11d95bf..e25444d11592 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -122,6 +122,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -477,6 +478,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException pushedSegments++; return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }; } @@ -1017,6 +1024,12 @@ public DataSegment push(File file, DataSegment dataSegment) throws IOException { throw new RuntimeException("FAILURE"); } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }; tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java index ad99827052d2..923a80299605 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java @@ -26,6 +26,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.Set; public class TestDataSegmentPusher implements DataSegmentPusher @@ -52,6 +54,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException return segment; } + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } + public Set getPushedSegments() { return ImmutableSet.copyOf(pushedSegments); diff --git a/pom.xml b/pom.xml index 68a6fb245ad7..47f2865bcac0 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ 4.1.6.Final 1.7.12 - 2.3.0 + 2.7.3 2.0.0 1.6.6 @@ -180,7 +180,9 @@ com.amazonaws aws-java-sdk - 1.10.21 + + 1.10.56 javax.mail diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index 096e7dd07987..91bd6bf84570 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -33,7 +33,9 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.FileAlreadyExistsException; +import java.util.Map; import java.util.UUID; /** @@ -73,7 +75,7 @@ public String getPathForHadoop(String dataSource) @Override public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException { - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment); final File baseStorageDir = config.getStorageDirectory(); final File outDir = new File(baseStorageDir, storageDir); @@ -86,7 +88,7 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce } return createDescriptorFile( - segment.withLoadSpec(makeLoadSpec(outDir)) + segment.withLoadSpec(makeLoadSpec(outDir.toURI())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)), outDir @@ -98,7 +100,7 @@ public 
DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce final long size = compressSegment(dataSegmentFile, tmpOutDir); final DataSegment dataSegment = createDescriptorFile( - segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip"))) + segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip").toURI())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)), tmpOutDir @@ -118,6 +120,12 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce return dataSegment; } + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of("type", "local", "path", finalIndexZipFilePath.getPath()); + } + private String intermediateDirFor(String storageDir) { return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString(); @@ -138,9 +146,4 @@ private DataSegment createDescriptorFile(DataSegment segment, File outDir) throw Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); return segment; } - - private ImmutableMap makeLoadSpec(File outFile) - { - return ImmutableMap.of("type", "local", "path", outFile.toString()); - } } diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 4995c3230274..19a97f7816a2 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -91,7 +91,7 @@ public boolean isSegmentLoaded(final DataSegment segment) public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); if (localStorageDir.exists()) { return location; } @@ -124,7 +124,7 @@ public Segment getSegment(DataSegment segment) throws SegmentLoadingException public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { StorageLocation loc = findStorageLocationIfLoaded(segment); - String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + String storageDir = DataSegmentPusher.getDefaultStorageDir(segment); if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); @@ -233,11 +233,11 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. // So we should always clean all possible locations here for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); if (localStorageDir.exists()) { // Druid creates folders of the form dataSource/interval/version/partitionNum. // We need to clean up all these directories if they are all empty. 
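// ---------------------------------------------------------------------------
// Illustrative sketch (assumes the Druid api module and Joda-Time on the
// classpath; the NumberedShardSpec import location is assumed): with
// DataSegmentPusherUtil removed, the local segment cache resolves a segment's
// directory through the shared DataSegmentPusher.getDefaultStorageDir(...),
// so pushers and the cache agree on the dataSource/interval/version/partitionNum
// layout described in the comment above. The values below are made up.
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;

import java.io.File;

class LocalCacheDirSketch
{
  public static void main(String[] args)
  {
    DataSegment segment = new DataSegment(
        "wikipedia",
        new Interval("2012-07-10T05:00:00.000Z/2012-07-10T06:00:00.000Z"),
        "2017-01-01T00:00:00.000Z",      // version
        null,                            // loadSpec
        null,                            // dimensions
        null,                            // metrics
        new NumberedShardSpec(0, 1),
        -1,                              // binaryVersion
        -1                               // size
    );
    // e.g. /tmp/druid/cache/wikipedia/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/2017-01-01T00:00:00.000Z/0
    File cacheDir = new File("/tmp/druid/cache", DataSegmentPusher.getDefaultStorageDir(segment));
    System.out.println(cacheDir);
  }
}
// ---------------------------------------------------------------------------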
- File cacheFile = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + File cacheFile = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); cleanupCacheFiles(location.getPath(), cacheFile); location.removeSegment(segment); } diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java index a91e6538d68c..447bfddfe622 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java @@ -88,14 +88,14 @@ public void testPush() throws IOException Assert.assertEquals(dataSegment2, returnSegment2); Assert.assertNotEquals( - DataSegmentPusherUtil.getStorageDir(dataSegment), - DataSegmentPusherUtil.getStorageDir(dataSegment2) + localDataSegmentPusher.getStorageDir(dataSegment), + localDataSegmentPusher.getStorageDir(dataSegment2) ); for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) { File outDir = new File( config.getStorageDirectory(), - DataSegmentPusherUtil.getStorageDir(returnSegment) + localDataSegmentPusher.getStorageDir(returnSegment) ); File versionFile = new File(outDir, "index.zip"); File descriptorJson = new File(outDir, "descriptor.json"); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index a692c5b170ad..98399784c511 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -61,6 +61,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -186,6 +187,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException pushedSegments.add(segment); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } }; appenderator = Appenderators.createRealtime( schema, diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 0f072cfbf635..b6516fb4c66d 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -20,6 +20,7 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Module; @@ -39,7 +40,9 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; @@ -152,6 +155,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException { return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of(); + } } private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
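// ---------------------------------------------------------------------------
// Illustrative sketch (assumes the Druid api module and Guava on the classpath):
// with the filesystem-scheme switch removed from JobHelper.serializeOutIndex,
// every DataSegmentPusher now supplies its own load spec via makeLoadSpec(URI).
// A minimal pusher in the spirit of the test pushers updated above might look
// like this; the "local" spec mirrors what LocalDataSegmentPusher returns, and
// the /tmp paths are invented for the example.
import com.google.common.collect.ImmutableMap;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

class SketchDataSegmentPusher implements DataSegmentPusher
{
  @Deprecated
  @Override
  public String getPathForHadoop(String dataSource)
  {
    return getPathForHadoop();
  }

  @Override
  public String getPathForHadoop()
  {
    return "/tmp/druid/hadoop";   // hypothetical Hadoop working path
  }

  @Override
  public DataSegment push(File file, DataSegment segment) throws IOException
  {
    // A real pusher would copy the zipped segment to deep storage here; the
    // returned segment carries the load spec that historicals later use to pull it.
    URI uri = new File(new File("/tmp/druid/storage", getStorageDir(segment)), "index.zip").toURI();
    return segment.withLoadSpec(makeLoadSpec(uri));
  }

  @Override
  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
  {
    // Same shape as LocalDataSegmentPusher above; an HDFS or S3 pusher would
    // return {"type": "hdfs", ...} or {"type": "s3_zip", ...} instead.
    return ImmutableMap.<String, Object>of("type", "local", "path", finalIndexZipFilePath.getPath());
  }
}
// ---------------------------------------------------------------------------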