Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 61 additions & 46 deletions src/Nest/ElasticClient-Bulk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,63 +10,78 @@

namespace Nest
{
public partial class ElasticClient
using System.Threading.Tasks;

public partial class ElasticClient
{
public IBulkResponse Bulk(Func<BulkDescriptor, BulkDescriptor> bulkSelector)
{
bulkSelector.ThrowIfNull("bulkSelector");
var bulkDescriptor = bulkSelector(new BulkDescriptor());
return this.Bulk(bulkDescriptor);
}

private void GenerateBulkPathAndJson(BulkDescriptor bulkDescriptor, out string json, out string path)
{
bulkDescriptor.ThrowIfNull("bulkDescriptor");
bulkDescriptor._Operations.ThrowIfEmpty("Bulk descriptor does not define any operations");
var sb = new StringBuilder();

foreach (var operation in bulkDescriptor._Operations)
{
var command = operation._Operation;
var index = operation._Index ??
bulkDescriptor._FixedIndex ??
new IndexNameResolver(this._connectionSettings).GetIndexForType(operation._ClrType);
var typeName = operation._Type
?? bulkDescriptor._FixedType
?? this.Infer.TypeName(operation._ClrType);

var id = operation.GetIdForObject(this.Infer);
operation._Index = index;
operation._Type = typeName;
operation._Id = id;

var opJson = this.Serializer.Serialize(operation, Formatting.None);

var action = "{{ \"{0}\" : {1} }}\n".F(command, opJson);
sb.Append(action);

if (command == "index" || command == "create")
{
string jsonCommand = this.Serializer.Serialize(operation._Object, Formatting.None);
sb.Append(jsonCommand + "\n");
}
else if (command == "update")
{
string jsonCommand = this.Serializer.Serialize(operation.GetBody(), Formatting.None);
sb.Append(jsonCommand + "\n");
}
}
json = sb.ToString();
path = "_bulk";
if (!bulkDescriptor._FixedIndex.IsNullOrEmpty())
{
if (!bulkDescriptor._FixedType.IsNullOrEmpty())
path = bulkDescriptor._FixedType + "/" + path;
path = bulkDescriptor._FixedIndex + "/" + path;
}
}
public IBulkResponse Bulk(BulkDescriptor bulkDescriptor)
{
bulkDescriptor.ThrowIfNull("bulkDescriptor");
bulkDescriptor._Operations.ThrowIfEmpty("Bulk descriptor does not define any operations");
var sb = new StringBuilder();

foreach (var operation in bulkDescriptor._Operations)
{
var command = operation._Operation;
var index = operation._Index ??
bulkDescriptor._FixedIndex ??
new IndexNameResolver(this._connectionSettings).GetIndexForType(operation._ClrType);
var typeName = operation._Type
?? bulkDescriptor._FixedType
?? this.Infer.TypeName(operation._ClrType);

var id = operation.GetIdForObject(this.Infer);
operation._Index = index;
operation._Type = typeName;
operation._Id = id;

var opJson = this.Serializer.Serialize(operation, Formatting.None);

var action = "{{ \"{0}\" : {1} }}\n".F(command, opJson);
sb.Append(action);

if (command == "index" || command == "create")
{
string jsonCommand = this.Serializer.Serialize(operation._Object, Formatting.None);
sb.Append(jsonCommand + "\n");
}
else if (command == "update")
{
string jsonCommand = this.Serializer.Serialize(operation.GetBody(), Formatting.None);
sb.Append(jsonCommand + "\n");
}
}
var json = sb.ToString();
var path = "_bulk";
if (!bulkDescriptor._FixedIndex.IsNullOrEmpty())
{
if (!bulkDescriptor._FixedType.IsNullOrEmpty())
path = bulkDescriptor._FixedType + "/" + path;
path = bulkDescriptor._FixedIndex + "/" + path;
}
string json, path;
this.GenerateBulkPathAndJson(bulkDescriptor, out json, out path);
var status = this.Connection.PostSync(path, json);
return this.Deserialize<BulkResponse>(status);
}

}
public Task<IBulkResponse> BulkAsync(BulkDescriptor bulkDescriptor)
{
string json, path;
this.GenerateBulkPathAndJson(bulkDescriptor, out json, out path);
var task = this.Connection.Post(path, json);
return task.ContinueWith(t => (IBulkResponse)this.Deserialize<BulkResponse>(t.Result));
}

internal string GenerateBulkIndexCommand<T>(IEnumerable<T> objects) where T : class
{
return this.GenerateBulkCommand<T>(@objects, "index");
Expand Down