Skip to content

Commit 3740fc1

Browse files
committed
Improve bulk so initial test passes
1 parent 9410d1a commit 3740fc1

File tree

14 files changed

+422
-97
lines changed

14 files changed

+422
-97
lines changed

src/Elastic.Clients.Elasticsearch/Common/Infer/Id/Id.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
using System;
22
using System.Diagnostics;
33
using System.Globalization;
4+
using System.Text.Json;
45
using System.Text.Json.Serialization;
56
using Elastic.Transport;
67

78
namespace Elastic.Clients.Elasticsearch
89
{
910
[DebuggerDisplay("{DebugDisplay,nq}")]
10-
[JsonConverter(typeof(StringAliasConverter<Id>))]
11+
[JsonConverter(typeof(Id))]
1112
public class Id : IEquatable<Id>, IUrlParameter
1213
{
1314
public Id(string id)
@@ -105,4 +106,39 @@ public override int GetHashCode()
105106

106107
public static bool operator !=(Id left, Id right) => !Equals(left, right);
107108
}
109+
110+
internal sealed class IdConverter : JsonConverter<Id>
111+
{
112+
private readonly IElasticsearchClientSettings _settings;
113+
114+
public IdConverter(IElasticsearchClientSettings settings) => _settings = settings;
115+
116+
public override Id? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) =>
117+
reader.TokenType == JsonTokenType.Number
118+
? new Id(reader.GetInt64())
119+
: new Id(reader.GetString());
120+
121+
public override void Write(Utf8JsonWriter writer, Id value, JsonSerializerOptions options)
122+
{
123+
if (value is null)
124+
{
125+
writer.WriteNullValue();
126+
return;
127+
}
128+
129+
if (value.Document is not null)
130+
{
131+
var documentId = _settings.Inferrer.Id(value.Document.GetType(), value.Document);
132+
writer.WriteStringValue(documentId);
133+
}
134+
else if (value.LongValue.HasValue)
135+
{
136+
writer.WriteNumberValue(value.LongValue.Value);
137+
}
138+
else
139+
{
140+
writer.WriteStringValue(value.StringValue);
141+
}
142+
}
143+
}
108144
}

src/Elastic.Clients.Elasticsearch/Common/Infer/JoinFieldRouting/Routing.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
using System.Diagnostics;
33
using System.Globalization;
44
using System.Linq;
5+
using System.Text.Json;
6+
using System.Text.Json.Serialization;
57
using Elastic.Transport;
68

79
namespace Elastic.Clients.Elasticsearch
810
{
11+
[JsonConverter(typeof(RoutingConverter))]
912
[DebuggerDisplay("{" + nameof(DebugDisplay) + ",nq}")]
1013
public class Routing : IEquatable<Routing>, IUrlParameter
1114
{
@@ -167,4 +170,45 @@ public override int GetHashCode()
167170
// return !resolved.IsNullOrEmpty();
168171
//}
169172
}
173+
174+
internal sealed class RoutingConverter : JsonConverter<Routing>
175+
{
176+
private readonly IElasticsearchClientSettings _settings;
177+
178+
public RoutingConverter(IElasticsearchClientSettings settings) => _settings = settings;
179+
180+
public override Routing? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) =>
181+
reader.TokenType == JsonTokenType.Number
182+
? new Routing(reader.GetInt64())
183+
: new Routing(reader.GetString());
184+
185+
public override void Write(Utf8JsonWriter writer, Routing value, JsonSerializerOptions options)
186+
{
187+
if (value is null)
188+
{
189+
writer.WriteNullValue();
190+
return;
191+
}
192+
193+
if (value.Document is not null)
194+
{
195+
var documentId = _settings.Inferrer.Routing(value.Document.GetType(), value.Document);
196+
writer.WriteStringValue(documentId);
197+
}
198+
else if (value.DocumentGetter is not null)
199+
{
200+
var doc = value.DocumentGetter();
201+
var documentId = _settings.Inferrer.Routing(doc.GetType(), doc);
202+
writer.WriteStringValue(documentId);
203+
}
204+
else if (value.LongValue.HasValue)
205+
{
206+
writer.WriteNumberValue(value.LongValue.Value);
207+
}
208+
else
209+
{
210+
writer.WriteStringValue(value.StringValue);
211+
}
212+
}
213+
}
170214
}

src/Elastic.Clients.Elasticsearch/Serialization/DefaultHighLevelSerializer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ public DefaultHighLevelSerializer(IElasticsearchClientSettings settings)
4646
//new ConvertAsConverterFactory(settings),
4747
new IndexNameConverter(settings),
4848
new ObjectToInferredTypesConverter(),
49+
new IdConverter(settings),
4950
new FieldConverter(settings),
5051
new SortCollectionConverter(settings),
5152
//new FieldNameQueryConverterFactory(settings),
5253
new CustomJsonWriterConverterFactory(settings),
54+
new RoutingConverter(settings),
5355
new SelfSerializableConverterFactory(settings),
5456
new IndicesJsonConverter(settings),
5557
//new FieldConverterFactory(settings),

src/Elastic.Clients.Elasticsearch/Types/Bulk/BulkIndexOperation.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public sealed class BulkIndexOperation<T> : BulkOperationBase
3030
public T Document { get; set; }
3131

3232
protected override string Operation => "index";
33-
33+
3434
protected override void Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
3535
{
3636
var requestResponseSerializer = settings.RequestResponseSerializer;
@@ -84,5 +84,9 @@ protected override async Task SerializeAsync(Stream stream, IElasticsearchClient
8484
}
8585

8686
protected override object GetBody() => Document;
87+
88+
protected override Id GetIdForOperation(Inferrer inferrer) => Id ?? new Id(Document);
89+
90+
protected override Routing GetRoutingForOperation(Inferrer inferrer) => Routing ?? new Routing(Document);
8791
}
8892
}

src/Elastic.Clients.Elasticsearch/Types/Bulk/BulkIndexOperationDescriptor.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,9 @@ protected override void SerializeInternal(Utf8JsonWriter writer, JsonSerializerO
106106
}
107107

108108
protected override object GetBody() => _document;
109+
110+
protected override Id GetIdForOperation(Inferrer inferrer) => IdValue ?? new Id(_document);
111+
112+
protected override Routing GetRoutingForOperation(Inferrer inferrer) => RoutingValue ?? new Routing(_document);
109113
}
110114
}

src/Elastic.Clients.Elasticsearch/Types/Bulk/BulkOperationBase.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,21 @@ public abstract class BulkOperationBase : IBulkOperation, IStreamSerializable
3636

3737
protected abstract Task SerializeAsync(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None);
3838

39-
void IStreamSerializable.Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting) =>
39+
void IStreamSerializable.Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting)
40+
{
41+
Id = GetIdForOperation(settings.Inferrer);
42+
Routing = GetRoutingForOperation(settings.Inferrer);
43+
4044
Serialize(stream, settings, formatting);
45+
}
46+
47+
Task IStreamSerializable.SerializeAsync(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting)
48+
{
49+
Id = GetIdForOperation(settings.Inferrer);
50+
Routing = GetRoutingForOperation(settings.Inferrer);
4151

42-
Task IStreamSerializable.SerializeAsync(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting) =>
43-
SerializeAsync(stream, settings, formatting);
52+
return SerializeAsync(stream, settings, formatting);
53+
}
4454

4555
protected abstract string Operation { get; }
4656

src/Elastic.Clients.Elasticsearch/Types/Bulk/BulkOperationDescriptorBase.cs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@ namespace Elastic.Clients.Elasticsearch
1212
{
1313
public abstract class BulkOperationDescriptorBase<TDescriptor> : DescriptorBase<TDescriptor>, IBulkOperation, IStreamSerializable where TDescriptor : BulkOperationDescriptorBase<TDescriptor>
1414
{
15-
private Id _id;
15+
//private Id _id;
1616
private long? _version;
1717
private IndexName _index;
18-
private Routing _routing;
18+
//private Routing _routing;
1919
private VersionType? _versionType;
2020
private long? _ifSequenceNo;
2121
private long? _ifPrimaryTerm;
2222

23+
protected Id IdValue { get; set; }
24+
protected Routing RoutingValue { get; set; }
25+
2326
protected abstract string Operation { get; }
2427

25-
public TDescriptor Id(Id id) => Assign(id, (a, v) => a._id = v);
28+
public TDescriptor Id(Id id) => Assign(id, (a, v) => a.IdValue = v);
2629

2730
public TDescriptor IfSequenceNumber(long? ifSequenceNumber) => Assign(ifSequenceNumber, (a, v) => a._ifSequenceNo = v);
2831

@@ -32,22 +35,25 @@ public abstract class BulkOperationDescriptorBase<TDescriptor> : DescriptorBase<
3235

3336
public TDescriptor Index<T>() => Assign(typeof(T), (a, v) => a._index = v);
3437

35-
public TDescriptor Routing(Routing routing) => Assign(routing, (a, v) => a._routing = v);
38+
public TDescriptor Routing(Routing routing) => Assign(routing, (a, v) => a.RoutingValue = v);
3639

3740
public TDescriptor Version(long version) => Assign(version, (a, v) => a._version = v);
3841

3942
public TDescriptor VersionType(VersionType? versionType) => Assign(versionType, (a, v) => a._versionType = v);
4043

4144
protected override void Serialize(Utf8JsonWriter writer, JsonSerializerOptions options, IElasticsearchClientSettings settings)
4245
{
46+
IdValue = GetIdForOperation(settings.Inferrer);
47+
RoutingValue = GetRoutingForOperation(settings.Inferrer);
48+
4349
writer.WriteStartObject();
4450

4551
SerializeInternal(writer, options, settings);
4652

47-
if (_id is not null)
53+
if (IdValue is not null)
4854
{
4955
writer.WritePropertyName("_id");
50-
JsonSerializer.Serialize(writer, _id, options);
56+
JsonSerializer.Serialize(writer, IdValue, options);
5157
}
5258

5359
if (_ifPrimaryTerm.HasValue)
@@ -68,10 +74,10 @@ protected override void Serialize(Utf8JsonWriter writer, JsonSerializerOptions o
6874
JsonSerializer.Serialize(writer, _index, options);
6975
}
7076

71-
if (_routing is not null)
77+
if (RoutingValue is not null)
7278
{
7379
writer.WritePropertyName("routing");
74-
JsonSerializer.Serialize(writer, _routing, options);
80+
JsonSerializer.Serialize(writer, RoutingValue, options);
7581
}
7682

7783
if (_version.HasValue)
@@ -101,8 +107,8 @@ protected override void Serialize(Utf8JsonWriter writer, JsonSerializerOptions o
101107

102108
protected abstract object GetBody();
103109

104-
protected virtual Id GetIdForOperation(Inferrer inferrer) => _id ?? new Id(GetBody());
110+
protected virtual Id GetIdForOperation(Inferrer inferrer) => IdValue ?? new Id(GetBody());
105111

106-
protected virtual Routing GetRoutingForOperation(Inferrer inferrer) => _routing ?? new Routing(GetBody());
112+
protected virtual Routing GetRoutingForOperation(Inferrer inferrer) => RoutingValue ?? new Routing(GetBody());
107113
}
108114
}

0 commit comments

Comments
 (0)