diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 21480fd1b1d8..002d3ec6c1a6 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.impl; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; @@ -28,6 +29,8 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.PasswordProvider; import javax.annotation.Nullable; @@ -45,18 +48,31 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn private final String httpAuthenticationUsername; @Nullable private final PasswordProvider httpAuthenticationPasswordProvider; + private final HttpInputSourceConfig config; @JsonCreator public HttpInputSource( @JsonProperty("uris") List uris, @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername, - @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider + @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider, + @JacksonInject HttpInputSourceConfig config ) { Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs"); + throwIfInvalidProtocols(config, uris); this.uris = uris; this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; + this.config = config; + } + + public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List uris) + { + for (URI uri : uris) { + if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(uri.getScheme()))) { + throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols()); + } + } } @JsonProperty @@ -97,7 +113,8 @@ public SplittableInputSource withSplit(InputSplit split) return new HttpInputSource( Collections.singletonList(split.get()), httpAuthenticationUsername, - httpAuthenticationPasswordProvider + httpAuthenticationPasswordProvider, + config ); } @@ -129,16 +146,17 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - HttpInputSource source = (HttpInputSource) o; - return Objects.equals(uris, source.uris) && - Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) && - Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider); + HttpInputSource that = (HttpInputSource) o; + return Objects.equals(uris, that.uris) && + Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) && + Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) && + Objects.equals(config, that.config); } @Override public int hashCode() { - return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider); + return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, config); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java new file mode 100644 index 000000000000..40a2521b1186 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class HttpInputSourceConfig +{ + @VisibleForTesting + public static final Set DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("http", "https"); + + @JsonProperty + private final Set allowedProtocols; + + @JsonCreator + public HttpInputSourceConfig( + @JsonProperty("allowedProtocols") @Nullable Set allowedProtocols + ) + { + this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty() + ? DEFAULT_ALLOWED_PROTOCOLS + : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet()); + } + + public Set getAllowedProtocols() + { + return allowedProtocols; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HttpInputSourceConfig that = (HttpInputSourceConfig) o; + return Objects.equals(allowedProtocols, that.allowedProtocols); + } + + @Override + public int hashCode() + { + return Objects.hash(allowedProtocols); + } + + @Override + public String toString() + { + return "HttpInputSourceConfig{" + + ", allowedProtocols=" + allowedProtocols + + '}'; + } +} + diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java new file mode 100644 index 000000000000..a3f24b9551e9 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Assert; +import org.junit.Test; + +public class HttpInputSourceConfigTest +{ + + @Test + public void testEquals() + { + EqualsVerifier.forClass(HttpInputSourceConfig.class).usingGetClass().verify(); + } + + @Test + public void testNullAllowedProtocolsUseDefault() + { + HttpInputSourceConfig config = new HttpInputSourceConfig(null); + Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + } + + @Test + public void testEmptyAllowedProtocolsUseDefault() + { + HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of()); + Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + } + + @Test + public void testCustomAllowedProtocols() + { + HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid")); + Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols()); + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index 61be2cc1b206..9c17b57d21eb 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -19,29 +19,87 @@ package org.apache.druid.data.input.impl; +import com.fasterxml.jackson.databind.InjectableValues.Std; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.InputSource; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.net.URI; public class HttpInputSourceTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testSerde() throws IOException { + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null); final ObjectMapper mapper = new ObjectMapper(); + mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig)); final HttpInputSource source = new HttpInputSource( ImmutableList.of(URI.create("http://test.com/http-test")), "myName", - new DefaultPasswordProvider("myPassword") + new DefaultPasswordProvider("myPassword"), + httpInputSourceConfig ); final byte[] json = mapper.writeValueAsBytes(source); final HttpInputSource fromJson = (HttpInputSource) mapper.readValue(json, InputSource.class); Assert.assertEquals(source, fromJson); } + + @Test + public void testConstructorAllowsOnlyDefaultProtocols() + { + new HttpInputSource( + ImmutableList.of(URI.create("http:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + new HttpInputSourceConfig(null) + ); + + new HttpInputSource( + ImmutableList.of(URI.create("https:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + new HttpInputSourceConfig(null) + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [http, https] protocols are allowed"); + new HttpInputSource( + ImmutableList.of(URI.create("my-protocol:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + new HttpInputSourceConfig(null) + ); + } + + @Test + public void testConstructorAllowsOnlyCustomProtocols() + { + final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid")); + new HttpInputSource( + ImmutableList.of(URI.create("druid:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + customConfig + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [druid] protocols are allowed"); + new HttpInputSource( + ImmutableList.of(URI.create("https:///")), + "myName", + new DefaultPasswordProvider("myPassword"), + customConfig + ); + } } diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b23432c1cbd0..e6650b1cd38a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -515,6 +515,28 @@ This deep storage is used to interface with Cassandra. Note that the `druid-cas |`druid.storage.keyspace`|Cassandra key space.|none| +### Ingestion Security Configuration + +#### HDFS input source + +You can set the following property to specify permissible protocols for +the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose). + +|Property|Possible Values|Description|Default| +|--------|---------------|-----------|-------| +|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source and HDFS firehose.|["hdfs"]| + + +#### HTTP input source + +You can set the following property to specify permissible protocols for +the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose). + +|Property|Possible Values|Description|Default| +|--------|---------------|-----------|-------| +|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|["http", "https"]| + + ### Task Logging If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS. @@ -1355,6 +1377,7 @@ The amount of direct memory needed by Druid is at least ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=` at the command line. + #### Query Configurations See [general query configuration](#general-query-configuration). diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index cc06475e96c3..5032ed1a518c 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1064,7 +1064,7 @@ Sample specs: "type": "index_parallel", "inputSource": { "type": "hdfs", - "paths": "hdfs://foo/bar/", "hdfs://bar/foo" + "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo" }, "inputFormat": { "type": "json" @@ -1080,7 +1080,7 @@ Sample specs: "type": "index_parallel", "inputSource": { "type": "hdfs", - "paths": ["hdfs://foo/bar", "hdfs://bar/foo"] + "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo" }, "inputFormat": { "type": "json" @@ -1096,7 +1096,7 @@ Sample specs: "type": "index_parallel", "inputSource": { "type": "hdfs", - "paths": "hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json" + "paths": "hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json" }, "inputFormat": { "type": "json" @@ -1112,7 +1112,7 @@ Sample specs: "type": "index_parallel", "inputSource": { "type": "hdfs", - "paths": ["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"] + "paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"] }, "inputFormat": { "type": "json" @@ -1127,9 +1127,10 @@ Sample specs: |type|This should be `hdfs`.|None|yes| |paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes| -You can also ingest from cloud storage using the HDFS input source. -However, if you want to read from AWS S3 or Google Cloud Storage, consider using -the [S3 input source](#s3-input-source) or the [Google Cloud Storage input source](#google-cloud-storage-input-source) instead. +You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage. +However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage. +If you want to use a non-hdfs protocol with the HDFS input source, include the protocol +in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details. ### HTTP Input Source @@ -1209,10 +1210,13 @@ You can also use the other existing Druid PasswordProviders. Here is an example |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `http`|None|yes| -|uris|URIs of the input files.|None|yes| +|uris|URIs of the input files. See below for the protocols allowed for URIs.|None|yes| |httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no| |httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no| +You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP input sources. +The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details. + ### Inline Input Source The Inline input source can be used to read the data inlined in its own spec. @@ -1559,6 +1563,11 @@ Note that prefetching or caching isn't that useful in the Parallel task. |fetchTimeout|Timeout for fetching each file.|60000| |maxFetchRetry|Maximum number of retries for fetching each file.|3| +You can also ingest from other storage using the HDFS firehose if the HDFS client supports that storage. +However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage. +If you want to use a non-hdfs protocol with the HDFS firehose, you need to include the protocol you want +in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details. + ### LocalFirehose This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with `string` typed parsers. @@ -1596,6 +1605,9 @@ A sample HTTP Firehose spec is shown below: } ``` +You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP firehose input sources. +The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details. + The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header. Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header. diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java index ee5bf032b4d0..f7fac9f3b479 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import org.apache.druid.guice.Hdfs; import org.apache.druid.inputsource.hdfs.HdfsInputSource; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller; import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; @@ -44,21 +45,25 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory inputPaths; private final Configuration conf; + private final HdfsInputSourceConfig inputSourceConfig; @JsonCreator public HdfsFirehoseFactory( - @JacksonInject @Hdfs Configuration conf, @JsonProperty("paths") Object inputPaths, @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, @JsonProperty("fetchTimeout") Long fetchTimeout, - @JsonProperty("maxFetchRetry") Integer maxFetchRetry + @JsonProperty("maxFetchRetry") Integer maxFetchRetry, + @JacksonInject @Hdfs Configuration conf, + @JacksonInject HdfsInputSourceConfig inputSourceConfig ) { super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); - this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "inputPaths"); + this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths"); this.conf = conf; + this.inputSourceConfig = inputSourceConfig; + this.inputPaths.forEach(p -> HdfsInputSource.verifyProtocol(conf, inputSourceConfig, p)); } @JsonProperty("paths") @@ -109,21 +114,14 @@ public boolean isSplittable() public FiniteFirehoseFactory withSplit(InputSplit split) { return new HdfsFirehoseFactory( - conf, split.get().toString(), getMaxCacheCapacityBytes(), getMaxFetchCapacityBytes(), getPrefetchTriggerBytes(), getFetchTimeout(), - getMaxFetchRetry() + getMaxFetchRetry(), + conf, + inputSourceConfig ); } - - @Override - public String toString() - { - return "HdfsFirehoseFactory{" + - "inputPaths=" + inputPaths + - '}'; - } } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index b8c798a77b5c..7faebccf268b 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.utils.Streams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -64,6 +65,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn private final List inputPaths; private final Configuration configuration; + private final HdfsInputSourceConfig inputSourceConfig; // Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource // *does* cache the splits for the following reasons: @@ -73,32 +75,49 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn // // 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone // migrating from Hadoop. - private List cachedPaths; + @Nullable + private List cachedPaths = null; @JsonCreator public HdfsInputSource( @JsonProperty(PROP_PATHS) Object inputPaths, - @JacksonInject @Hdfs Configuration configuration + @JacksonInject @Hdfs Configuration configuration, + @JacksonInject HdfsInputSourceConfig inputSourceConfig ) { this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS); this.configuration = configuration; - this.cachedPaths = null; + this.inputSourceConfig = inputSourceConfig; + this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p)); } public static List coerceInputPathsToList(Object inputPaths, String propertyName) { - final List paths; - if (inputPaths instanceof String) { - paths = Collections.singletonList((String) inputPaths); + return Collections.singletonList((String) inputPaths); } else if (inputPaths instanceof List && ((List) inputPaths).stream().allMatch(x -> x instanceof String)) { - paths = ((List) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList()); + return ((List) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList()); } else { throw new IAE("'%s' must be a string or an array of strings", propertyName); } + } + + public static void verifyProtocol(Configuration conf, HdfsInputSourceConfig config, String pathString) + { + Path path = new Path(pathString); + try { + throwIfInvalidProtocol(config, path.getFileSystem(conf).getScheme()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } - return paths; + private static void throwIfInvalidProtocol(HdfsInputSourceConfig config, String scheme) + { + if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(scheme))) { + throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols()); + } } public static Collection getPaths(List inputPaths, Configuration configuration) throws IOException @@ -202,7 +221,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp public SplittableInputSource> withSplit(InputSplit> split) { List paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList()); - return new HdfsInputSource(paths, configuration); + return new HdfsInputSource(paths, configuration, inputSourceConfig); } @Override @@ -218,6 +237,7 @@ private void cachePathsIfNeeded() throws IOException } } + @VisibleForTesting static Builder builder() { return new Builder(); @@ -227,6 +247,7 @@ static final class Builder { private Object paths; private Configuration configuration; + private HdfsInputSourceConfig inputSourceConfig; private Builder() { @@ -244,9 +265,19 @@ Builder configuration(Configuration configuration) return this; } + Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig) + { + this.inputSourceConfig = inputSourceConfig; + return this; + } + HdfsInputSource build() { - return new HdfsInputSource(paths, configuration); + return new HdfsInputSource( + Preconditions.checkNotNull(paths, "paths"), + Preconditions.checkNotNull(configuration, "configuration"), + Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig") + ); } } } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java new file mode 100644 index 000000000000..c7f43f8b6a99 --- /dev/null +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.inputsource.hdfs; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.util.Set; +import java.util.stream.Collectors; + +public class HdfsInputSourceConfig +{ + static final Set DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("hdfs"); + + @JsonProperty + private final Set allowedProtocols; + + @JsonCreator + public HdfsInputSourceConfig( + @JsonProperty("allowedProtocols") @Nullable Set allowedProtocols + ) + { + this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty() + ? DEFAULT_ALLOWED_PROTOCOLS + : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet()); + } + + public Set getAllowedProtocols() + { + return allowedProtocols; + } +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java index e89bb0d8000e..3ca8e23535e1 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -35,6 +35,7 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.initialization.DruidModule; import org.apache.druid.inputsource.hdfs.HdfsInputSource; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs; import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; import org.apache.hadoop.conf.Configuration; @@ -118,5 +119,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class); binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class); LifecycleModule.register(binder, HdfsStorageAuthentication.class); + + JsonConfigProvider.bind(binder, "druid.ingestion.hdfs", HdfsInputSourceConfig.class); } } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java index 88daed7a821d..e96a773c0cef 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java @@ -19,30 +19,48 @@ package org.apache.druid.firehose.hdfs; -import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.InjectableValues.Std; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.storage.hdfs.HdfsStorageDruidModule; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.Collections; public class HdfsFirehoseFactoryTest { + private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null); + private static final Configuration DEFAULT_CONFIGURATION = new Configuration(); + + @BeforeClass + public static void setup() + { + DEFAULT_CONFIGURATION.set("fs.default.name", "hdfs://localhost:7020"); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testArrayPaths() throws IOException { final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory( - null, Collections.singletonList("/foo/bar"), null, null, null, null, - null + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG ); final ObjectMapper mapper = createMapper(); @@ -59,7 +77,16 @@ public void testArrayPaths() throws IOException @Test public void testStringPaths() throws IOException { - final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null); + final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory( + "/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); final ObjectMapper mapper = createMapper(); final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory) @@ -71,11 +98,121 @@ public void testStringPaths() throws IOException ); } + @Test + public void testConstructorAllowsOnlyDefaultProtocol() + { + new HdfsFirehoseFactory( + "hdfs://localhost:7020/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [hdfs] protocols are allowed"); + new HdfsFirehoseFactory( + "file:/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + } + + @Test + public void testConstructorAllowsOnlyCustomProtocol() + { + final Configuration conf = new Configuration(); + conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem"); + new HdfsFirehoseFactory( + "ftp://localhost:21/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + new HdfsInputSourceConfig(ImmutableSet.of("ftp")) + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [druid] protocols are allowed"); + new HdfsFirehoseFactory( + "hdfs://localhost:7020/foo/bar", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + new HdfsInputSourceConfig(ImmutableSet.of("druid")) + ); + } + + @Test + public void testConstructorWithDefaultHdfs() + { + new HdfsFirehoseFactory( + "/foo/bar*", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + new HdfsFirehoseFactory( + "foo/bar*", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + new HdfsFirehoseFactory( + "hdfs:///foo/bar*", + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + + new HdfsFirehoseFactory( + "hdfs://localhost:10020/foo/bar*", // different hdfs + null, + null, + null, + null, + null, + DEFAULT_CONFIGURATION, + DEFAULT_INPUT_SOURCE_CONFIG + ); + } + private static ObjectMapper createMapper() { final ObjectMapper mapper = new ObjectMapper(); new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule); - mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration())); + mapper.setInjectableValues( + new Std() + .addValue(Configuration.class, DEFAULT_CONFIGURATION) + .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG) + ); return mapper; } } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java new file mode 100644 index 000000000000..2e73688cd18f --- /dev/null +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.inputsource.hdfs; + +import com.google.common.collect.ImmutableSet; +import org.junit.Assert; +import org.junit.Test; + +public class HdfsInputSourceConfigTest +{ + @Test + public void testNullAllowedProtocolsUseDefault() + { + HdfsInputSourceConfig config = new HdfsInputSourceConfig(null); + Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + } + + @Test + public void testEmptyAllowedProtocolsUseDefault() + { + HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of()); + Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols()); + } + + @Test + public void testCustomAllowedProtocols() + { + HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of("druid")); + Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols()); + } +} diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index 170791e7d16c..a61a0c6de950 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -20,8 +20,9 @@ package org.apache.druid.inputsource.hdfs; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.InjectableValues.Std; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; @@ -68,8 +69,9 @@ @RunWith(Enclosed.class) public class HdfsInputSourceTest extends InitializedNullHandlingTest { - private static final String PATH = "/foo/bar"; + private static final String PATH = "hdfs://localhost:7020/foo/bar"; private static final Configuration CONFIGURATION = new Configuration(); + private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null); private static final String COLUMN = "value"; private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( new TimestampSpec(null, null, null), @@ -84,6 +86,80 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest 0 ); + public static class ConstructorTest + { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testConstructorAllowsOnlyDefaultProtocol() + { + HdfsInputSource.builder() + .paths(PATH + "*") + .configuration(CONFIGURATION) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [hdfs] protocols are allowed"); + HdfsInputSource.builder() + .paths("file:/foo/bar*") + .configuration(CONFIGURATION) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + } + + @Test + public void testConstructorAllowsOnlyCustomProtocol() + { + final Configuration conf = new Configuration(); + conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem"); + HdfsInputSource.builder() + .paths("ftp://localhost:21/foo/bar") + .configuration(CONFIGURATION) + .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("ftp"))) + .build(); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [druid] protocols are allowed"); + HdfsInputSource.builder() + .paths(PATH + "*") + .configuration(CONFIGURATION) + .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("druid"))) + .build(); + } + + @Test + public void testConstructorWithDefaultHdfs() + { + final Configuration conf = new Configuration(); + conf.set("fs.default.name", "hdfs://localhost:7020"); + HdfsInputSource.builder() + .paths("/foo/bar*") + .configuration(conf) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + + HdfsInputSource.builder() + .paths("foo/bar*") + .configuration(conf) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + + HdfsInputSource.builder() + .paths("hdfs:///foo/bar*") + .configuration(conf) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + + HdfsInputSource.builder() + .paths("hdfs://localhost:10020/foo/bar*") // different hdfs + .configuration(conf) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + } + } + public static class SerializeDeserializeTest { private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); @@ -98,7 +174,8 @@ public void setup() { hdfsInputSourceBuilder = HdfsInputSource.builder() .paths(PATH) - .configuration(CONFIGURATION); + .configuration(CONFIGURATION) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG); } @Test @@ -139,7 +216,11 @@ private static void testSerializesDeserializes(Wrapper hdfsInputSourceWrapper) private static ObjectMapper createObjectMapper() { final ObjectMapper mapper = new ObjectMapper(); - mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration())); + mapper.setInjectableValues( + new Std() + .addValue(Configuration.class, new Configuration()) + .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG) + ); new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule); return mapper; } @@ -204,6 +285,7 @@ public void setup() throws IOException target = HdfsInputSource.builder() .paths(dfsCluster.getURI() + PATH + "*") .configuration(CONFIGURATION) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) .build(); } @@ -304,6 +386,7 @@ public void setup() target = HdfsInputSource.builder() .paths(Collections.emptyList()) .configuration(CONFIGURATION) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) .build(); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java new file mode 100644 index 000000000000..31badedc7e66 --- /dev/null +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.hdfs; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.JsonConfigurator; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; +import org.junit.Assert; +import org.junit.Test; + +import javax.validation.Validation; +import javax.validation.Validator; +import java.util.Properties; + +public class HdfsStorageDruidModuleTest +{ + @Test + public void testHdfsInputSourceConfigDefaultAllowedProtocols() + { + Properties props = new Properties(); + Injector injector = makeInjectorWithProperties(props); + HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class); + Assert.assertEquals( + ImmutableSet.of("hdfs"), + instance.getAllowedProtocols() + ); + } + + @Test + public void testHdfsInputSourceConfigCustomAllowedProtocols() + { + Properties props = new Properties(); + props.setProperty("druid.ingestion.hdfs.allowedProtocols", "[\"webhdfs\"]"); + Injector injector = makeInjectorWithProperties(props); + HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class); + Assert.assertEquals( + ImmutableSet.of("webhdfs"), + instance.getAllowedProtocols() + ); + } + + private Injector makeInjectorWithProperties(final Properties props) + { + return Guice.createInjector( + ImmutableList.of( + new DruidGuiceExtensions(), + new LifecycleModule(), + binder -> { + binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); + binder.bind(JsonConfigurator.class).in(LazySingleton.class); + binder.bind(Properties.class).toInstance(props); + }, + new HdfsStorageDruidModule() + ) + ); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java index 0423af4f9a03..eb612f70958c 100644 --- a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java +++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import org.apache.druid.data.input.impl.HttpInputSourceConfig; +import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.initialization.DruidModule; import java.util.List; @@ -47,5 +49,6 @@ public List getJacksonModules() @Override public void configure(Binder binder) { + JsonConfigProvider.bind(binder, "druid.ingestion.http", HttpInputSourceConfig.class); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java index 7ff5e957d66a..bbd797f2d3d1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.realtime.firehose; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; @@ -26,9 +27,10 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.HttpEntity; +import org.apache.druid.data.input.impl.HttpInputSource; +import org.apache.druid.data.input.impl.HttpInputSourceConfig; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.PasswordProvider; import org.apache.druid.utils.CompressionUtils; @@ -43,12 +45,12 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory { - private static final Logger log = new Logger(HttpFirehoseFactory.class); private final List uris; @Nullable private final String httpAuthenticationUsername; @Nullable private final PasswordProvider httpAuthenticationPasswordProvider; + private final HttpInputSourceConfig inputSourceConfig; @JsonCreator public HttpFirehoseFactory( @@ -59,14 +61,17 @@ public HttpFirehoseFactory( @JsonProperty("fetchTimeout") Long fetchTimeout, @JsonProperty("maxFetchRetry") Integer maxFetchRetry, @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername, - @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider + @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider, + @JacksonInject HttpInputSourceConfig inputSourceConfig ) { super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); Preconditions.checkArgument(uris.size() > 0, "Empty URIs"); + HttpInputSource.throwIfInvalidProtocols(inputSourceConfig, uris); this.uris = uris; this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; + this.inputSourceConfig = inputSourceConfig; } @Nullable @@ -120,35 +125,20 @@ public boolean equals(Object o) if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { return false; } - - final HttpFirehoseFactory that = (HttpFirehoseFactory) o; - return Objects.equals(uris, that.uris) && - getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() && - getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() && - getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && - getFetchTimeout() == that.getFetchTimeout() && - getMaxFetchRetry() == that.getMaxFetchRetry() && - Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) && - Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider()); + HttpFirehoseFactory that = (HttpFirehoseFactory) o; + return uris.equals(that.uris) && + Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) && + Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) && + inputSourceConfig.equals(that.inputSourceConfig); } @Override public int hashCode() { - return Objects.hash( - uris, - getMaxCacheCapacityBytes(), - getMaxFetchCapacityBytes(), - getPrefetchTriggerBytes(), - getFetchTimeout(), - getMaxFetchRetry(), - httpAuthenticationUsername, - httpAuthenticationPasswordProvider - ); + return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, inputSourceConfig); } @Override @@ -168,7 +158,8 @@ public FiniteFirehoseFactory withSplit(InputSplit { + binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); + binder.bind(JsonConfigurator.class).in(LazySingleton.class); + binder.bind(Properties.class).toInstance(props); + }, + new JacksonModule(), + new InputSourceModule() + )); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java index e44ff7f6391c..867c49778d59 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java @@ -19,22 +19,36 @@ package org.apache.druid.segment.realtime.firehose; +import com.fasterxml.jackson.databind.InjectableValues.Std; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.impl.HttpInputSourceConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.net.URI; public class HttpFirehoseFactoryTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testSerde() throws IOException { + final HttpInputSourceConfig inputSourceConfig = new HttpInputSourceConfig(null); final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues(new Std().addValue( + HttpInputSourceConfig.class, + inputSourceConfig + )); + final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword"); final HttpFirehoseFactory factory = new HttpFirehoseFactory( ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")), @@ -44,7 +58,8 @@ public void testSerde() throws IOException 100L, 5, "testUser", - pwProvider + pwProvider, + inputSourceConfig ); final HttpFirehoseFactory outputFact = mapper.readValue( @@ -54,4 +69,77 @@ public void testSerde() throws IOException Assert.assertEquals(factory, outputFact); } + + @Test + public void testConstructorAllowsOnlyDefaultProtocols() + { + new HttpFirehoseFactory( + ImmutableList.of(URI.create("http:///")), + null, + null, + null, + null, + null, + null, + null, + new HttpInputSourceConfig(null) + ); + + new HttpFirehoseFactory( + ImmutableList.of(URI.create("https:///")), + null, + null, + null, + null, + null, + null, + null, + new HttpInputSourceConfig(null) + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [http, https] protocols are allowed"); + new HttpFirehoseFactory( + ImmutableList.of(URI.create("my-protocol:///")), + null, + null, + null, + null, + null, + null, + null, + new HttpInputSourceConfig(null) + ); + } + + @Test + public void testConstructorAllowsOnlyCustomProtocols() + { + final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid")); + new HttpFirehoseFactory( + ImmutableList.of(URI.create("druid:///")), + null, + null, + null, + null, + null, + null, + null, + customConfig + ); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Only [druid] protocols are allowed"); + new HttpFirehoseFactory( + ImmutableList.of(URI.create("https:///")), + null, + null, + null, + null, + null, + null, + null, + customConfig + ); + } }