From ea45bbd331c8b91147437cac76c5ed28df0828fb Mon Sep 17 00:00:00 2001 From: David Pfeffer Date: Wed, 13 Nov 2013 13:58:08 -0500 Subject: [PATCH] added async version of Bulk --- src/Nest/ElasticClient-Bulk.cs | 107 +++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 46 deletions(-) diff --git a/src/Nest/ElasticClient-Bulk.cs b/src/Nest/ElasticClient-Bulk.cs index ce09fd526c0..4178539743f 100644 --- a/src/Nest/ElasticClient-Bulk.cs +++ b/src/Nest/ElasticClient-Bulk.cs @@ -10,7 +10,9 @@ namespace Nest { - public partial class ElasticClient + using System.Threading.Tasks; + + public partial class ElasticClient { public IBulkResponse Bulk(Func bulkSelector) { @@ -18,55 +20,68 @@ public IBulkResponse Bulk(Func bulkSelector) var bulkDescriptor = bulkSelector(new BulkDescriptor()); return this.Bulk(bulkDescriptor); } + + private void GenerateBulkPathAndJson(BulkDescriptor bulkDescriptor, out string json, out string path) + { + bulkDescriptor.ThrowIfNull("bulkDescriptor"); + bulkDescriptor._Operations.ThrowIfEmpty("Bulk descriptor does not define any operations"); + var sb = new StringBuilder(); + + foreach (var operation in bulkDescriptor._Operations) + { + var command = operation._Operation; + var index = operation._Index ?? + bulkDescriptor._FixedIndex ?? + new IndexNameResolver(this._connectionSettings).GetIndexForType(operation._ClrType); + var typeName = operation._Type + ?? bulkDescriptor._FixedType + ?? this.Infer.TypeName(operation._ClrType); + + var id = operation.GetIdForObject(this.Infer); + operation._Index = index; + operation._Type = typeName; + operation._Id = id; + + var opJson = this.Serializer.Serialize(operation, Formatting.None); + + var action = "{{ \"{0}\" : {1} }}\n".F(command, opJson); + sb.Append(action); + + if (command == "index" || command == "create") + { + string jsonCommand = this.Serializer.Serialize(operation._Object, Formatting.None); + sb.Append(jsonCommand + "\n"); + } + else if (command == "update") + { + string jsonCommand = this.Serializer.Serialize(operation.GetBody(), Formatting.None); + sb.Append(jsonCommand + "\n"); + } + } + json = sb.ToString(); + path = "_bulk"; + if (!bulkDescriptor._FixedIndex.IsNullOrEmpty()) + { + if (!bulkDescriptor._FixedType.IsNullOrEmpty()) + path = bulkDescriptor._FixedType + "/" + path; + path = bulkDescriptor._FixedIndex + "/" + path; + } + } public IBulkResponse Bulk(BulkDescriptor bulkDescriptor) { - bulkDescriptor.ThrowIfNull("bulkDescriptor"); - bulkDescriptor._Operations.ThrowIfEmpty("Bulk descriptor does not define any operations"); - var sb = new StringBuilder(); - - foreach (var operation in bulkDescriptor._Operations) - { - var command = operation._Operation; - var index = operation._Index ?? - bulkDescriptor._FixedIndex ?? - new IndexNameResolver(this._connectionSettings).GetIndexForType(operation._ClrType); - var typeName = operation._Type - ?? bulkDescriptor._FixedType - ?? this.Infer.TypeName(operation._ClrType); - - var id = operation.GetIdForObject(this.Infer); - operation._Index = index; - operation._Type = typeName; - operation._Id = id; - - var opJson = this.Serializer.Serialize(operation, Formatting.None); - - var action = "{{ \"{0}\" : {1} }}\n".F(command, opJson); - sb.Append(action); - - if (command == "index" || command == "create") - { - string jsonCommand = this.Serializer.Serialize(operation._Object, Formatting.None); - sb.Append(jsonCommand + "\n"); - } - else if (command == "update") - { - string jsonCommand = this.Serializer.Serialize(operation.GetBody(), Formatting.None); - sb.Append(jsonCommand + "\n"); - } - } - var json = sb.ToString(); - var path = "_bulk"; - if (!bulkDescriptor._FixedIndex.IsNullOrEmpty()) - { - if (!bulkDescriptor._FixedType.IsNullOrEmpty()) - path = bulkDescriptor._FixedType + "/" + path; - path = bulkDescriptor._FixedIndex + "/" + path; - } + string json, path; + this.GenerateBulkPathAndJson(bulkDescriptor, out json, out path); var status = this.Connection.PostSync(path, json); return this.Deserialize(status); - } - + } + public Task BulkAsync(BulkDescriptor bulkDescriptor) + { + string json, path; + this.GenerateBulkPathAndJson(bulkDescriptor, out json, out path); + var task = this.Connection.Post(path, json); + return task.ContinueWith(t => (IBulkResponse)this.Deserialize(t.Result)); + } + internal string GenerateBulkIndexCommand(IEnumerable objects) where T : class { return this.GenerateBulkCommand(@objects, "index");