diff --git a/Directory.Packages.props b/Directory.Packages.props
index 881dd4dad..6803cc34b 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -8,12 +8,14 @@
-
-
-
+
+
+
+
+
+
-
@@ -23,17 +25,16 @@
-
-
+
-
+
+
+
-
-
@@ -41,24 +42,23 @@
-
-
+
+
-
-
-
-
-
+
+
+
+
-
-
-
-
-
+
+
+
+
+
-
+
\ No newline at end of file
diff --git a/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs b/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs
index dd5b1fa24..60a070aef 100644
--- a/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs
+++ b/src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs
@@ -38,6 +38,10 @@ public record LinkIndexEntry
[JsonPropertyName("etag")]
public required string ETag { get; init; }
+ // TODO can be made required after all doc_sets have published again
+ [JsonPropertyName("updated_at")]
+ public DateTime UpdatedAt { get; init; } = DateTime.MinValue;
+
// TODO can be made required after all doc_sets have published again
[JsonPropertyName("ref")]
public string GitReference { get; init; } = "unknown";
diff --git a/src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs b/src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs
new file mode 100644
index 000000000..6b00ab82b
--- /dev/null
+++ b/src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs
@@ -0,0 +1,80 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using Amazon.Lambda.Core;
+using Amazon.S3;
+using Amazon.S3.Model;
+using Elastic.Markdown.Links.CrossLinks;
+
+namespace Elastic.Documentation.Lambda.LinkIndexUploader;
+
+/// <summary>
+/// Gets the link index from S3 once.
+/// You can then update the link index with <see cref="UpdateLinkIndexEntry"/> and save it with <see cref="Save"/>.
+/// If the link index changed in the meantime, <see cref="Save"/> will throw an exception,
+/// thus all the messages from the queue will be sent back to the queue.
+/// </summary>
+public class LinkIndexProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName, string key)
+{
+ private string? _etag;
+ private LinkIndex? _linkIndex;
+
+ private async Task<LinkIndex> GetLinkIndex()
+ {
+ var getObjectRequest = new GetObjectRequest
+ {
+ BucketName = bucketName,
+ Key = key
+ };
+ logger.LogInformation("Getting link index from s3://{bucketName}/{key}", bucketName, key);
+ var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest);
+ await using var stream = getObjectResponse.ResponseStream;
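+ // Remember the ETag so Save() can issue a conditional PUT and fail if the index changed in the meantime.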
+ _etag = getObjectResponse.ETag;
+ logger.LogInformation("Successfully got link index from s3://{bucketName}/{key}", bucketName, key);
+ _linkIndex = LinkIndex.Deserialize(stream);
+ return _linkIndex;
+ }
+
+ public async Task UpdateLinkIndexEntry(LinkIndexEntry linkIndexEntry)
+ {
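+ // Lazily load the index on first use; subsequent calls reuse the cached copy.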
+ _linkIndex ??= await GetLinkIndex();
+ if (_linkIndex.Repositories.TryGetValue(linkIndexEntry.Repository, out var existingEntry))
+ {
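+ // Only overwrite the existing entry when the incoming event is newer, since messages may arrive out of order.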
+ var newEntryIsNewer = DateTime.Compare(linkIndexEntry.UpdatedAt, existingEntry[linkIndexEntry.Branch].UpdatedAt) > 0;
+ if (newEntryIsNewer)
+ {
+ existingEntry[linkIndexEntry.Branch] = linkIndexEntry;
+ logger.LogInformation("Updated existing entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
+ }
+ else
+ logger.LogInformation("Skipping update for {repository}@{branch} because the existing entry is newer", linkIndexEntry.Repository, linkIndexEntry.Branch);
+ }
+ else
+ {
+ _linkIndex.Repositories.Add(linkIndexEntry.Repository, new Dictionary<string, LinkIndexEntry>
+ {
+ { linkIndexEntry.Branch, linkIndexEntry }
+ });
+ logger.LogInformation("Added new entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
+ }
+ }
+
+ public async Task Save()
+ {
+ if (_etag == null || _linkIndex == null)
+ throw new InvalidOperationException("You must call UpdateLinkIndexEntry() before Save()");
+ var json = LinkIndex.Serialize(_linkIndex);
+ logger.LogInformation("Saving link index to s3://{bucketName}/{key}", bucketName, key);
+ var putObjectRequest = new PutObjectRequest
+ {
+ BucketName = bucketName,
+ Key = key,
+ ContentBody = json,
+ ContentType = "application/json",
+ IfMatch = _etag // Only update if the ETag matches. Meaning the object has not been changed in the meantime.
+ };
+ _ = await s3Client.PutObjectAsync(putObjectRequest);
+ logger.LogInformation("Successfully saved link index to s3://{bucketName}/{key}", bucketName, key);
+ }
+}
diff --git a/src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs b/src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs
new file mode 100644
index 000000000..cafcdb5b0
--- /dev/null
+++ b/src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs
@@ -0,0 +1,27 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using Amazon.Lambda.Core;
+using Amazon.S3;
+using Amazon.S3.Model;
+using Elastic.Markdown.IO.State;
+
+namespace Elastic.Documentation.Lambda.LinkIndexUploader;
+
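+/// <summary>
+/// Reads a links.json object from S3 and deserializes it into a <see cref="LinkReference"/>.
+/// </summary>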
+public class LinkReferenceProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName)
+{
+ public async Task<LinkReference> GetLinkReference(string key, Cancel ctx)
+ {
+ var getObjectRequest = new GetObjectRequest
+ {
+ BucketName = bucketName,
+ Key = key
+ };
+ logger.LogInformation("Getting object {key} from bucket {bucketName}", key, bucketName);
+ var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest, ctx);
+ await using var stream = getObjectResponse.ResponseStream;
+ logger.LogInformation("Successfully got object {key} from bucket {bucketName}", key, bucketName);
+ return LinkReference.Deserialize(stream);
+ }
+}
diff --git a/src/infra/docs-lambda-index-publisher/Program.cs b/src/infra/docs-lambda-index-publisher/Program.cs
index fe3a2a48a..2ba317e28 100644
--- a/src/infra/docs-lambda-index-publisher/Program.cs
+++ b/src/infra/docs-lambda-index-publisher/Program.cs
@@ -2,123 +2,102 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
-using System.Diagnostics;
-using System.Text;
+using System.Collections.Concurrent;
using Amazon.Lambda.Core;
using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+using Amazon.Lambda.SQSEvents;
using Amazon.S3;
-using Amazon.S3.Model;
+using Amazon.S3.Util;
+using Elastic.Documentation.Lambda.LinkIndexUploader;
using Elastic.Markdown.IO.State;
using Elastic.Markdown.Links.CrossLinks;
const string bucketName = "elastic-docs-link-index";
+const string indexFile = "link-index.json";
-await LambdaBootstrapBuilder.Create(Handler)
- .Build()
+await LambdaBootstrapBuilder.Create(Handler, new SourceGeneratorLambdaJsonSerializer<SerializerContext>())
+ .Build()
.RunAsync();
-// Uncomment to test locally without uploading
-// await CreateLinkIndex(new AmazonS3Client());
+return;
-#pragma warning disable CS8321 // Local function is declared but never used
-static async Task<string> Handler(ILambdaContext context)
-#pragma warning restore CS8321 // Local function is declared but never used
+// The SQS queue is configured to trigger when elastic/*/*/links.json files are created or updated.
+static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
{
- var sw = Stopwatch.StartNew();
-
- IAmazonS3 s3Client = new AmazonS3Client();
- var linkIndex = await CreateLinkIndex(s3Client);
- if (linkIndex == null)
- return $"Error encountered on server. getting list of objects.";
-
- var json = LinkIndex.Serialize(linkIndex);
-
- using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
- await s3Client.UploadObjectFromStreamAsync(bucketName, "link-index.json", stream, new Dictionary<string, object>(), CancellationToken.None);
- return $"Finished in {sw}";
-}
-
-
-static async Task<LinkIndex?> CreateLinkIndex(IAmazonS3 s3Client)
-{
- var request = new ListObjectsV2Request
- {
- BucketName = bucketName,
- MaxKeys = 1000 //default
- };
-
- var linkIndex = new LinkIndex
- {
- Repositories = []
- };
- try
+ var s3Client = new AmazonS3Client();
+ var linkIndexProvider = new LinkIndexProvider(s3Client, context.Logger, bucketName, indexFile);
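+ // Track failed messages so they can be reported back to SQS as partial batch failures.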
+ var batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
+ foreach (var message in ev.Records)
{
- ListObjectsV2Response response;
- do
+ try
{
- response = await s3Client.ListObjectsV2Async(request, CancellationToken.None);
- await Parallel.ForEachAsync(response.S3Objects, async (obj, ctx) =>
+ var s3RecordLinkReferenceTuples = await GetS3RecordLinkReferenceTuples(s3Client, message, context);
+ foreach (var (s3Record, linkReference) in s3RecordLinkReferenceTuples)
{
- if (!obj.Key.StartsWith("elastic/", StringComparison.OrdinalIgnoreCase))
- return;
-
- var tokens = obj.Key.Split('/');
- if (tokens.Length < 3)
- return;
-
- // TODO create a dedicated state file for git configuration
- // Deserializing all of the links metadata adds significant overhead
- var gitReference = await ReadLinkReferenceSha(s3Client, obj);
-
- var repository = tokens[1];
- var branch = tokens[2];
-
- var entry = new LinkIndexEntry
- {
- Repository = repository,
- Branch = branch,
- ETag = obj.ETag.Trim('"'),
- Path = obj.Key,
- GitReference = gitReference
- };
- if (linkIndex.Repositories.TryGetValue(repository, out var existingEntry))
- existingEntry[branch] = entry;
- else
- {
- linkIndex.Repositories.Add(repository, new Dictionary<string, LinkIndexEntry>
- {
- { branch, entry }
- });
- }
+ var newEntry = ConvertToLinkIndexEntry(s3Record, linkReference);
+ await linkIndexProvider.UpdateLinkIndexEntry(newEntry);
+ }
+ }
+ catch (Exception e)
+ {
+ // Add failed message identifier to the batchItemFailures list
+ context.Logger.LogWarning(e, "Failed to process message {MessageId}", message.MessageId);
+ batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure
+ {
+ ItemIdentifier = message.MessageId
});
-
- // If the response is truncated, set the request ContinuationToken
- // from the NextContinuationToken property of the response.
- request.ContinuationToken = response.NextContinuationToken;
- } while (response.IsTruncated);
+ }
}
- catch
+ try
{
- return null;
+ await linkIndexProvider.Save();
+ var response = new SQSBatchResponse(batchItemFailures);
+ if (batchItemFailures.Count > 0)
+ context.Logger.LogInformation("Failed to process {batchItemFailuresCount} of {allMessagesCount} messages. Returning them to the queue.", batchItemFailures.Count, ev.Records.Count);
+ return response;
+ }
+ catch (Exception ex)
+ {
+ // If we fail to update the link index, we need to return all messages to the queue
+ // so that they can be retried later.
+ context.Logger.LogError("Failed to update {bucketName}/{indexFile}. Returning all {recordCount} messages to the queue.", bucketName, indexFile, ev.Records.Count);
+ context.Logger.LogError(ex, ex.Message);
+ var response = new SQSBatchResponse(ev.Records.Select(r => new SQSBatchResponse.BatchItemFailure
+ {
+ ItemIdentifier = r.MessageId
+ }).ToList());
+ return response;
}
-
- return linkIndex;
}
-static async Task<string> ReadLinkReferenceSha(IAmazonS3 client, S3Object obj)
+static LinkIndexEntry ConvertToLinkIndexEntry(S3EventNotification.S3EventNotificationRecord record, LinkReference linkReference)
{
- try
+ var s3Object = record.S3.Object;
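+ // Object keys follow the pattern elastic/{repository}/{branch}/links.json (see the SQS trigger note above).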
+ var keyTokens = s3Object.Key.Split('/');
+ var repository = keyTokens[1];
+ var branch = keyTokens[2];
+ return new LinkIndexEntry
{
- var contents = await client.GetObjectAsync(obj.Key, obj.Key, CancellationToken.None);
- await using var s = contents.ResponseStream;
- var linkReference = LinkReference.Deserialize(s);
- return linkReference.Origin.Ref;
- }
- catch (Exception e)
+ Repository = repository,
+ Branch = branch,
+ ETag = s3Object.ETag,
+ Path = s3Object.Key,
+ UpdatedAt = record.EventTime,
+ GitReference = linkReference.Origin.Ref
+ };
+}
+
+static async Task<IEnumerable<(S3EventNotification.S3EventNotificationRecord, LinkReference)>> GetS3RecordLinkReferenceTuples(IAmazonS3 s3Client,
+ SQSEvent.SQSMessage message, ILambdaContext context)
+{
+ var s3Event = S3EventNotification.ParseJson(message.Body);
+ var recordLinkReferenceTuples = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>();
+ var linkReferenceProvider = new LinkReferenceProvider(s3Client, context.Logger, bucketName);
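+ // Fetch and deserialize every links.json referenced by the event records in parallel.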
+ await Parallel.ForEachAsync(s3Event.Records, async (record, ctx) =>
{
- Console.WriteLine(e);
- // it's important we don't fail here we need to fallback gracefully from this so we can fix the root cause
- // of why a repository is not reporting its git reference properly
- return "unknown";
- }
+ var linkReference = await linkReferenceProvider.GetLinkReference(record.S3.Object.Key, ctx);
+ recordLinkReferenceTuples.Add((record, linkReference));
+ });
+ return recordLinkReferenceTuples;
}
diff --git a/src/infra/docs-lambda-index-publisher/README.md b/src/infra/docs-lambda-index-publisher/README.md
index 04b3bd9f4..1e45ca7de 100644
--- a/src/infra/docs-lambda-index-publisher/README.md
+++ b/src/infra/docs-lambda-index-publisher/README.md
@@ -1,6 +1,6 @@
# Index Registry Update Lambda Function
-From a linux `x86_64` machine you can use the followint to build a AOT binary that will run
+From a Linux `x86_64` machine you can use the following to build an AOT binary that will run
on a vanilla `Amazon Linux 2023` without any dependencies.
@@ -19,4 +19,3 @@ The `bootstrap` binary should now be available under:
```
.artifacts/publish/docs-lambda-index-publisher/release_linux-x64/bootstrap
```
-
diff --git a/src/infra/docs-lambda-index-publisher/SerializerContext.cs b/src/infra/docs-lambda-index-publisher/SerializerContext.cs
new file mode 100644
index 000000000..9fe3d8c51
--- /dev/null
+++ b/src/infra/docs-lambda-index-publisher/SerializerContext.cs
@@ -0,0 +1,14 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System.Text.Json.Serialization;
+using Amazon.Lambda.SQSEvents;
+using Amazon.S3.Util;
+
+namespace Elastic.Documentation.Lambda.LinkIndexUploader;
+
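+// Source-generated JSON serializer context for the SQS event and batch response types,
+// so (de)serialization works without reflection when the function is published as a Native AOT binary.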
+[JsonSerializable(typeof(SQSEvent))]
+[JsonSerializable(typeof(S3EventNotification))]
+[JsonSerializable(typeof(SQSBatchResponse))]
+public partial class SerializerContext : JsonSerializerContext;
diff --git a/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj b/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj
index 2022dbed2..a104f5e5d 100644
--- a/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj
+++ b/src/infra/docs-lambda-index-publisher/docs-lambda-index-publisher.csproj
@@ -21,11 +21,13 @@
+
+
+
-