From 9f7b70a487ccdad2f09877a93aed6a5f4af38d26 Mon Sep 17 00:00:00 2001
From: Gonzalo Gallotti
Date: Fri, 8 Jul 2022 14:57:20 -0300
Subject: [PATCH 1/3] AWS S3: Do not use MemoryStream to upload objects.

---
 .../Storage/GXAmazonS3/GXAmazonS3.csproj      |   1 +
 .../Storage/GXAmazonS3/ExternalProviderS3.cs  |  18 +-
 .../Storage/GXAmazonS3/S3UploadStream.cs      | 210 ++++++++++++++++++
 3 files changed, 216 insertions(+), 13 deletions(-)
 create mode 100644 dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/S3UploadStream.cs

diff --git a/dotnet/src/dotnetcore/Providers/Storage/GXAmazonS3/GXAmazonS3.csproj b/dotnet/src/dotnetcore/Providers/Storage/GXAmazonS3/GXAmazonS3.csproj
index 927009f41..b11dd9a68 100644
--- a/dotnet/src/dotnetcore/Providers/Storage/GXAmazonS3/GXAmazonS3.csproj
+++ b/dotnet/src/dotnetcore/Providers/Storage/GXAmazonS3/GXAmazonS3.csproj
@@ -9,6 +9,7 @@
 
+
 
diff --git a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
index 91faea310..290f557e4 100644
--- a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
+++ b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
@@ -2,10 +2,8 @@
 using Amazon.S3;
 using Amazon.S3.IO;
 using Amazon.S3.Model;
-using GeneXus.Encryption;
 using GeneXus.Services;
 using GeneXus.Utils;
-using log4net;
 using System;
 using System.Collections.Generic;
 using System.IO;
@@ -364,19 +362,13 @@ private S3CannedACL GetCannedACL(GxFileType acl)
 
 		public string Upload(string fileName, Stream stream, GxFileType destFileType)
 		{
-			MemoryStream ms = new MemoryStream();
-			stream.CopyTo(ms);//can determine PutObjectRequest.Headers.ContentLength. Avoid error Could not determine content length
-			PutObjectRequest objectRequest = new PutObjectRequest()
+			string contentType = null;
+			TryGetContentType(fileName, out contentType);
+
+			using (S3UploadStream s = new S3UploadStream(Client, Bucket, fileName, GetCannedACL(destFileType), contentType))
 			{
-				BucketName = Bucket,
-				Key = fileName,
-				InputStream = ms,
-				CannedACL = GetCannedACL(destFileType)
-			};
-			if (TryGetContentType(fileName, out string mimeType)) {
-				objectRequest.ContentType = mimeType;
+				stream.CopyTo(s);
 			}
-			PutObjectResponse result = PutObject(objectRequest);
 			return Get(fileName, destFileType);
 		}
 
diff --git a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/S3UploadStream.cs b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/S3UploadStream.cs
new file mode 100644
index 000000000..c3d195403
--- /dev/null
+++ b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/S3UploadStream.cs
@@ -0,0 +1,210 @@
+using Amazon.S3;
+using Amazon.S3.Model;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace GeneXus.Storage.GXAmazonS3
+{
+	public class S3UploadStream : Stream
+	{
+		/* Note that the maximum size (as of now) of a file in S3 is 5TB, so it isn't
+		 * safe to assume all uploads will work here. MAX_PART_LENGTH times MAX_PART_COUNT
+		 * is ~50TB, which is too big for S3. */
+		const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last this size or greater
+		const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
+		const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
+		const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;
+
+		internal class Metadata
+		{
+			public string BucketName { get; set; }
+			public string Key { get; set; }
+			public long PartLength { get; set; } = DEFAULT_PART_LENGTH;
+
+			public int PartCount { get; set; } = 0;
+			public string UploadId { get; set; }
+			public MemoryStream CurrentStream { get; set; }
+			public S3CannedACL Acl { get; set; }
+			public string ContentType { get; set; }
+
+			public long Position { get; set; } = 0;
+			public long Length { get; set; } = 0;
+
+			public List<Task> Tasks = new List<Task>();
+			public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
+		}
+
+		Metadata _metadata = new Metadata();
+		IAmazonS3 _s3 = null;
+
+		public S3UploadStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
+			: this(s3, new Uri(s3uri), partLength)
+		{
+		}
+
+		public S3UploadStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
+			: this(s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
+		{
+		}
+		public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
+			: this(s3, bucket, key, null, null, partLength)
+		{
+
+		}
+		public S3UploadStream(IAmazonS3 s3, string bucket, string key, S3CannedACL acl, string cType = null, long partLength = DEFAULT_PART_LENGTH)
+		{
+			_s3 = s3;
+			_metadata.BucketName = bucket;
+			_metadata.Key = key;
+			_metadata.PartLength = partLength;
+			_metadata.Acl = acl;
+			_metadata.ContentType = cType;
+		}
+
+		protected override void Dispose(bool disposing)
+		{
+			if (disposing)
+			{
+				if (_metadata != null)
+				{
+					Flush(true);
+					CompleteUpload();
+				}
+			}
+			_metadata = null;
+			base.Dispose(disposing);
+		}
+
+		public override bool CanRead => false;
+		public override bool CanSeek => false;
+		public override bool CanWrite => true;
+		public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);
+
+		public override long Position
+		{
+			get => _metadata.Position;
+			set => throw new NotImplementedException();
+		}
+
+		public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+		public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+
+		public override void SetLength(long value)
+		{
+			_metadata.Length = Math.Max(_metadata.Length, value);
+			_metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
+		}
+
+		private void StartNewPart()
+		{
+			if (_metadata.CurrentStream != null)
+			{
+				Flush(false);
+			}
+			_metadata.CurrentStream = new MemoryStream();
+			_metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
+		}
+
+		public override void Flush()
+		{
+			Flush(false);
+		}
+
+		private void Flush(bool disposing)
+		{
+			if ((_metadata.CurrentStream == null || _metadata.CurrentStream.Length < MIN_PART_LENGTH) &&
+				!disposing)
+				return;
+
+			if (_metadata.UploadId == null)
+			{
+
+				InitiateMultipartUploadRequest uploadRequest = new InitiateMultipartUploadRequest()
+				{
+					BucketName = _metadata.BucketName,
+					Key = _metadata.Key
+				};
+
+				if (_metadata.Acl != null)
+				{
+					uploadRequest.CannedACL = _metadata.Acl;
+				}
+
+				if (!string.IsNullOrEmpty(_metadata.ContentType))
+				{
+					uploadRequest.ContentType = _metadata.ContentType;
+				}
+				_metadata.UploadId = _s3.InitiateMultipartUploadAsync(uploadRequest).GetAwaiter().GetResult().UploadId;
+			}
+
+			if (_metadata.CurrentStream != null)
+			{
+				int i = ++_metadata.PartCount;
+
+				_metadata.CurrentStream.Seek(0, SeekOrigin.Begin);
+				var request = new UploadPartRequest()
+				{
+					BucketName = _metadata.BucketName,
+					Key = _metadata.Key,
+					UploadId = _metadata.UploadId,
+					PartNumber = i,
+					IsLastPart = disposing,
+					InputStream = _metadata.CurrentStream
+				};
+				_metadata.CurrentStream = null;
+
+				var upload = Task.Run(() =>
+				{
+					UploadPartResponse response = _s3.UploadPartAsync(request).GetAwaiter().GetResult();
+					_metadata.PartETags.AddOrUpdate(i, response.ETag,
+						(n, s) => response.ETag);
+					request.InputStream.Dispose();
+				});
+				_metadata.Tasks.Add(upload);
+			}
+		}
+
+		private void CompleteUpload()
+		{
+			Task.WaitAll(_metadata.Tasks.ToArray());
+
+			if (Length > 0)
+			{
+				_s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest()
+				{
+					BucketName = _metadata.BucketName,
+					Key = _metadata.Key,
+					PartETags = _metadata.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
+					UploadId = _metadata.UploadId
+				}).GetAwaiter().GetResult();
+			}
+		}
+
+		public override void Write(byte[] buffer, int offset, int count)
+		{
+			if (count == 0) return;
+
+			// write as much of the buffer as will fit to the current part, and if needed
+			// allocate a new part and continue writing to it (and so on).
+			int o = offset;
+			int c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
+			do
+			{
+				if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength)
+					StartNewPart();
+
+				long remaining = _metadata.PartLength - _metadata.CurrentStream.Length;
+				int w = Math.Min(c, (int)remaining);
+				_metadata.CurrentStream.Write(buffer, o, w);
+
+				_metadata.Position += w;
+				c -= w;
+				o += w;
+			} while (c > 0);
+		}
+	}
+}
\ No newline at end of file

From ae40aa40de21e375498daedaf3f3c4b729ab929f Mon Sep 17 00:00:00 2001
From: Gonzalo Gallotti
Date: Fri, 8 Jul 2022 15:31:02 -0300
Subject: [PATCH 2/3] keep uploading simple if file is lower than 5MB

---
 .../Storage/GXAmazonS3/ExternalProviderS3.cs  | 32 +++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
index 290f557e4..ab6e84eac 100644
--- a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
+++ b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
@@ -360,7 +360,22 @@ private S3CannedACL GetCannedACL(GxFileType acl)
 			}
 		}
 
+		const long MIN_MULTIPART_POST = 5L * 1024 * 1024;
+
 		public string Upload(string fileName, Stream stream, GxFileType destFileType)
+		{
+			bool doSimpleUpload = stream.CanSeek && stream.Length <= MIN_MULTIPART_POST;
+			if (doSimpleUpload)
+			{
+				return UploadSimple(fileName, stream, destFileType);
+			}
+			else
+			{
+				return UploadMultipart(fileName, stream, destFileType);
+			}
+		}
+
+		private string UploadMultipart(string fileName, Stream stream, GxFileType destFileType)
 		{
 			string contentType = null;
 			TryGetContentType(fileName, out contentType);
@@ -372,6 +387,23 @@ public string Upload(string fileName, Stream stream, GxFileType destFileType)
 			}
 			return Get(fileName, destFileType);
 		}
 
+		private string UploadSimple(string fileName, Stream stream, GxFileType destFileType)
+		{
+			PutObjectRequest objectRequest = new PutObjectRequest()
+			{
+				BucketName = Bucket,
+				Key = fileName,
+				InputStream = stream,
+				CannedACL = GetCannedACL(destFileType)
+			};
+			if (TryGetContentType(fileName, out string mimeType))
+			{
+				objectRequest.ContentType = mimeType;
+			}
+			PutObjectResponse result = PutObject(objectRequest);
+			return Get(fileName, destFileType);
+		}
+
 		public string Copy(string url, string newName, string tableName, string fieldName, GxFileType destFileType)
 		{
 			url = StorageUtils.DecodeUrl(url);

From 65c0e964cc07989ee808eb7d4aa57b7f5061928e Mon Sep 17 00:00:00 2001
From: Gonzalo
Date: Thu, 14 Jul 2022 09:37:10 -0300
Subject: [PATCH 3/3] Increase to 10MB min upload limit

---
 .../Providers/Storage/GXAmazonS3/ExternalProviderS3.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
index ab6e84eac..bacfba907 100644
--- a/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
+++ b/dotnet/src/dotnetframework/Providers/Storage/GXAmazonS3/ExternalProviderS3.cs
@@ -360,7 +360,7 @@ private S3CannedACL GetCannedACL(GxFileType acl)
 			}
 		}
 
-		const long MIN_MULTIPART_POST = 5L * 1024 * 1024;
+		const long MIN_MULTIPART_POST = 10L * 1024 * 1024;
 
 		public string Upload(string fileName, Stream stream, GxFileType destFileType)
 		{
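
Illustrative usage (not part of the patch series; the region, bucket, key and file names below are made-up examples): any readable Stream can be copied into the new S3UploadStream, which buffers data into in-memory parts and uploads each part as it reaches the current part length, so no single MemoryStream with the whole object is ever built. Disposing the stream flushes the final part and completes the multipart upload.

    using System.IO;
    using Amazon;
    using Amazon.S3;
    using GeneXus.Storage.GXAmazonS3;

    class S3UploadStreamUsageSketch
    {
        static void UploadExample()
        {
            // Hypothetical client and object names, for illustration only.
            IAmazonS3 client = new AmazonS3Client(RegionEndpoint.USEast1);
            using (FileStream source = File.OpenRead("big-backup.zip"))
            using (S3UploadStream target = new S3UploadStream(client, "my-bucket", "backups/big-backup.zip"))
            {
                // CopyTo drives S3UploadStream.Write, which starts a new part whenever
                // the current in-memory buffer reaches the part length.
                source.CopyTo(target);
            } // Dispose flushes the last part and calls CompleteMultipartUpload.
        }
    }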