Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using Elastic.Clients.Elasticsearch.QueryDsl;

namespace Elastic.Clients.Elasticsearch.AsyncSearch
{
public partial class AsyncSearchSubmitRequest
{
// Any request may contain aggregations so we force typed_keys in order to successfully deserialise them.
internal override void BeforeRequest() => TypedKeys = true;
}

public sealed partial class AsyncSearchSubmitRequestDescriptor
{
public AsyncSearchSubmitRequestDescriptor MatchAll(Action<MatchAllQueryDescriptor>? selector = null) => selector is null ? Query(q => q.MatchAll()) : Query(q => q.MatchAll(selector));

internal override void BeforeRequest() => TypedKeys(true);
}

public sealed partial class AsyncSearchSubmitRequestDescriptor<TDocument>
{
public AsyncSearchSubmitRequestDescriptor<TDocument> MatchAll()
{
Query(new MatchAllQuery());
return Self;
}

internal override void BeforeRequest() => TypedKeys(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

namespace Elastic.Clients.Elasticsearch.AsyncSearch
{
public partial class GetAsyncSearchRequest
{
// Any request may contain aggregations so we force typed_keys in order to successfully deserialise them.
internal override void BeforeRequest() => TypedKeys = true;
}

public sealed partial class GetAsyncSearchRequestDescriptor<TDocument>
{
// Any request may contain aggregations so we force typed_keys in order to successfully deserialise them.
internal override void BeforeRequest() => TypedKeys(true);
}

public sealed partial class GetAsyncSearchRequestDescriptor
{
// Any request may contain aggregations so we force typed_keys in order to successfully deserialise them.
internal override void BeforeRequest() => TypedKeys(true);
}
}

This file was deleted.

256 changes: 256 additions & 0 deletions src/Elastic.Clients.Elasticsearch/Api/BulkRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Threading.Tasks;
using Elastic.Transport;
using System.IO;
using System.Collections.Generic;
using System.Linq;

namespace Elastic.Clients.Elasticsearch
{
public partial class BulkRequest : IStreamSerializable
{
protected IRequest Self => this;

public BulkOperationsCollection Operations { get; set; }

protected override string ContentType => "application/x-ndjson";

protected override string Accept => "application/json";

public void Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
{
if (Operations is null)
return;

var index = Self.RouteValues.Get<IndexName>("index");

foreach (var op in Operations)
{
if (op is not IStreamSerializable serializable)
throw new InvalidOperationException("");

op.PrepareIndex(index);

serializable.Serialize(stream, settings, formatting);
stream.WriteByte((byte)'\n');
}
}

public async Task SerializeAsync(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
{
if (Operations is null)
return;

var index = Self.RouteValues.Get<IndexName>("index");

foreach (var op in Operations)
{
if (op is not IStreamSerializable serializable)
throw new InvalidOperationException("");

op.PrepareIndex(index);

await serializable.SerializeAsync(stream, settings, formatting).ConfigureAwait(false);
stream.WriteByte((byte)'\n');
}
}
}

public sealed partial class BulkRequestDescriptor : IStreamSerializable
{
protected override string ContentType => "application/x-ndjson";

protected override string Accept => "application/json";

private readonly BulkOperationsCollection _operations = new();

public BulkRequestDescriptor Index(string index)
{
RouteValues.Optional("index", IndexName.Parse(index));
return Self;
}

public BulkRequestDescriptor Create<TSource>(TSource document, Action<BulkCreateOperationDescriptor<TSource>> configure = null)
{
var descriptor = new BulkCreateOperationDescriptor<TSource>(document);
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Create<TSource>(TSource document, IndexName index, Action<BulkCreateOperationDescriptor<TSource>> configure = null)
{
var descriptor = new BulkCreateOperationDescriptor<TSource>(document, index);
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Index<TSource>(TSource document, Action<BulkIndexOperationDescriptor<TSource>> configure = null)
{
var descriptor = new BulkIndexOperationDescriptor<TSource>(document);
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Index<TSource>(TSource document, IndexName index, Action<BulkIndexOperationDescriptor<TSource>> configure = null)
{
var descriptor = new BulkIndexOperationDescriptor<TSource>(document, index);
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Update(BulkUpdateOperationBase update)
{
_operations.Add(update);
return this;
}

public BulkRequestDescriptor Update<TSource, TPartialDocument>(Action<BulkUpdateOperationDescriptor<TSource, TPartialDocument>> configure)
{
var descriptor = new BulkUpdateOperationDescriptor<TSource, TPartialDocument>();
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Update<T>(Action<BulkUpdateOperationDescriptor<T, T>> configure) =>
Update<T, T>(configure);

public BulkRequestDescriptor Delete(Id id, Action<BulkDeleteOperationDescriptor> configure = null)
{
var descriptor = new BulkDeleteOperationDescriptor(id);
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Delete(string id, Action<BulkDeleteOperationDescriptor> configure = null)
{
var descriptor = new BulkDeleteOperationDescriptor(id);
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Delete(Action<BulkDeleteOperationDescriptor> configure)
{
var descriptor = new BulkDeleteOperationDescriptor();
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Delete<TSource>(TSource documentToDelete, Action<BulkDeleteOperationDescriptor> configure = null)
{
var descriptor = new BulkDeleteOperationDescriptor(new Id(documentToDelete));
configure?.Invoke(descriptor);
_operations.Add(descriptor);
return this;
}

public BulkRequestDescriptor Delete<TSource>(Action<BulkDeleteOperationDescriptor> configure) => Delete(configure);

public BulkRequestDescriptor CreateMany<TSource>(IEnumerable<TSource> documents, Action<BulkCreateOperationDescriptor<TSource>, TSource> bulkCreateSelector) =>
AddOperations(documents, bulkCreateSelector, o => new BulkCreateOperationDescriptor<TSource>(o));

public BulkRequestDescriptor CreateMany<TSource>(IEnumerable<TSource> documents) =>
AddOperations(documents, null, o => new BulkCreateOperationDescriptor<TSource>(o));

public BulkRequestDescriptor IndexMany<TSource>(IEnumerable<TSource> documents, Action<BulkIndexOperationDescriptor<TSource>, TSource> bulkIndexSelector) =>
AddOperations(documents, bulkIndexSelector, o => new BulkIndexOperationDescriptor<TSource>(o));

public BulkRequestDescriptor IndexMany<TSource>(IEnumerable<TSource> documents) =>
AddOperations(documents, null, o => new BulkIndexOperationDescriptor<TSource>(o));

public BulkRequestDescriptor UpdateMany<TSource>(IEnumerable<TSource> objects, Action<BulkUpdateOperationDescriptor<TSource, TSource>, TSource> bulkIndexSelector) =>
AddOperations(objects, bulkIndexSelector, o => new BulkUpdateOperationDescriptor<TSource, TSource>().IdFrom(o));

public BulkRequestDescriptor UpdateMany<TSource>(IEnumerable<TSource> objects) =>
AddOperations(objects, null, o => new BulkUpdateOperationDescriptor<TSource, TSource>().IdFrom(o));

public BulkRequestDescriptor DeleteMany<T>(IEnumerable<T> objects, Action<BulkDeleteOperationDescriptor, T> bulkDeleteSelector) =>
AddOperations(objects, bulkDeleteSelector, obj => new BulkDeleteOperationDescriptor(new Id(obj)));

public BulkRequestDescriptor DeleteMany(IEnumerable<Id> ids, Action<BulkDeleteOperationDescriptor, Id> bulkDeleteSelector) =>
AddOperations(ids, bulkDeleteSelector, id => new BulkDeleteOperationDescriptor(id));

public BulkRequestDescriptor DeleteMany<T>(IEnumerable<T> objects) =>
AddOperations(objects, null, obj => new BulkDeleteOperationDescriptor<T>(obj));

public BulkRequestDescriptor DeleteMany(IndexName index, IEnumerable<Id> ids) =>
AddOperations(ids, null, id => new BulkDeleteOperationDescriptor(id).Index(index));

public void Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
{
if (_operations is null)
return;

var index = Self.RouteValues.Get<IndexName>("index");

foreach (var op in _operations)
{
if (op is not IStreamSerializable serializable)
throw new InvalidOperationException("");

op.PrepareIndex(index);

serializable.Serialize(stream, settings, formatting);
stream.WriteByte((byte)'\n');
}
}

public async Task SerializeAsync(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
{
if (_operations is null)
return;

var index = Self.RouteValues.Get<IndexName>("index");

foreach (var op in _operations)
{
if (op is not IStreamSerializable serializable)
throw new InvalidOperationException("");

op.PrepareIndex(index);

await serializable.SerializeAsync(stream, settings, formatting).ConfigureAwait(false);
stream.WriteByte((byte)'\n');
}
}

private BulkRequestDescriptor AddOperations<TSource, TDescriptor>(
IEnumerable<TSource> objects,
Action<TDescriptor, TSource> configureDescriptor,
Func<TSource, TDescriptor> createDescriptor
) where TDescriptor : IBulkOperation
{
if (@objects == null)
return this;

var objectsList = @objects.ToList();
var operations = new List<IBulkOperation>(objectsList.Count());

foreach (var o in objectsList)
{
var descriptor = createDescriptor(o);

if (configureDescriptor is not null)
{
configureDescriptor(descriptor, o);
}

operations.Add(descriptor);
}

_operations.AddRange(operations);
return Self;
}
}
}
34 changes: 34 additions & 0 deletions src/Elastic.Clients.Elasticsearch/Api/BulkResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Text.Json.Serialization;
using System.Text;
using System.Linq;

namespace Elastic.Clients.Elasticsearch
{
public partial class BulkResponse
{
[JsonConverter(typeof(BulkResponseItemConverter)), JsonPropertyName("items")]
public IReadOnlyList<BulkResponseItemBase> Items { get; init; }

[JsonIgnore]
public IEnumerable<BulkResponseItemBase> ItemsWithErrors => !Items.HasAny()
? Enumerable.Empty<BulkResponseItemBase>()
: Items.Where(i => !i.IsValid);

public override bool IsValid => base.IsValid && !Errors && !ItemsWithErrors.HasAny();

protected override void DebugIsValid(StringBuilder sb)
{
if (Items == null)
return;

sb.AppendLine($"# Invalid Bulk items:");
foreach (var i in Items.Select((item, i) => new { item, i }).Where(i => !i.item.IsValid))
sb.AppendLine($" operation[{i.i}]: {i.item}");
}
}
}
27 changes: 27 additions & 0 deletions src/Elastic.Clients.Elasticsearch/Api/BulkResponseItemBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

namespace Elastic.Clients.Elasticsearch
{
public abstract partial class BulkResponseItemBase
{
public abstract string Operation { get; }

public bool IsValid
{
get
{
if (Error is not null)
return false;

return Operation.ToLowerInvariant() switch
{
"delete" => Status == 200 || Status == 404,
"update" or "index" or "create" => Status == 200 || Status == 201,
_ => false,
};
}
}
}
}
Loading