From 7e448c1d854d0e86a7398f4242bba9e29c6dd1a4 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Thu, 24 Apr 2025 09:39:07 +0200 Subject: [PATCH 1/3] Change link index updater lambda to react on SQS events --- .github/workflows/temp.yml | 19 ++ Directory.Packages.props | 48 ++--- .../Links/CrossLinks/CrossLinkResolver.cs | 4 + .../LinkIndexProvider.cs | 80 +++++++++ .../LinkReferenceProvider.cs | 27 +++ .../docs-lambda-index-publisher/Program.cs | 169 ++++++++---------- .../docs-lambda-index-publisher/README.md | 3 +- .../SerializerContext.cs | 14 ++ .../docs-lambda-index-publisher.csproj | 4 +- 9 files changed, 246 insertions(+), 122 deletions(-) create mode 100644 .github/workflows/temp.yml create mode 100644 src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs create mode 100644 src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs create mode 100644 src/infra/docs-lambda-index-publisher/SerializerContext.cs diff --git a/.github/workflows/temp.yml b/.github/workflows/temp.yml new file mode 100644 index 000000000..e2dc0c5a9 --- /dev/null +++ b/.github/workflows/temp.yml @@ -0,0 +1,19 @@ +name: temp + +on: + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: | + docker build . -t publish-links-index:latest -f src/infra/docs-lambda-index-publisher/lambda.DockerFile + docker cp $(docker create --name tc publish-links-index:latest):/app/.artifacts/publish ./.artifacts && docker rm tc + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + retention-days: 1 + name: bootstrap + path: .artifacts/docs-lambda-index-publisher/release_linux-x64/bootstrap diff --git a/Directory.Packages.props b/Directory.Packages.props index a4399b5c4..e5c52c9ea 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -8,12 +8,14 @@ - - - - + + + + + + + - @@ -23,17 +25,16 @@ - - + - + - + - - + + @@ -41,24 +42,23 @@ - - + + - - - - - + + + + - - - - - + + + + + - + \ No newline at end of file diff --git a/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs b/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs index dd5b1fa24..60a070aef 100644 --- a/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs +++ b/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs @@ -38,6 +38,10 @@ public record LinkIndexEntry [JsonPropertyName("etag")] public required string ETag { get; init; } + // TODO can be made required after all doc_sets have published again + [JsonPropertyName("updated_at")] + public DateTime UpdatedAt { get; init; } = DateTime.MinValue; + // TODO can be made required after all doc_sets have published again [JsonPropertyName("ref")] public string GitReference { get; init; } = "unknown"; diff --git a/src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs b/src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs new file mode 100644 index 000000000..6b00ab82b --- /dev/null +++ b/src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs @@ -0,0 +1,80 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Amazon.Lambda.Core; +using Amazon.S3; +using Amazon.S3.Model; +using Elastic.Markdown.Links.CrossLinks; + +namespace Elastic.Documentation.Lambda.LinkIndexUploader; + +/// +/// Gets the link index from S3 once. +/// You can then update the link index with and save it with . +/// If the link index changed in the meantime, will throw an exception, +/// thus all the messages from the queue will be sent back to the queue. +/// +public class LinkIndexProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName, string key) +{ + private string? _etag; + private LinkIndex? _linkIndex; + + private async Task GetLinkIndex() + { + var getObjectRequest = new GetObjectRequest + { + BucketName = bucketName, + Key = key + }; + logger.LogInformation("Getting link index from s3://{bucketName}/{key}", bucketName, key); + var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest); + await using var stream = getObjectResponse.ResponseStream; + _etag = getObjectResponse.ETag; + logger.LogInformation("Successfully got link index from s3://{bucketName}/{key}", bucketName, key); + _linkIndex = LinkIndex.Deserialize(stream); + return _linkIndex; + } + + public async Task UpdateLinkIndexEntry(LinkIndexEntry linkIndexEntry) + { + _linkIndex ??= await GetLinkIndex(); + if (_linkIndex.Repositories.TryGetValue(linkIndexEntry.Repository, out var existingEntry)) + { + var newEntryIsNewer = DateTime.Compare(linkIndexEntry.UpdatedAt, existingEntry[linkIndexEntry.Branch].UpdatedAt) > 0; + if (newEntryIsNewer) + { + existingEntry[linkIndexEntry.Branch] = linkIndexEntry; + logger.LogInformation("Updated existing entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch); + } + else + logger.LogInformation("Skipping update for {repository}@{branch} because the existing entry is newer", linkIndexEntry.Repository, linkIndexEntry.Branch); + } + else + { + _linkIndex.Repositories.Add(linkIndexEntry.Repository, new Dictionary + { + { linkIndexEntry.Branch, linkIndexEntry } + }); + logger.LogInformation("Added new entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch); + } + } + + public async Task Save() + { + if (_etag == null || _linkIndex == null) + throw new InvalidOperationException("You must call UpdateLinkIndexEntry() before Save()"); + var json = LinkIndex.Serialize(_linkIndex); + logger.LogInformation("Saving link index to s3://{bucketName}/{key}", bucketName, key); + var putObjectRequest = new PutObjectRequest + { + BucketName = bucketName, + Key = key, + ContentBody = json, + ContentType = "application/json", + IfMatch = _etag // Only update if the ETag matches. Meaning the object has not been changed in the meantime. + }; + _ = await s3Client.PutObjectAsync(putObjectRequest); + logger.LogInformation("Successfully saved link index to s3://{bucketName}/{key}", bucketName, key); + } +} diff --git a/src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs b/src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs new file mode 100644 index 000000000..cafcdb5b0 --- /dev/null +++ b/src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Amazon.Lambda.Core; +using Amazon.S3; +using Amazon.S3.Model; +using Elastic.Markdown.IO.State; + +namespace Elastic.Documentation.Lambda.LinkIndexUploader; + +public class LinkReferenceProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName) +{ + public async Task GetLinkReference(string key, Cancel ctx) + { + var getObjectRequest = new GetObjectRequest + { + BucketName = bucketName, + Key = key + }; + logger.LogInformation("Getting object {key} from bucket {bucketName}", key, bucketName); + var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest, ctx); + await using var stream = getObjectResponse.ResponseStream; + logger.LogInformation("Successfully got object {key} from bucket {bucketName}", key, bucketName); + return LinkReference.Deserialize(stream); + } +} diff --git a/src/infra/docs-lambda-index-publisher/Program.cs b/src/infra/docs-lambda-index-publisher/Program.cs index fe3a2a48a..f59269274 100644 --- a/src/infra/docs-lambda-index-publisher/Program.cs +++ b/src/infra/docs-lambda-index-publisher/Program.cs @@ -2,123 +2,102 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System.Diagnostics; -using System.Text; +using System.Collections.Concurrent; using Amazon.Lambda.Core; using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; +using Amazon.Lambda.SQSEvents; using Amazon.S3; -using Amazon.S3.Model; +using Amazon.S3.Util; +using Elastic.Documentation.Lambda.LinkIndexUploader; using Elastic.Markdown.IO.State; using Elastic.Markdown.Links.CrossLinks; const string bucketName = "elastic-docs-link-index"; +const string indexFile = "link-index-test.json"; -await LambdaBootstrapBuilder.Create(Handler) - .Build() +await LambdaBootstrapBuilder.Create(Handler, new SourceGeneratorLambdaJsonSerializer()) + .Build() .RunAsync(); -// Uncomment to test locally without uploading -// await CreateLinkIndex(new AmazonS3Client()); +return; -#pragma warning disable CS8321 // Local function is declared but never used -static async Task Handler(ILambdaContext context) -#pragma warning restore CS8321 // Local function is declared but never used +// The SQS queue is configured to trigger when elastic/*/*/links.json files are created or updated. +static async Task Handler(SQSEvent ev, ILambdaContext context) { - var sw = Stopwatch.StartNew(); - - IAmazonS3 s3Client = new AmazonS3Client(); - var linkIndex = await CreateLinkIndex(s3Client); - if (linkIndex == null) - return $"Error encountered on server. getting list of objects."; - - var json = LinkIndex.Serialize(linkIndex); - - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)); - await s3Client.UploadObjectFromStreamAsync(bucketName, "link-index.json", stream, new Dictionary(), CancellationToken.None); - return $"Finished in {sw}"; -} - - -static async Task CreateLinkIndex(IAmazonS3 s3Client) -{ - var request = new ListObjectsV2Request - { - BucketName = bucketName, - MaxKeys = 1000 //default - }; - - var linkIndex = new LinkIndex - { - Repositories = [] - }; - try + var s3Client = new AmazonS3Client(); + var linkIndexProvider = new LinkIndexProvider(s3Client, context.Logger, bucketName, indexFile); + var batchItemFailures = new List(); + foreach (var message in ev.Records) { - ListObjectsV2Response response; - do + try { - response = await s3Client.ListObjectsV2Async(request, CancellationToken.None); - await Parallel.ForEachAsync(response.S3Objects, async (obj, ctx) => + var s3RecordLinkReferenceTuples = await GetS3RecordLinkReferenceTuples(s3Client, message, context); + foreach (var (s3Record, linkReference) in s3RecordLinkReferenceTuples) { - if (!obj.Key.StartsWith("elastic/", StringComparison.OrdinalIgnoreCase)) - return; - - var tokens = obj.Key.Split('/'); - if (tokens.Length < 3) - return; - - // TODO create a dedicated state file for git configuration - // Deserializing all of the links metadata adds significant overhead - var gitReference = await ReadLinkReferenceSha(s3Client, obj); - - var repository = tokens[1]; - var branch = tokens[2]; - - var entry = new LinkIndexEntry - { - Repository = repository, - Branch = branch, - ETag = obj.ETag.Trim('"'), - Path = obj.Key, - GitReference = gitReference - }; - if (linkIndex.Repositories.TryGetValue(repository, out var existingEntry)) - existingEntry[branch] = entry; - else - { - linkIndex.Repositories.Add(repository, new Dictionary - { - { branch, entry } - }); - } + var newEntry = ConvertToLinkIndexEntry(s3Record, linkReference); + await linkIndexProvider.UpdateLinkIndexEntry(newEntry); + } + } + catch (Exception e) + { + // Add failed message identifier to the batchItemFailures list + context.Logger.LogWarning(e, "Failed to process message {MessageId}", message.MessageId); + batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure + { + ItemIdentifier = message.MessageId }); - - // If the response is truncated, set the request ContinuationToken - // from the NextContinuationToken property of the response. - request.ContinuationToken = response.NextContinuationToken; - } while (response.IsTruncated); + } } - catch + try { - return null; + await linkIndexProvider.Save(); + var response = new SQSBatchResponse(batchItemFailures); + if (batchItemFailures.Count > 0) + context.Logger.LogInformation("Failed to process {batchItemFailuresCount} of {allMessagesCount} messages. Returning them to the queue.", batchItemFailures.Count, ev.Records.Count); + return response; + } + catch (Exception ex) + { + // If we fail to update the link index, we need to return all messages to the queue + // so that they can be retried later. + context.Logger.LogError("Failed to update {bucketName}/{indexFile}. Returning all {recordCount} messages to the queue.", bucketName, indexFile, ev.Records.Count); + context.Logger.LogError(ex, ex.Message); + var response = new SQSBatchResponse(ev.Records.Select(r => new SQSBatchResponse.BatchItemFailure + { + ItemIdentifier = r.MessageId + }).ToList()); + return response; } - - return linkIndex; } -static async Task ReadLinkReferenceSha(IAmazonS3 client, S3Object obj) +static LinkIndexEntry ConvertToLinkIndexEntry(S3EventNotification.S3EventNotificationRecord record, LinkReference linkReference) { - try + var s3Object = record.S3.Object; + var keyTokens = s3Object.Key.Split('/'); + var repository = keyTokens[1]; + var branch = keyTokens[2]; + return new LinkIndexEntry { - var contents = await client.GetObjectAsync(obj.Key, obj.Key, CancellationToken.None); - await using var s = contents.ResponseStream; - var linkReference = LinkReference.Deserialize(s); - return linkReference.Origin.Ref; - } - catch (Exception e) + Repository = repository, + Branch = branch, + ETag = s3Object.ETag, + Path = s3Object.Key, + UpdatedAt = record.EventTime, + GitReference = linkReference.Origin.Ref + }; +} + +static async Task> GetS3RecordLinkReferenceTuples(IAmazonS3 s3Client, + SQSEvent.SQSMessage message, ILambdaContext context) +{ + var s3Event = S3EventNotification.ParseJson(message.Body); + var recordLinkReferenceTuples = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>(); + var linkReferenceProvider = new LinkReferenceProvider(s3Client, context.Logger, bucketName); + await Parallel.ForEachAsync(s3Event.Records, async (record, ctx) => { - Console.WriteLine(e); - // it's important we don't fail here we need to fallback gracefully from this so we can fix the root cause - // of why a repository is not reporting its git reference properly - return "unknown"; - } + var linkReference = await linkReferenceProvider.GetLinkReference(record.S3.Object.Key, ctx); + recordLinkReferenceTuples.Add((record, linkReference)); + }); + return recordLinkReferenceTuples; } diff --git a/src/infra/docs-lambda-index-publisher/README.md b/src/infra/docs-lambda-index-publisher/README.md index 04b3bd9f4..1e45ca7de 100644 --- a/src/infra/docs-lambda-index-publisher/README.md +++ b/src/infra/docs-lambda-index-publisher/README.md @@ -1,6 +1,6 @@ # Index Registry Update Lambda Function -From a linux `x86_64` machine you can use the followint to build a AOT binary that will run +From a linux `x86_64` machine you can use the following to build a AOT binary that will run on a vanilla `Amazon Linux 2023` without any dependencies. @@ -19,4 +19,3 @@ The `bootstrap` binary should now be available under: ``` .artifacts/publish/docs-lambda-index-publisher/release_linux-x64/bootstrap ``` - diff --git a/src/infra/docs-lambda-index-publisher/SerializerContext.cs b/src/infra/docs-lambda-index-publisher/SerializerContext.cs new file mode 100644 index 000000000..9fe3d8c51 --- /dev/null +++ b/src/infra/docs-lambda-index-publisher/SerializerContext.cs @@ -0,0 +1,14 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Text.Json.Serialization; +using Amazon.Lambda.SQSEvents; +using Amazon.S3.Util; + +namespace Elastic.Documentation.Lambda.LinkIndexUploader; + +[JsonSerializable(typeof(SQSEvent))] +[JsonSerializable(typeof(S3EventNotification))] +[JsonSerializable(typeof(SQSBatchResponse))] +public partial class SerializerContext : JsonSerializerContext; diff --git a/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj b/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj index 2022dbed2..a104f5e5d 100644 --- a/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj +++ b/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj @@ -21,11 +21,13 @@ + + + - From 49df98a075607b9594d7239028e09fe362de0147 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Thu, 24 Apr 2025 09:44:41 +0200 Subject: [PATCH 2/3] Remove temporary workflow for building the binary --- .github/workflows/temp.yml | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 .github/workflows/temp.yml diff --git a/.github/workflows/temp.yml b/.github/workflows/temp.yml deleted file mode 100644 index e2dc0c5a9..000000000 --- a/.github/workflows/temp.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: temp - -on: - pull_request: - -jobs: - build: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - run: | - docker build . -t publish-links-index:latest -f src/infra/docs-lambda-index-publisher/lambda.DockerFile - docker cp $(docker create --name tc publish-links-index:latest):/app/.artifacts/publish ./.artifacts && docker rm tc - - name: Upload artifact - uses: actions/upload-artifact@v4 - with: - retention-days: 1 - name: bootstrap - path: .artifacts/docs-lambda-index-publisher/release_linux-x64/bootstrap From 1aa3e4a6309a125e45e2b2ba9b173b89fe018c5f Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 28 Apr 2025 13:13:18 +0200 Subject: [PATCH 3/3] Update src/infra/docs-lambda-index-publisher/Program.cs --- src/infra/docs-lambda-index-publisher/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/infra/docs-lambda-index-publisher/Program.cs b/src/infra/docs-lambda-index-publisher/Program.cs index f59269274..2ba317e28 100644 --- a/src/infra/docs-lambda-index-publisher/Program.cs +++ b/src/infra/docs-lambda-index-publisher/Program.cs @@ -14,7 +14,7 @@ using Elastic.Markdown.Links.CrossLinks; const string bucketName = "elastic-docs-link-index"; -const string indexFile = "link-index-test.json"; +const string indexFile = "link-index.json"; await LambdaBootstrapBuilder.Create(Handler, new SourceGeneratorLambdaJsonSerializer()) .Build()