
Delegate creation of segmentPath/LoadSpec to DataSegmentPushers and add S3a support #4116

Merged (15 commits) on Jun 4, 2017

Conversation

@b-slim (Contributor) commented Mar 25, 2017

  • Delegate the creation/naming of segment paths to DataSegmentPusher (see the sketch after this list)
  • Add support for the S3a scheme
  • Add dependency jars to the HDFS module
  • Update the Hadoop compile version
  • Propagate some default runtime properties to the Hadoop indexer JVM; currently the defaults are druid.storage.* and druid.javascript.*
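
A minimal sketch of the delegation idea, assuming the JOINER constant shown in the review below and the makeIndexPathName method mentioned later in this thread; the exact signatures, the makeLoadSpec method, and the package names are assumptions for illustration, not the merged code:

import com.google.common.base.Joiner;
import io.druid.timeline.DataSegment;
import java.net.URI;
import java.util.Map;

public interface DataSegmentPusherSketch
{
  Joiner JOINER = Joiner.on("/").skipNulls();

  // Scheme-agnostic relative directory for a segment (dataSource/interval/version/partitionNum).
  default String getStorageDir(DataSegment segment)
  {
    return JOINER.join(
        segment.getDataSource(),
        String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
        segment.getVersion(),
        segment.getShardSpec().getPartitionNum()
    );
  }

  // Relative path of a named index file inside the segment directory.
  default String makeIndexPathName(DataSegment segment, String indexName)
  {
    return JOINER.join(getStorageDir(segment), indexName);
  }

  // Each deep-storage implementation builds its own loadSpec ("hdfs", "s3_zip", "local", ...).
  Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
}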

@b-slim (Contributor, Author) commented Mar 25, 2017

This replaces #3940

@b-slim (Contributor, Author) commented Mar 25, 2017

@akashdw, can you take a look at this one as well?

@gianm requested a review from dclim on March 25, 2017, 23:05
@gianm (Contributor) commented Mar 25, 2017

@dclim could you please take a look?

@nishantmonu51 (Member) left a comment

Besides minor comments, looks good to me 👍


public interface DataSegmentPusher
{
  Joiner JOINER = Joiner.on("/").skipNulls();
Reviewer (Member): static final?

Author (Contributor): It is by definition, since it is declared in an interface.
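
For reference, a minimal illustration of the Java rule being invoked here (plain language semantics, not code from this PR):

public interface Example
{
  // Fields declared in an interface are implicitly public static final,
  // so these two declarations are equivalent.
  String A = "a";
  public static final String B = "b";
}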

return ImmutableMap.<String, Object>of("type", "hdfs", "path", finalIndexZipFilePath.toString());
}

private static final Joiner JOINER = Joiner.on("/").skipNulls();
Reviewer (Member): Minor nit: reuse DataSegmentPusher.JOINER.

Author (Contributor): Done.

@@ -115,15 +118,14 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
storageDir,
segment.getShardSpec().getPartitionNum()
@akashdw (Contributor), Mar 27, 2017: outIndexFile could be created using JobHelper.makeFileNamePath(). To do that, we can move the makeFileNamePath() method from JobHelper to DataSegmentPusher, which ensures there is no separate logic for index file path creation in multiple places. We can do the same for the tmp file and descriptor.json.

Author (Contributor): It is a good idea, but it would require druid-api to depend on the Hadoop APIs, which I am afraid would trigger pushback from the no-Hadoop fan club. Most of the logic is encapsulated in dataSegmentPusher.makeIndexPathName, so I think we are OK.
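
A sketch of the split being described: the Hadoop-specific module wraps the pusher's plain-string paths into Hadoop Path objects, so druid-api itself never depends on Hadoop. The class name, method signature, and package names are assumptions for illustration, not the merged code:

import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.fs.Path;

public final class HadoopPathHelper
{
  private HadoopPathHelper() {}

  // Combine the deep-storage base path with the pusher-provided relative index path.
  public static Path makeFileNamePath(
      Path basePath,
      DataSegmentPusher pusher,
      DataSegment segment,
      String fileName
  )
  {
    return new Path(basePath, pusher.makeIndexPathName(segment, fileName));
  }
}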

Reviewer (Contributor): 👍

@@ -115,15 +118,14 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
storageDir,
segment.getShardSpec().getPartitionNum()
));

final Path outDescriptorFile = new Path(String.format(
"%s/%s/%d_descriptor.json",
fullyQualifiedStorageDirectory,
storageDir,
segment.getShardSpec().getPartitionNum()
));

Reviewer (Contributor): Same as above.

@@ -23,7 +23,6 @@
import com.google.common.base.Predicate;
Reviewer (Contributor): Line 718 of this file: should the URI be created with the scheme written in the loadSpec for the key 'S3Schema' if it exists (and then default to s3n if it doesn't)?
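
A sketch of what the suggested fix might look like; the 'S3Schema' key is quoted from this review, while the 'bucket' and 'key' loadSpec entries and the helper itself are assumptions for illustration:

import io.druid.timeline.DataSegment;
import java.net.URI;
import java.util.Map;

final class S3UriSketch
{
  private S3UriSketch() {}

  static URI makeSegmentUri(DataSegment segment)
  {
    final Map<String, Object> loadSpec = segment.getLoadSpec();
    // Prefer the scheme recorded in the loadSpec, falling back to s3n for older segments.
    final String scheme = loadSpec.containsKey("S3Schema")
                          ? (String) loadSpec.get("S3Schema")
                          : "s3n";
    return URI.create(
        String.format("%s://%s/%s", scheme, loadSpec.get("bucket"), loadSpec.get("key"))
    );
  }
}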

Author (Contributor): Good catch, done.

@@ -71,7 +71,7 @@
<netty.version>4.1.6.Final</netty.version>
<slf4j.version>1.7.12</slf4j.version>
<!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
<hadoop.compile.version>2.3.0</hadoop.compile.version>
Reviewer (Contributor):
Did you encounter the exception: Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V

in your testing? There's some discussion on it here https://issues.apache.org/jira/browse/HADOOP-12420 and the conclusion at the end is that Hadoop 2.7.3 will only work with aws-java-sdk 1.7.4 until Hadoop 2.8 is released (https://issues.apache.org/jira/browse/HADOOP-12269). I was only able to get the s3a scheme working by downgrading Druid's version of aws-java-sdk to 1.7.4 so I'm curious what your experience was.

Author (Contributor): @dclim I haven't run into this issue, maybe because I am not loading the s3a module; in fact, I am using the HDFS module to read segments.

Author (Contributor): @dclim 2.8 is released. If that works for you, I can update the dependency; please let me know.

Reviewer (Contributor): @b-slim nice! I don't have any concerns about updating our client to 2.8.

Author (Contributor): Done.

@b-slim (Contributor, Author) commented Mar 31, 2017

Timeout issue; closing and reopening.

@b-slim closed this on Mar 31, 2017
@b-slim reopened this on Mar 31, 2017
@dclim closed this on Apr 3, 2017
@dclim reopened this on Apr 3, 2017
@dclim (Contributor) commented Apr 3, 2017

👍 after Travis

@fjy (Contributor) commented Apr 5, 2017

@b-slim this is failing the UTs.

@b-slim (Contributor, Author) commented Apr 11, 2017

@dclim it seems like there are a bunch of issues with 2.8 itself; I am thinking about downgrading it to 2.7 again. What do you think?

@dclim (Contributor) commented Apr 11, 2017

@b-slim do you know what is causing the UT to fail? Is it a configuration issue or something more fundamental?

Downgrading to 2.7 seems reasonable to me if we don't have time to fix the issue, but I expect that people may then run into the aws-java-sdk incompatibility issue, and we may need to walk them through downgrading from 1.10.x to 1.7.4.

@dclim (Contributor) commented Apr 13, 2017

@b-slim have you tested this on a remote Hadoop cluster? I don't think it actually works as expected. The behavior I'm seeing is that even though the segments are getting pushed to S3, the loadSpec in the descriptor.json and in the druid_segments metadata table shows type 'local' instead of 's3_zip' or 'hdfs'.

This is happening because the loadSpec is now being generated by the DataSegmentPusher implementation, which is selected based on the value of druid.storage.type. However, the Druid properties are not being passed down into the system properties of the map/reduce JVMs, so druid.storage.type does not get set and the default LocalDataSegmentPusher gets used (even though the segments are actually written to S3, because the pusher isn't used for this in Hadoop jobs).
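
A minimal sketch of the kind of propagation being discussed: copy a whitelist of Druid system properties into the child JVM options of the Hadoop job so the remote map/reduce JVMs pick the right DataSegmentPusher. The prefixes come from the PR description; the helper and the exact mechanism are assumptions, not necessarily what the merged patch does (quoting of values is omitted for brevity):

import org.apache.hadoop.conf.Configuration;

final class DruidPropertyPropagation
{
  private DruidPropertyPropagation() {}

  static void propagateDruidProperties(Configuration conf)
  {
    final StringBuilder opts = new StringBuilder();
    for (String name : System.getProperties().stringPropertyNames()) {
      if (name.startsWith("druid.storage.") || name.startsWith("druid.javascript.")) {
        opts.append(String.format(" -D%s=%s", name, System.getProperty(name)));
      }
    }
    // Append to the existing child JVM options so map and reduce tasks see the same properties.
    conf.set("mapreduce.map.java.opts", conf.get("mapreduce.map.java.opts", "") + opts);
    conf.set("mapreduce.reduce.java.opts", conf.get("mapreduce.reduce.java.opts", "") + opts);
  }
}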

@b-slim (Contributor, Author) commented Apr 14, 2017

@dclim I haven't tested that downstream yet; let me check it out and see how we can propagate those properties to the MM and the Hadoop JVM opts.

@dclim (Contributor) commented Apr 18, 2017

@b-slim cool, also take a look at #4180. It might be a good idea to see if we can pass down the majority of the druid.* properties into the map and reduce JVMs to mitigate this class of issues. I was playing around with this earlier and ran into problems when I blanket-passed them all in (I think it was related to the extensions loadList, probably due to the way we handle extensions in MR jobs), but I didn't investigate any further.
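
A sketch of that blanket approach with the kind of exclusion hinted at above; the excluded prefix is an assumption based on the loadList problem described, not a tested rule:

final class DruidPropertyFilter
{
  private DruidPropertyFilter() {}

  // Pass down all druid.* properties except the extension settings,
  // which are handled differently inside map/reduce JVMs.
  static boolean shouldPropagate(String name)
  {
    return name.startsWith("druid.") && !name.startsWith("druid.extensions.");
  }
}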

@b-slim (Contributor, Author) commented Jun 8, 2017

@jon-wei I have run this with 2.7.3 (HDP 2.6) only.

@jon-wei (Contributor) commented Jun 8, 2017

@b-slim Can you try running with versions < 2.6.0 and see if there's a way to make that work? Otherwise that's a pretty big compatibility breakage

@b-slim (Contributor, Author) commented Jun 8, 2017

@jon-wei thanks. Can you please share the logs? Do you think it is related to this patch?

@jon-wei (Contributor) commented Jun 8, 2017

These are the errors I saw running against a sequenceiq 2.3.0 cluster. I tried various classloader options and got different errors:

With mapreduce.job.classloader = true

2017-06-07 20:01:52,673 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1496880055583_0001_000002
2017-06-07 20:01:52,778 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-06-07 20:01:52,782 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-06-07 20:01:52,785 INFO [main] org.apache.hadoop.mapreduce.v2.util.MRApps: Using job classloader
2017-06-07 20:01:52,860 FATAL [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster
java.lang.LinkageError: loader constraint violation: when resolving overridden method "org.apache.xerces.jaxp.DocumentBuilderImpl.newDocument()Lorg/w3c/dom/Document;" the class loader (instance of org/apache/hadoop/yarn/util/ApplicationClassLoader) of the current class, org/apache/xerces/jaxp/DocumentBuilderImpl, and its superclass loader (instance of <bootloader>), have different Class objects for the type org/w3c/dom/Document used in the signature
	at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
	at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2210)
	at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2172)
	at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2089)
	at org.apache.hadoop.conf.Configuration.get(Configuration.java:838)
	at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:857)
	at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1255)
	at org.apache.hadoop.security.SecurityUtil.<clinit>(SecurityUtil.java:70)
	at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:244)
	at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
	at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1430)
	at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1386)
2017-06-07 20:01:52,863 INFO [main] org.apache.hadoop.util.ExitUtil: Exiting with status 1

With mapreduce.job.user.classpath.first = true

 2017-06-07T21:42:36,949 ERROR [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster - Error starting MRAppMaster
java.lang.NoSuchMethodError: org.apache.hadoop.http.HttpConfig.getSchemePrefix()Ljava/lang/String;
	at org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer.initFilter(AmFilterInitializer.java:41) ~[hadoop-yarn-server-web-proxy-2.3.0.jar:?]
	at org.apache.hadoop.http.HttpServer2.initializeWebServer(HttpServer2.java:400) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.http.HttpServer2.<init>(HttpServer2.java:351) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.http.HttpServer2.<init>(HttpServer2.java:114) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.http.HttpServer2$Builder.build(HttpServer2.java:290) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.yarn.webapp.WebApps$Builder.build(WebApps.java:261) ~[hadoop-yarn-common-2.7.3.jar:?]
	at org.apache.hadoop.yarn.webapp.WebApps$Builder.start(WebApps.java:303) ~[hadoop-yarn-common-2.7.3.jar:?]
	at org.apache.hadoop.mapreduce.v2.app.client.MRClientService.serviceStart(MRClientService.java:144) ~[hadoop-mapreduce-client-app-2.7.3.jar:?]
	at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceStart(MRAppMaster.java:1147) ~[hadoop-mapreduce-client-app-2.7.3.jar:?]
	at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$5.run(MRAppMaster.java:1552) ~[hadoop-mapreduce-client-app-2.7.3.jar:?]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_121]
	at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_121]
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1548) ~[hadoop-mapreduce-client-app-2.7.3.jar:?]
	at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1481) [hadoop-mapreduce-client-app-2.7.3.jar:?]
2017-06-07T21:42:36,957 INFO [main] org.apache.hadoop.util.ExitUtil - Exiting with status 1

With neither

Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;

@jon-wei (Contributor) commented Jun 8, 2017

@b-slim If we can't get <2.6.0 Hadoop working, I'm okay with dropping support for those older versions with the 0.10.1 release, but I think we should probably make it a community decision; I'll start a druid-dev thread regarding that.

@jon-wei (Contributor) commented Jun 8, 2017

Started a thread on dropping support for Hadoop <2.6.0 if needed:

https://groups.google.com/forum/#!topic/druid-development/vFjR0pdq83U

@b-slim (Contributor, Author) commented Jun 8, 2017

@jon-wei thanks. It looks like it is a long shot to make it work with less than 2.6, but I will give it a try.

@jon-wei (Contributor) commented Jun 12, 2017

@b-slim Can you add docs for the changes in this patch?

@dclim (Contributor) commented Jun 12, 2017

@b-slim @jon-wei somewhat related: since we're upgrading aws-java-sdk and will be testing the new version, I was thinking of bumping it to a later version, since 1.10.56 is already over a year old. Any objections to this?

@jon-wei (Contributor) commented Jun 13, 2017

@dclim I have no objection, but I think we can't upgrade to 1.11+ because Druid is still on Jackson 2.4:

#4313

That PR updated aws-java-sdk to 1.10.77 and also changed Druid to pull in aws-java-sdk-ec2 instead of aws-java-sdk

@dclim (Contributor) commented Jun 13, 2017

@jon-wei ah, thanks for pointing out that patch; that sounds good to me.

@b-slim (Contributor, Author) commented Jun 13, 2017

@jon-wei I will send a follow-up PR this week, thanks!

@himanshug (Contributor) commented:
Continuing from #4313 (comment) and #4313 (comment): does anyone know which AWS properties need to be set, and to what values, in the integration testing environment to fix the situation?

@b-slim (Contributor, Author) commented Jun 20, 2017

@himanshug setting the version to 1.10.77 in the HDFS pom file will bring down the correct jar. If that works, we can make a global aws-version the fix.

@gianm (Contributor) commented Jun 21, 2017

@himanshug @b-slim does that mean we need to change something for 0.10.1 (rc2)?

@himanshug (Contributor) commented:
@gianm most likely, yes... I'm going to test himanshug@9632b0c

@artmoskvin commented:
Guys, I'm sorry but I still can't make Hadoop 2.7.3 and aws-java-sdk-s3:1.10.77 work with s3a. Here's what I have. Please tell me what I'm doing wrong.

I'm using Druid 0.10.1 with S3 as deep storage. I put hadoop-aws-2.7.3.jar into hadoop-dependencies/hadoop-client/2.7.3/, then I put aws-java-sdk-s3-1.10.77.jar in lib/. Here's my core-site.xml:

<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.eu-central-1.amazonaws.com</value>
  </property>

  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>TRATATA</value>
  </property>

  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>BLABLABLA</value>
  </property>
</configuration>

Still, I get: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V

@@ -39,6 +39,9 @@
  @JsonProperty
  @Min(0)
  private int maxListingLength = 1000;
  // use s3n by default for backward compatibility
  @JsonProperty
  private boolean useS3aSchema = false;

Reviewer: Can we add this to the documentation?


Reviewer (Contributor): We are using S3 as deep storage. Batch ingestion by default uses the S3N URI; is this the configuration to set so that it uses the S3A URL format? If so, where should it be set? In the deep storage config along with the other properties? Is it druid.storage.useS3aSchema = true?

@b-slim


@palanieppan-m you have to add this to a Hadoop config file such as core-site.xml. If you don't have any other configurations, it would look something like:

<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.eu-central-1.amazonaws.com</value>
  </property>

  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <property>
    <name>fs.s3a.access.key</name>
    <value>${S3_ACCESS_KEY}</value>
  </property>

  <property>
    <name>fs.s3a.secret.key</name>
    <value>${S3_SECRET_KEY}</value>
  </property>
</configuration>

Don't forget to change the endpoint if you're using a different region.

Next, create another config file named jets3t.properties with the following:

s3service.s3-endpoint = s3.eu-central-1.amazonaws.com

storage-service.request-signature-version=AWS4-HMAC-SHA256

Again, change the region if needed.

Lastly, all of this only works with patched Hadoop libraries. I used Hadoop 2.7.3 from the HDP 2.5.3 distribution; it is patched with aws-java-sdk-*:1.10.6 and works perfectly with it. So I replaced the hadoop and aws jars, and only then did it work. For more info, take a look at this discussion: #4977

Reviewer (Contributor): @moscowart, your proposal is a workaround for the default behavior of Druid, which uses the S3N URL scheme. To get that to work, you have to configure Hadoop to support it.

In this PR, though, it looks like the behavior can be changed so that Druid always uses the S3A URL scheme instead, provided you turn on a config. I was inquiring whether my understanding is correct and what the config name is to turn on this behavior.

@samarthjain (Contributor): We are currently running into an issue where the hadoop-indexer job is making segment entries in the metadata store using the local load spec. As a result, segments are not getting loaded. The commentary here suggests we may need to pass druid.storage.type to the indexer job. How do we do this? Or better, has this issue been fixed in later releases?

@samarthjain (Contributor) commented Jan 26, 2018

Alternatively, is there a way to configure the DataSegmentPusher? Right now, by default, LocalDataSegmentPusher is injected. This issue is preventing us from using Druid 0.10.1 for bulk ingestion. I forgot to mention: we want to push the segments to S3. Things worked fine with 0.9.1, but after the changes made in this pull request, bulk ingestion doesn't work.

@b-slim (Contributor, Author) commented Jan 27, 2018

@samarthjain this is not the best place to ask questions; you need to set druid.storage.type=hdfs (or any other deep storage type) as part of the properties file.

@palanieppan-m (Contributor) commented Feb 14, 2018 via email
