Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ Elasticsearch Document Provider。
- 不要把默认 tie-breaker 改回 `_id`:Elastic 官方文档明确说明 `_id` 不能用于 sorting,若确实要按 id 排序,应该复制到另一个启用 `doc_values` 的字段
- 也不要把这里改成 `_doc`:`_doc`/扫描顺序适合底层迭代,不是当前 read-model 查询的稳定业务分页键;当前查询路径需要一个显式、唯一、可复用 cursor 的排序字段
- `ProjectionDocumentId` 是当前 provider 的硬约束,不提供 `_id`/`_doc` fallback,也不为旧索引或旧文档做兼容兜底
- 如果索引由外部预建,必须保留 `ProjectionDocumentId` 的 `keyword` 映射,否则视为配置错误并应直接修正索引定义
- 如果索引由外部预建,必须匹配当前 provider mapping 契约,包括 `ProjectionDocumentId` 的 `keyword` 映射以及 descriptor 派生出的稳定 `keyword` / `date` 字段;不匹配视为配置错误并应直接修正或重建索引

## 自动索引映射

- 新建索引时,provider 会基于 read model 的 protobuf descriptor 补齐低风险稳定字段映射:root-level `google.protobuf.Timestamp` 映射为 `date`,root-level 稳定字符串标识字段(如 `id`、`actor_id`、`last_event_id`、`*_id`、`*_key`、`*_hash`、`*_status`、`*_kind`、`*_type`、`*_type_url`)映射为 `keyword`
- `DocumentIndexMetadata` 中显式声明的 mapping 优先,provider 不覆盖自定义 `text`、analyzer、object、nested 或其他业务 mapping
- `google.protobuf.Any`、`google.protobuf.Struct`、map、repeated message 与 repeated scalar 字段默认保持开放,不由通用 helper 递归展开
- mapping 契约变更不兼容旧 Elasticsearch index 时,直接清空或重建 projection index;provider 不做旧索引在线修复、双读 fallback 或 query-time mapping repair
- `AutoCreateIndex=true` 只会在缺失 index 时按当前契约创建新 index;如果需要保留数据,应通过 projection 重放或外部重建流程恢复数据

参考:

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using Google.Protobuf.Reflection;
using Google.Protobuf.WellKnownTypes;

namespace Aevatar.CQRS.Projection.Providers.Elasticsearch.Stores;

internal static class ElasticsearchProjectionDescriptorMappingSupport
{
internal static DocumentIndexMetadata AugmentMetadata(
DocumentIndexMetadata metadata,
MessageDescriptor descriptor)
{
ArgumentNullException.ThrowIfNull(metadata);
ArgumentNullException.ThrowIfNull(descriptor);

var mappings = new Dictionary<string, object?>(metadata.Mappings, StringComparer.Ordinal);
var properties = ResolveProperties(mappings);

foreach (var field in descriptor.Fields.InDeclarationOrder())
{
if (ShouldSkipField(field) || properties.ContainsKey(field.Name))
continue;

if (IsTimestampField(field))
{
properties[field.Name] = CreateTypeMapping("date");
continue;
}

if (field.FieldType == FieldType.String && IsStableKeywordFieldName(field.Name))
properties[field.Name] = CreateTypeMapping("keyword");
}

mappings["properties"] = properties;
return metadata with { Mappings = mappings };
}

private static Dictionary<string, object?> ResolveProperties(Dictionary<string, object?> mappings)
{
if (!mappings.TryGetValue("properties", out var propertiesValue) || propertiesValue == null)
return new Dictionary<string, object?>(StringComparer.Ordinal);

if (propertiesValue is not IReadOnlyDictionary<string, object?> properties)
{
throw new InvalidOperationException(
"DocumentIndexMetadata.Mappings['properties'] must be an object map.");
}

return new Dictionary<string, object?>(properties, StringComparer.Ordinal);
}

private static bool ShouldSkipField(FieldDescriptor field)
{
if (field.IsMap || field.IsRepeated)
return true;

return field.FieldType == FieldType.Message &&
field.MessageType != null &&
(field.MessageType.FullName == Any.Descriptor.FullName ||
field.MessageType.FullName == Struct.Descriptor.FullName);
}

private static bool IsTimestampField(FieldDescriptor field)
{
return field.FieldType == FieldType.Message &&
field.MessageType != null &&
field.MessageType.FullName == Timestamp.Descriptor.FullName;
}

private static bool IsStableKeywordFieldName(string fieldName)
{
return string.Equals(fieldName, "id", StringComparison.Ordinal) ||
string.Equals(fieldName, "actor_id", StringComparison.Ordinal) ||
string.Equals(fieldName, "last_event_id", StringComparison.Ordinal) ||
fieldName.EndsWith("_id", StringComparison.Ordinal) ||
fieldName.EndsWith("_actor_id", StringComparison.Ordinal) ||
fieldName.EndsWith("_key", StringComparison.Ordinal) ||
fieldName.EndsWith("_hash", StringComparison.Ordinal) ||
fieldName.EndsWith("_revision", StringComparison.Ordinal) ||
fieldName.EndsWith("_revision_id", StringComparison.Ordinal) ||
fieldName.EndsWith("_status", StringComparison.Ordinal) ||
fieldName.EndsWith("_kind", StringComparison.Ordinal) ||
fieldName.EndsWith("_type", StringComparison.Ordinal) ||
fieldName.EndsWith("_type_url", StringComparison.Ordinal);
}

private static Dictionary<string, object?> CreateTypeMapping(string type)
{
return new Dictionary<string, object?>(StringComparer.Ordinal)
{
["type"] = type,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,26 @@ public ElasticsearchProjectionDocumentStore(
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);
}

var descriptor = new TReadModel().Descriptor;
var normalizedMetadata = ElasticsearchProjectionDocumentStoreMetadataSupport.NormalizeMetadata(indexMetadata);
var augmentedMetadata = ElasticsearchProjectionDescriptorMappingSupport.AugmentMetadata(
normalizedMetadata,
descriptor);
var finalMetadata = ElasticsearchProjectionDocumentStoreMetadataSupport.NormalizeMetadata(augmentedMetadata);
_indexPrefix = options.IndexPrefix?.Trim() ?? "";
var normalizedScope = ElasticsearchProjectionDocumentStoreNamingSupport.NormalizeToken(normalizedMetadata.IndexName);
var normalizedScope = ElasticsearchProjectionDocumentStoreNamingSupport.NormalizeToken(finalMetadata.IndexName);
if (normalizedScope.Length == 0)
normalizedScope = "readmodel";
_indexName = ElasticsearchProjectionDocumentStoreNamingSupport.BuildIndexName(_indexPrefix, normalizedScope);
_queryTakeMax = options.QueryTakeMax > 0 ? options.QueryTakeMax : 200;
_autoCreateIndex = options.AutoCreateIndex;
_missingIndexBehavior = options.MissingIndexBehavior;
_supportsDynamicIndexing = indexScopeSelector is not null;
_indexMetadata = normalizedMetadata with { IndexName = _indexName };
_indexMetadata = finalMetadata with { IndexName = _indexName };
_keySelector = keySelector;
_keyFormatter = keyFormatter ?? (key => key?.ToString() ?? "");
_indexScopeSelector = indexScopeSelector;
_defaultSortField = options.DefaultSortField?.Trim() ?? "";
var descriptor = new TReadModel().Descriptor;
_fieldPathResolver = BuildFieldPathResolver(descriptor);
_exactMatchFieldPathResolver = BuildExactMatchFieldPathResolver(descriptor, _indexMetadata);
_logger = logger ?? NullLogger<ElasticsearchProjectionDocumentStore<TReadModel, TKey>>.Instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,6 @@ public sealed class ScriptCatalogEntryDocumentMetadataProvider
Mappings: new Dictionary<string, object?>(StringComparer.Ordinal)
{
["dynamic"] = true,
["properties"] = new Dictionary<string, object?>(StringComparer.Ordinal)
{
["created_at_utc_value"] = new Dictionary<string, object?>(StringComparer.Ordinal)
{
["type"] = "date",
},
["updated_at_utc_value"] = new Dictionary<string, object?>(StringComparer.Ordinal)
{
["type"] = "date",
},
},
},
Settings: new Dictionary<string, object?>(StringComparer.Ordinal),
Aliases: new Dictionary<string, object?>(StringComparer.Ordinal));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ public sealed class ServiceCatalogReadModelMetadataProvider : IProjectionDocumen
{
public DocumentIndexMetadata Metadata { get; } = new(
"gagent-service-catalog",
Mappings: new Dictionary<string, object?>(),
Mappings: new Dictionary<string, object?>
{
["properties"] = new Dictionary<string, object?>(StringComparer.Ordinal)
{
["namespace"] = new Dictionary<string, object?>(StringComparer.Ordinal)
{
["type"] = "keyword",
},
},
},
Settings: new Dictionary<string, object?>(),
Aliases: new Dictionary<string, object?>());
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Net;
using System.Text;
using System.Text.Json;
using Aevatar.CQRS.Projection.Providers.Elasticsearch.Configuration;
using Aevatar.CQRS.Projection.Providers.Elasticsearch.Stores;
using Aevatar.CQRS.Projection.Stores.Abstractions;
Expand Down Expand Up @@ -117,12 +118,164 @@ public async Task QueryAsync_WhenUsingClrFieldPaths_ShouldTranslateToProtoFieldN
});

var searchRequest = handler.CapturedRequests.Should().ContainSingle().Subject;
searchRequest.Body.Should().Contain("\"actor_id.keyword\":\"actor-1\"");
searchRequest.Body.Should().Contain("\"actor_id\":\"actor-1\"");
searchRequest.Body.Should().Contain("\"updated_at_utc_value\"");
searchRequest.Body.Should().NotContain("\"ActorId\"");
searchRequest.Body.Should().NotContain("\"UpdatedAt\"");
}

[Fact]
public async Task UpsertAsync_WhenTimestampDescriptorFieldIsUnmapped_ShouldInitializeItAsDate()
{
var handler = CreateSuccessfulUpsertHandler();

using var store = CreateStore(
new ElasticsearchProjectionDocumentStoreOptions
{
AutoCreateIndex = true,
},
handler);

await store.UpsertAsync(new TestStoreReadModel
{
Id = "actor-1",
ActorId = "actor-1",
UpdatedAt = DateTimeOffset.Parse("2026-05-15T00:00:00Z"),
});

var indexPayload = ParseJson(handler.CapturedRequests[0].Body);
GetMappingType(indexPayload, "updated_at_utc_value").Should().Be("date");
}

[Fact]
public async Task UpsertAsync_WhenStableIdentifierStringDescriptorFieldsAreUnmapped_ShouldInitializeThemAsKeyword()
{
var handler = CreateSuccessfulUpsertHandler();

using var store = CreateStore(
new ElasticsearchProjectionDocumentStoreOptions
{
AutoCreateIndex = true,
},
handler);

await store.UpsertAsync(new TestStoreReadModel
{
Id = "actor-1",
ActorId = "actor-1",
LastEventId = "event-1",
});

var indexPayload = ParseJson(handler.CapturedRequests[0].Body);
GetMappingType(indexPayload, "id").Should().Be("keyword");
GetMappingType(indexPayload, "actor_id").Should().Be("keyword");
GetMappingType(indexPayload, "last_event_id").Should().Be("keyword");
GetProperties(indexPayload).Should().NotContainKey("value");
}

[Fact]
public async Task UpsertAsync_WhenProviderDeclaresExplicitMapping_ShouldPreserveIt()
{
var handler = CreateSuccessfulUpsertHandler();
var options = new ElasticsearchProjectionDocumentStoreOptions
{
AutoCreateIndex = true,
};
options.Endpoints = ["http://localhost:9200"];

using var store = new ElasticsearchProjectionDocumentStore<TestStoreReadModel, string>(
options,
new DocumentIndexMetadata(
IndexName: "projection-core-tests",
Mappings: new Dictionary<string, object?>
{
["properties"] = new Dictionary<string, object?>
{
["actor_id"] = new Dictionary<string, object?>
{
["type"] = "text",
["analyzer"] = "standard",
},
},
},
Settings: new Dictionary<string, object?>(),
Aliases: new Dictionary<string, object?>()),
keySelector: model => model.Id,
keyFormatter: key => key,
httpMessageHandler: handler);

await store.UpsertAsync(new TestStoreReadModel
{
Id = "actor-1",
ActorId = "actor-1",
});

var indexPayload = ParseJson(handler.CapturedRequests[0].Body);
var actorIdMapping = GetFieldMapping(indexPayload, "actor_id");
actorIdMapping.GetProperty("type").GetString().Should().Be("text");
actorIdMapping.GetProperty("analyzer").GetString().Should().Be("standard");
}

[Fact]
public async Task UpsertAsync_WhenDescriptorContainsOpenFields_ShouldNotInitializeStaticMappingsForThem()
{
var handler = CreateSuccessfulUpsertHandler();
var options = new ElasticsearchProjectionDocumentStoreOptions
{
AutoCreateIndex = true,
};
options.Endpoints = ["http://localhost:9200"];

using var store = new ElasticsearchProjectionDocumentStore<TestRecursiveWellKnownReadModel, string>(
options,
new DocumentIndexMetadata(
IndexName: "projection-core-tests",
Mappings: new Dictionary<string, object?>(),
Settings: new Dictionary<string, object?>(),
Aliases: new Dictionary<string, object?>()),
keySelector: model => model.Id,
keyFormatter: key => key,
httpMessageHandler: handler);

await store.UpsertAsync(new TestRecursiveWellKnownReadModel
{
Id = "actor-1",
ActorId = "actor-1",
UpdatedAt = DateTimeOffset.Parse("2026-05-15T00:00:00Z"),
});

var indexPayload = ParseJson(handler.CapturedRequests[0].Body);
var properties = GetProperties(indexPayload);
properties.Should().NotContainKey("fields_value");
properties.Should().NotContainKey("open_payload");
properties.Should().NotContainKey("labels");
properties.Should().NotContainKey("entries");
properties.Should().NotContainKey("tags");
GetMappingType(indexPayload, "updated_at_utc_value").Should().Be("date");
}

[Fact]
public async Task UpsertAsync_WhenMetadataOmitsProjectionDocumentId_ShouldInitializeItAsKeyword()
{
var handler = CreateSuccessfulUpsertHandler();

using var store = CreateStore(
new ElasticsearchProjectionDocumentStoreOptions
{
AutoCreateIndex = true,
},
handler);

await store.UpsertAsync(new TestStoreReadModel
{
Id = "actor-1",
ActorId = "actor-1",
});

var indexPayload = ParseJson(handler.CapturedRequests[0].Body);
GetMappingType(indexPayload, "ProjectionDocumentId").Should().Be("keyword");
}

[Fact]
public async Task QueryAsync_WhenUsingExplicitTimestampSort_ShouldIncludeMissingAndUnmappedHints()
{
Expand Down Expand Up @@ -721,6 +874,40 @@ private static ElasticsearchProjectionDocumentStore<TestStoreReadModel, string>
httpMessageHandler: handler);
}

private static ScriptedHttpMessageHandler CreateSuccessfulUpsertHandler()
{
var handler = new ScriptedHttpMessageHandler();
handler.EnqueueResponse(_ => CreateJsonResponse(HttpStatusCode.OK, """{"acknowledged":true}"""));
handler.EnqueueResponse(_ => CreateJsonResponse(HttpStatusCode.NotFound, """{"found":false}"""));
handler.EnqueueResponse(_ => CreateJsonResponse(HttpStatusCode.OK, """{"result":"created"}"""));
return handler;
}

private static JsonElement ParseJson(string json)
{
using var document = System.Text.Json.JsonDocument.Parse(json);
return document.RootElement.Clone();
}

private static IReadOnlyDictionary<string, JsonElement> GetProperties(JsonElement indexPayload)
{
return indexPayload
.GetProperty("mappings")
.GetProperty("properties")
.EnumerateObject()
.ToDictionary(x => x.Name, x => x.Value.Clone(), StringComparer.Ordinal);
}

private static JsonElement GetFieldMapping(JsonElement indexPayload, string fieldName)
{
return GetProperties(indexPayload)[fieldName];
}

private static string? GetMappingType(JsonElement indexPayload, string fieldName)
{
return GetFieldMapping(indexPayload, fieldName).GetProperty("type").GetString();
}

private static HttpResponseMessage CreateJsonResponse(HttpStatusCode statusCode, string json)
{
return new HttpResponseMessage(statusCode)
Expand Down
Loading
Loading