Use the official aws-sdk instead of jet3t #5382
Conversation
@jihoonson does this mean we are dropping support for Hadoop < 2.9.0?
@b-slim yes. I tested with 2.7.5, but it didn't work because of the mismatched version of the aws-sdk library.
Is it possible to keep the same version of aws-java-sdk as we have now, and do this patch without changing the Hadoop version? Or is there a reason they both need to be upgraded? I am just asking because people struggle with upgrades every time we change the Hadoop version, so I was hoping it wouldn't be necessary (I was hoping the next upgrade after 2.7.3 would be to 3.x, sometime in the future after its deployment becomes more widespread).
@gianm yes, there is a reason. To use aws-java-sdk, we need to match its version to that used by Hadoop as well as other libraries' versions like jackson, httpclient, and so on. (Unfortunately, I lost my note listing the libraries needing version change.) From my testing, 2.9.0 was the oldest working version.
I don't understand why we need to use aws-java-sdk from Hadoop at all. I thought that when we run Hadoop jobs, we use its FileSystem implementations both for reading (possibly from S3) and writing (to a possibly S3 deep storage) and do not call jets3t directly. Could we solve this by excluding the java sdk from the main classloader when we are running Hadoop jobs, meaning we only load the one provided in
This should say: I don't understand why we need to use Druid's version of aws-java-sdk from Hadoop at all. |
FYI, I am all in to move to 3.0, but I really think we should be able to support the Hadoop 2.7 line if possible. 2.9 is not really popular and I don't think it will be; most likely, users will stick with the 2.7 line or take the leap and move to 3.0.
Yes, this is correct. What I meant is that the old implementation of S3AFileSystem uses some deprecated APIs of aws-sdk which are not in the recent versions. This is not just an issue of aws-sdk, but also of some other libraries like jackson.
@b-slim when I tested with Hadoop 2.7.5, I saw the below error.
This error means that S3AFileSystem of 2.7.5 is using a constructor of TransferManager which is not in the recent version of aws-sdk. So, I added an older version (1.7.4) of aws-sdk to the classpath, and changed the dependency order in the classpath for aws-sdk 1.7.4 to appear earlier. This caused another error, shown below.
This error came from the mismatched version of Jackson, so I added an older version of Jackson to the classpath as well and adjusted the order. This caused the below error.
I guess the last error is because of the mismatched Jackson version when the peon tries to initialize some variables using Guice injection, which means a newer version of Jackson is required. I'm not sure how the second and the third errors can be fixed (maybe by shading?). I understand what @gianm and @b-slim are concerned about. I'm doing further testing to figure out the best way. I'll post the result here once my testing is done.
@jihoonson does this deprecate https://github.com/druid-io/druid/blob/druid-0.12.0-rc1/pom.xml#L1170-L1176 the
Related: #5288
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
is this still needed since aws-common pulls in the bundle?
I'll check this.
Please add the removal of
final S3ObjectSummary objectSummary = listResult.getObjectSummaries().get(0);
if (objectSummary.getStorageClass() != null &&
    StorageClass.fromValue(StringUtils.toUpperCase(objectSummary.getStorageClass())).equals(StorageClass.Glacier)) {
  throw new ISE(
This is to keep it from being caught in a retry loop? Can a comment be added to the code here to that effect?
Looks like my fault. I don't want to change any logic in this PR. Changed to AmazonServiceException.
@Override
public void close() throws IOException
{
  delegate.close();
s3Object should still be closed even if delegate close has an error.
Thanks, fixed.
}
return new InputStream() |
Would FilterInputStream with a close override work here?
Done.
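As a side note for readers, the FilterInputStream approach discussed in this thread can be sketched roughly as below. This is a hypothetical standalone version, not the PR's actual code: the class and variable names are mine, and a plain Closeable stands in for the S3 object backing the stream.

```java
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: a FilterInputStream whose close() also releases a second
// resource (e.g. the object backing the stream), even if the delegate's close() throws.
class ResourceClosingInputStream extends FilterInputStream
{
  private final Closeable resource; // stand-in for the s3Object in the review thread

  ResourceClosingInputStream(InputStream delegate, Closeable resource)
  {
    super(delegate);
    this.resource = resource;
  }

  @Override
  public void close() throws IOException
  {
    try {
      super.close(); // closes the delegate stream
    }
    finally {
      resource.close(); // runs even when super.close() throws
    }
  }
}
```

The try/finally shape also covers the earlier review comment that the underlying object should still be closed even if closing the delegate fails.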
} else {
  s3Client.putObject(bucketName, object);
  log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
(nit-pick) Would this be better if the s3client had a logger? We're not really adding any information here that couldn't be gathered from reasonable logging in the s3client, I don't think?
I'm not sure. I just moved this log, which was outside of the if clause, to inside it.
}
log.info("Deleting file [%s]", file.getAbsolutePath()); |
Suggest adding info as to why, like Deleting temporary cached file [%s].
Done.
}
log.info("Deleting file [%s]", file.getAbsolutePath());
file.delete();
Do we care if it was successful?
Not sure. The original code didn't care about it.
while (objectSummaryIterator.hasNext()) {
  final S3ObjectSummary objectSummary = objectSummaryIterator.next();
  // TODO: what is going on here?
  String keyString = objectSummary.getKey().substring(coords.path.length());
It is taking the path beyond the "search" prefix, and checking the regex to see if it matches the remainder (after the prefix).
Thanks.
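To make the explanation above concrete, here is a hypothetical, self-contained sketch of that matching logic (the class and method names are illustrative, not from the PR): the listing prefix is stripped from each key and the regex is then tested against the remainder.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch of the logic described above: take the path beyond the
// "search" prefix and keep only keys whose remainder matches the pattern.
class PrefixRegexMatcher
{
  static List<String> matchKeys(List<String> keys, String prefix, Pattern pattern)
  {
    return keys.stream()
               .map(key -> key.substring(prefix.length()))        // path beyond the prefix
               .filter(rest -> pattern.matcher(rest).matches())   // regex on the remainder
               .collect(Collectors.toList());
  }
}
```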
};
}
public static String constructSegmentPath(String baseKey, String storageDir)
static String constructSegmentPath(String baseKey, String storageDir)
We don't know if these were used in extensions; can the things that were public remain so?
Reverted.
if (key.endsWith("/") && objectMetadata.getContentLength() == 0) {
  return true;
}
// Recognize s3sync.rb directory placeholders by MD5/ETag value.
is this string in source somewhere?
Yes, it was in the original code.
}
final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);
if (!objectSummary.getBucketName().equals(bucket) || !objectSummary.getKey().equals(key)) {
  throw new ISE("Wrong object[%s] for bucket[%s] and key[%s]", objectSummary, bucket, key);
Does this mean that if there are multiple keys that match the prefix, and the returned value isn't the first one in the result, this will fail? That seems kind of fragile.
Yes, I added javadoc for this method.
Yes, maybe. I haven't tested it yet.
Any configs of plain text should be compatible with a PasswordProvider once it goes in, so I'm ok with that. Please file the issue before merging
@drcrallen thanks. Raised #5507.
<artifactId>jets3t</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
indentation looks off here
Fixed.
}
log.info("Deleting temporary cached file [%s]", file.getAbsolutePath());
file.delete();
Would it be better to clean this file up in a finally block?
Good point. Fixed.
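The finally-block suggestion can be sketched like this. It is a hypothetical standalone example (the class name and the fake "work" are mine, using a temp file instead of the real cached segment file), showing that the temporary file is deleted whether or not the work throws:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical sketch of the cleanup pattern suggested above: do the work with
// the temporary file, and delete it in a finally block so it is removed even
// when the work fails.
class TempFileCleanup
{
  static long sizeThenCleanup() throws IOException
  {
    final File file = File.createTempFile("segment", ".tmp");
    try {
      Files.write(file.toPath(), new byte[]{1, 2, 3}); // stand-in for the real work
      return file.length();
    }
    finally {
      // log.info("Deleting temporary cached file [%s]", file.getAbsolutePath());
      file.delete(); // runs whether the try block returned or threw
    }
  }
}
```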
if (result.getKeyCount() == 0) {
  throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
}
final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);
Is it possible for result.getKeyCount() to be > 0 but result.getObjectSummaries() to have size 0?
From the javadoc of ListObjectsV2Result, getKeyCount() returns the number of keys returned with this response, and getObjectSummaries() returns a list of the object summaries describing the objects stored in the S3 bucket. Since each object is associated with a key in S3, it shouldn't happen.
{
  String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
  filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
  return filename;
}
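For reference, the snippet above can be written as a self-contained method like this (the class name is hypothetical); it drops everything up to the last '/' and then strips the suffix:

```java
// Hypothetical standalone version of the snippet above: extract the base
// filename from an S3 key by taking the characters after the last '/' and
// removing the given suffix from the end.
class KeyNames
{
  static String toFilename(String key, String suffix)
  {
    String filename = key.substring(key.lastIndexOf('/') + 1); // characters after last '/'
    return filename.substring(0, filename.length() - suffix.length()); // drop the suffix
  }
}
```

Note that for a key without any '/', lastIndexOf returns -1, so the whole key (minus the suffix) is returned.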
static AccessControlList grantFullControlToBucketOwver(AmazonS3 s3Client, String bucket) |
Owver -> Owner
Fixed.
/**
 * Gets a single {@link S3ObjectSummary} from s3. Since this method might throw an exception if there are multiple
 * objets that match the given key, this method should be used only when it's guaranteed that the given key is unique
objets -> objects
Fixed.
@jon-wei thanks for the review. Addressed your comments.
I'm going to merge this PR shortly. @b-slim @drcrallen @jon-wei thanks for the review!
@jihoonson I see this PR is merged, so what is the story now? Are we dropping support for Hadoop < 2.8.3? If that is the case, did we have a round of votes on the mailing list? Is this supposed to be in 0.13.0?
@b-slim we still support the Hadoop 2.7 line. I tested Hadoop 2.7.3 and 2.7.5. When Hadoop 2.7 is used together with S3 deep storage, S3N still works but S3A doesn't, which is the same as master before this PR. To use S3A, they need a custom
@jihoonson,
I tested Apache Hadoop 2.6.0, 2.7.1, 2.7.3, 2.7.5, 2.8.3, 2.9.0, 3.0.0, Cloudera 5.7.0, and HDP 5.7. But it would be great if you could test more Apache Hadoop and HDP versions as well.
I've tested four cases for deep storage: HDFS, S3A via HDFS, S3N via HDFS, and direct S3.
Well, at first, I wanted to use Hadoop 2.9 to use more recent libraries, especially Jackson. But it broke the support for Hadoop 2.7, so I needed to change it again. IMO, it doesn't make sense to use Hadoop 2.7.3 as our default Hadoop version because we don't support all features for that version, like S3A.
Deprecated due to apache#5382
* Update defaultHadoopCoordinates in documentation. To match changes applied in #5382. * Remove a parameter with defaults from example configuration file. If it has reasonable defaults, then why would it be in an example config file? Also, it is yet another place that has been forgotten to be updated and will be forgotten in the future. Also, if someone is running different hadoop version, then there's much more work to be done than just changing this property, so why give users false hopes? * Fix typo in documentation.
Fixes #4289.

In this PR, the versions of the below libraries are changed:
- 2.9.0 to match library versions
- 2.8.3
- 2.8.10
- 2.6.7

Also, I removed AWSCredentialsProvider which is not used anymore.

I've done the below tests:
- (hadoopDependencyCoordinates was set to 2.9.0)
- insert-segment-to-db tool: this tool finds segments under the given directory of deep storage