Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
</ItemGroup>
<!-- AWS -->
<ItemGroup>
<PackageVersion Include="Amazon.Lambda.RuntimeSupport" Version="1.13.0"/>
<PackageVersion Include="Amazon.Lambda.Core" Version="2.5.1"/>
<PackageVersion Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4"/>
<PackageVersion Include="Amazon.Lambda.RuntimeSupport" Version="1.13.0" />
<PackageVersion Include="Amazon.Lambda.Core" Version="2.5.1" />
<PackageVersion Include="Amazon.Lambda.S3Events" Version="3.1.0" />
<PackageVersion Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4" />
<PackageVersion Include="Amazon.Lambda.SQSEvents" Version="2.2.0" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.400.135" />
<PackageVersion Include="AWSSDK.S3" Version="3.7.416.16"/>
</ItemGroup>

<!-- Build -->
<ItemGroup>
<PackageVersion Include="Argu" Version="6.2.5" />
Expand All @@ -23,42 +25,40 @@
<PackageVersion Include="Fake.IO.Zip" Version="6.1.3" />
<PackageVersion Include="FSharp.Core" Version="9.0.202" />
</ItemGroup>

<ItemGroup>
<PackageVersion Include="ConsoleAppFramework" Version="5.4.1" PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; analyzers; buildtransitive"/>
<PackageVersion Include="ConsoleAppFramework" Version="5.4.1" PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; analyzers; buildtransitive" />
<PackageVersion Include="ConsoleAppFramework.Abstractions" Version="5.4.1" />
<PackageVersion Include="Crayon" Version="2.0.69"/>
<PackageVersion Include="Crayon" Version="2.0.69" />
<PackageVersion Include="DotNet.Glob" Version="3.1.3" />
<PackageVersion Include="Errata" Version="0.14.0" />
<PackageVersion Include="Github.Actions.Core" Version="9.0.0"/>
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.4" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="9.0.4" />
<PackageVersion Include="Markdig" Version="0.41.1" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.4"/>
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="9.0.4"/>
<PackageVersion Include="NetEscapades.EnumGenerators" Version="1.0.0-beta12" PrivateAssets="all" ExcludeAssets="runtime" />
<PackageVersion Include="Proc" Version="0.9.1" />
<PackageVersion Include="RazorSlices" Version="0.8.1" />
<PackageVersion Include="Samboy063.Tomlet" Version="6.0.0" />
<PackageVersion Include="Slugify.Core" Version="4.0.1" />
<PackageVersion Include="SoftCircuits.IniFileParser" Version="2.7.0" />
<PackageVersion Include="System.IO.Abstractions" Version="21.0.29" />
<PackageVersion Include="Utf8StreamReader" Version="1.3.2"/>
<PackageVersion Include="Vecc.YamlDotNet.Analyzers.StaticGenerator" Version="16.1.3" PrivateAssets="All"/>
<PackageVersion Include="Utf8StreamReader" Version="1.3.2" />
<PackageVersion Include="Vecc.YamlDotNet.Analyzers.StaticGenerator" Version="16.1.3" PrivateAssets="All" />
<PackageVersion Include="Westwind.AspNetCore.LiveReload" Version="0.5.2" />
<PackageVersion Include="YamlDotNet" Version="16.3.0" />
</ItemGroup>

<!-- Test packages -->
<ItemGroup>
<PackageVersion Include="AngleSharp.Diffing" Version="1.0.0"/>
<PackageVersion Include="DiffPlex" Version="1.7.2"/>
<PackageVersion Include="FluentAssertions" Version="6.12.1"/>
<PackageVersion Include="FsUnit.xUnit" Version="7.0.1"/>
<PackageVersion Include="AngleSharp.Diffing" Version="1.0.0" />
<PackageVersion Include="DiffPlex" Version="1.7.2" />
<PackageVersion Include="FluentAssertions" Version="6.12.1" />
<PackageVersion Include="FsUnit.xUnit" Version="7.0.1" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="JetBrains.Annotations" Version="2024.3.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0"/>
<PackageVersion Include="System.IO.Abstractions.TestingHelpers" Version="21.0.29"/>
<PackageVersion Include="Unquote" Version="7.0.1"/>
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2"/>
<PackageVersion Include="xunit.v3" Version="1.1.0"/>
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
<PackageVersion Include="System.IO.Abstractions.TestingHelpers" Version="21.0.29" />
<PackageVersion Include="Unquote" Version="7.0.1" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2" />
<PackageVersion Include="xunit.v3" Version="1.1.0" />
</ItemGroup>
</Project>
</Project>
4 changes: 4 additions & 0 deletions src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public record LinkIndexEntry
[JsonPropertyName("etag")]
public required string ETag { get; init; }

// TODO can be made required after all doc_sets have published again
[JsonPropertyName("updated_at")]
public DateTime UpdatedAt { get; init; } = DateTime.MinValue;

// TODO can be made required after all doc_sets have published again
[JsonPropertyName("ref")]
public string GitReference { get; init; } = "unknown";
Expand Down
80 changes: 80 additions & 0 deletions src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Amazon.Lambda.Core;
using Amazon.S3;
using Amazon.S3.Model;
using Elastic.Markdown.Links.CrossLinks;

namespace Elastic.Documentation.Lambda.LinkIndexUploader;

/// <summary>
/// Gets the link index from S3 once.
/// You can then update the link index with <see cref="UpdateLinkIndexEntry(LinkIndexEntry)"/> and save it with <see cref="Save()"/>.
/// If the link index changed in the meantime, <see cref="Save()"/> will throw an exception,
/// thus all the messages from the queue will be sent back to the queue.
/// </summary>
public class LinkIndexProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName, string key)
{
private string? _etag;
private LinkIndex? _linkIndex;

private async Task<LinkIndex> GetLinkIndex()
{
var getObjectRequest = new GetObjectRequest
{
BucketName = bucketName,
Key = key
};
logger.LogInformation("Getting link index from s3://{bucketName}/{key}", bucketName, key);
var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest);
await using var stream = getObjectResponse.ResponseStream;
_etag = getObjectResponse.ETag;
logger.LogInformation("Successfully got link index from s3://{bucketName}/{key}", bucketName, key);
_linkIndex = LinkIndex.Deserialize(stream);
return _linkIndex;
}

public async Task UpdateLinkIndexEntry(LinkIndexEntry linkIndexEntry)
{
_linkIndex ??= await GetLinkIndex();
if (_linkIndex.Repositories.TryGetValue(linkIndexEntry.Repository, out var existingEntry))
{
var newEntryIsNewer = DateTime.Compare(linkIndexEntry.UpdatedAt, existingEntry[linkIndexEntry.Branch].UpdatedAt) > 0;
if (newEntryIsNewer)
{
existingEntry[linkIndexEntry.Branch] = linkIndexEntry;
logger.LogInformation("Updated existing entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
}
else
logger.LogInformation("Skipping update for {repository}@{branch} because the existing entry is newer", linkIndexEntry.Repository, linkIndexEntry.Branch);
}
else
{
_linkIndex.Repositories.Add(linkIndexEntry.Repository, new Dictionary<string, LinkIndexEntry>
{
{ linkIndexEntry.Branch, linkIndexEntry }
});
logger.LogInformation("Added new entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
}
}

public async Task Save()
{
if (_etag == null || _linkIndex == null)
throw new InvalidOperationException("You must call UpdateLinkIndexEntry() before Save()");
var json = LinkIndex.Serialize(_linkIndex);
logger.LogInformation("Saving link index to s3://{bucketName}/{key}", bucketName, key);
var putObjectRequest = new PutObjectRequest
{
BucketName = bucketName,
Key = key,
ContentBody = json,
ContentType = "application/json",
IfMatch = _etag // Only update if the ETag matches. Meaning the object has not been changed in the meantime.
};
_ = await s3Client.PutObjectAsync(putObjectRequest);
logger.LogInformation("Successfully saved link index to s3://{bucketName}/{key}", bucketName, key);
}
}
27 changes: 27 additions & 0 deletions src/infra/docs-lambda-index-publisher/LinkReferenceProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Amazon.Lambda.Core;
using Amazon.S3;
using Amazon.S3.Model;
using Elastic.Markdown.IO.State;

namespace Elastic.Documentation.Lambda.LinkIndexUploader;

public class LinkReferenceProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName)
{
public async Task<LinkReference> GetLinkReference(string key, Cancel ctx)
{
var getObjectRequest = new GetObjectRequest
{
BucketName = bucketName,
Key = key
};
logger.LogInformation("Getting object {key} from bucket {bucketName}", key, bucketName);
var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest, ctx);
await using var stream = getObjectResponse.ResponseStream;
logger.LogInformation("Successfully got object {key} from bucket {bucketName}", key, bucketName);
return LinkReference.Deserialize(stream);
}
}
169 changes: 74 additions & 95 deletions src/infra/docs-lambda-index-publisher/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,123 +2,102 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics;
using System.Text;
using System.Collections.Concurrent;
using Amazon.Lambda.Core;
using Amazon.Lambda.RuntimeSupport;
using Amazon.Lambda.Serialization.SystemTextJson;
using Amazon.Lambda.SQSEvents;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Util;
using Elastic.Documentation.Lambda.LinkIndexUploader;
using Elastic.Markdown.IO.State;
using Elastic.Markdown.Links.CrossLinks;

const string bucketName = "elastic-docs-link-index";
const string indexFile = "link-index.json";

await LambdaBootstrapBuilder.Create(Handler)
.Build()
await LambdaBootstrapBuilder.Create<SQSEvent, SQSBatchResponse>(Handler, new SourceGeneratorLambdaJsonSerializer<SerializerContext>())
.Build()
.RunAsync();

// Uncomment to test locally without uploading
// await CreateLinkIndex(new AmazonS3Client());
return;

#pragma warning disable CS8321 // Local function is declared but never used
static async Task<string> Handler(ILambdaContext context)
#pragma warning restore CS8321 // Local function is declared but never used
// The SQS queue is configured to trigger when elastic/*/*/links.json files are created or updated.
static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
{
var sw = Stopwatch.StartNew();

IAmazonS3 s3Client = new AmazonS3Client();
var linkIndex = await CreateLinkIndex(s3Client);
if (linkIndex == null)
return $"Error encountered on server. getting list of objects.";

var json = LinkIndex.Serialize(linkIndex);

using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
await s3Client.UploadObjectFromStreamAsync(bucketName, "link-index.json", stream, new Dictionary<string, object>(), CancellationToken.None);
return $"Finished in {sw}";
}


static async Task<LinkIndex?> CreateLinkIndex(IAmazonS3 s3Client)
{
var request = new ListObjectsV2Request
{
BucketName = bucketName,
MaxKeys = 1000 //default
};

var linkIndex = new LinkIndex
{
Repositories = []
};
try
var s3Client = new AmazonS3Client();
var linkIndexProvider = new LinkIndexProvider(s3Client, context.Logger, bucketName, indexFile);
var batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
foreach (var message in ev.Records)
{
ListObjectsV2Response response;
do
try
{
response = await s3Client.ListObjectsV2Async(request, CancellationToken.None);
await Parallel.ForEachAsync(response.S3Objects, async (obj, ctx) =>
var s3RecordLinkReferenceTuples = await GetS3RecordLinkReferenceTuples(s3Client, message, context);
foreach (var (s3Record, linkReference) in s3RecordLinkReferenceTuples)
{
if (!obj.Key.StartsWith("elastic/", StringComparison.OrdinalIgnoreCase))
return;

var tokens = obj.Key.Split('/');
if (tokens.Length < 3)
return;

// TODO create a dedicated state file for git configuration
// Deserializing all of the links metadata adds significant overhead
var gitReference = await ReadLinkReferenceSha(s3Client, obj);

var repository = tokens[1];
var branch = tokens[2];

var entry = new LinkIndexEntry
{
Repository = repository,
Branch = branch,
ETag = obj.ETag.Trim('"'),
Path = obj.Key,
GitReference = gitReference
};
if (linkIndex.Repositories.TryGetValue(repository, out var existingEntry))
existingEntry[branch] = entry;
else
{
linkIndex.Repositories.Add(repository, new Dictionary<string, LinkIndexEntry>
{
{ branch, entry }
});
}
var newEntry = ConvertToLinkIndexEntry(s3Record, linkReference);
await linkIndexProvider.UpdateLinkIndexEntry(newEntry);
}
}
catch (Exception e)
{
// Add failed message identifier to the batchItemFailures list
context.Logger.LogWarning(e, "Failed to process message {MessageId}", message.MessageId);
batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure
{
ItemIdentifier = message.MessageId
});

// If the response is truncated, set the request ContinuationToken
// from the NextContinuationToken property of the response.
request.ContinuationToken = response.NextContinuationToken;
} while (response.IsTruncated);
}
}
catch
try
{
return null;
await linkIndexProvider.Save();
var response = new SQSBatchResponse(batchItemFailures);
if (batchItemFailures.Count > 0)
context.Logger.LogInformation("Failed to process {batchItemFailuresCount} of {allMessagesCount} messages. Returning them to the queue.", batchItemFailures.Count, ev.Records.Count);
return response;
}
catch (Exception ex)
{
// If we fail to update the link index, we need to return all messages to the queue
// so that they can be retried later.
context.Logger.LogError("Failed to update {bucketName}/{indexFile}. Returning all {recordCount} messages to the queue.", bucketName, indexFile, ev.Records.Count);
context.Logger.LogError(ex, ex.Message);
var response = new SQSBatchResponse(ev.Records.Select(r => new SQSBatchResponse.BatchItemFailure
{
ItemIdentifier = r.MessageId
}).ToList());
return response;
}

return linkIndex;
}

static async Task<string> ReadLinkReferenceSha(IAmazonS3 client, S3Object obj)
static LinkIndexEntry ConvertToLinkIndexEntry(S3EventNotification.S3EventNotificationRecord record, LinkReference linkReference)
{
try
var s3Object = record.S3.Object;
var keyTokens = s3Object.Key.Split('/');
var repository = keyTokens[1];
var branch = keyTokens[2];
return new LinkIndexEntry
{
var contents = await client.GetObjectAsync(obj.Key, obj.Key, CancellationToken.None);
await using var s = contents.ResponseStream;
var linkReference = LinkReference.Deserialize(s);
return linkReference.Origin.Ref;
}
catch (Exception e)
Repository = repository,
Branch = branch,
ETag = s3Object.ETag,
Path = s3Object.Key,
UpdatedAt = record.EventTime,
GitReference = linkReference.Origin.Ref
};
}

static async Task<IReadOnlyCollection<(S3EventNotification.S3EventNotificationRecord, LinkReference)>> GetS3RecordLinkReferenceTuples(IAmazonS3 s3Client,
SQSEvent.SQSMessage message, ILambdaContext context)
{
var s3Event = S3EventNotification.ParseJson(message.Body);
var recordLinkReferenceTuples = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>();
var linkReferenceProvider = new LinkReferenceProvider(s3Client, context.Logger, bucketName);
await Parallel.ForEachAsync(s3Event.Records, async (record, ctx) =>
{
Console.WriteLine(e);
// it's important we don't fail here we need to fallback gracefully from this so we can fix the root cause
// of why a repository is not reporting its git reference properly
return "unknown";
}
var linkReference = await linkReferenceProvider.GetLinkReference(record.S3.Object.Key, ctx);
recordLinkReferenceTuples.Add((record, linkReference));
});
return recordLinkReferenceTuples;
}
Loading
Loading