Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/Nest/Document/Multiple/Reindex/ReindexObservable.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using Elasticsearch.Net;

Expand All @@ -10,6 +11,7 @@ public class ReindexObservable<T> : IDisposable, IObservable<IReindexResponse<T>
private readonly IReindexRequest _reindexRequest;
private readonly IConnectionSettingsValues _connectionSettings;
private readonly IElasticClient _client;
private static IList<ISort> DocOrderSort = new ReadOnlyCollection<ISort>(new List<ISort> { new SortField { Field = "_doc" } });

private Action<IHit<T>, T, IBulkIndexOperation<T>> Alter { get; set; }

Expand All @@ -19,6 +21,7 @@ public ReindexObservable(IElasticClient client, IConnectionSettingsValues connec
this._reindexRequest = reindexRequest;
this._client = client;
}

public IDisposable Subscribe(ReindexObserver<T> observer)
{
this.Alter = observer.Alter;
Expand Down Expand Up @@ -46,7 +49,8 @@ private void Reindex(IObserver<IReindexResponse<T>> observer)
var toIndex = this._reindexRequest.To.Resolve(this._connectionSettings);
toIndex.ThrowIfNullOrEmpty(nameof(toIndex));

this.CreateIndex(fromIndex, toIndex);
if (!this._reindexRequest.OmitCreateIndex)
this.CreateIndex(fromIndex, toIndex);

var scroll = this._reindexRequest.Scroll ?? TimeSpan.FromMinutes(2);

Expand Down Expand Up @@ -87,19 +91,19 @@ private void ScrollToCompletion(IObserver<IReindexResponse<T>> observer, string
private ISearchResponse<T> InitiateSearch(string fromIndex, string toIndex, Time scroll)
{
var size = this._reindexRequest.Size ?? 100;
var searchResult = this._client.Search<T>(new SearchRequest<T>(fromIndex, this._reindexRequest.Type)
var searchResult = this._client.Search<T>(new SearchRequest<T>(fromIndex, this._reindexRequest.Types)
{
From = 0,
Size = size,
Query = this._reindexRequest.Query,
Scroll = scroll
Scroll = scroll,
Sort = DocOrderSort
});
if (searchResult.Total <= 0)
throw Throw($"Source index {fromIndex} doesn't contain any documents.", searchResult.ApiCall);
return searchResult;
}


private void CreateIndex(string resolvedFrom, string resolvedTo)
{
var originalIndexSettings = this._client.GetIndex(resolvedFrom);
Expand Down
76 changes: 47 additions & 29 deletions src/Nest/Document/Multiple/Reindex/ReindexRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,86 +6,104 @@ public interface IReindexRequest
{
IndexName To { get; set; }
IndexName From { get; set; }
Types Type { get; set; }
Types Types { get; set; }
/// <summary>
/// A search request can be scrolled by specifying the scroll parameter. The scroll parameter is a time value
/// parameter (for example: scroll=5m), indicating for how long the nodes that participate in the search
/// will maintain relevant resources in order to continue and support it. This is very similar in its idea to opening a cursor against a database.
/// </summary>
Time Scroll { get; set; }

/// <summary>
/// The number of hits to return. Defaults to 100. When using scroll search type,
/// size is actually multiplied by the number of shards!
/// </summary>
int? Size { get; set; }

/// <summary>
/// A query to optionally limit the documents to use for the reindex operation.
/// </summary>
QueryContainer Query { get; set; }

ICreateIndexRequest CreateIndexRequest { get; set; }
/// <summary>
/// Do not send a create index call on <see cref="To"/>, assume the index has been created outside of the reindex.
/// </summary>
bool OmitIndexCreation { get; set; }

IPutMappingRequest PutMappingRequest { get; set; }
/// <summary>
/// Describe how the newly created index should be created. Remember you can also register Index Templates for more dynamic usecases.
/// </summary>
ICreateIndexRequest CreateIndexRequest { get; set; }
}

public class ReindexRequest : IReindexRequest
{
/// <inheritdoc/>
public bool OmitCreateIndex { get; set; }
/// <inheritdoc/>
public IndexName To { get; set; }
/// <inheritdoc/>
public IndexName From { get; set; }
public Types Type { get; set; }
/// <inheritdoc/>
public Types Types { get; set; }
/// <inheritdoc/>
public Time Scroll { get; set; }
/// <inheritdoc/>
public int? Size { get; set; }
/// <inheritdoc/>
public QueryContainer Query { get; set; }
/// <inheritdoc/>
public ICreateIndexRequest CreateIndexRequest { get; set; }
public IPutMappingRequest PutMappingRequest { get; set; }
public ReindexRequest(IndexName from, IndexName to, Types type)

public ReindexRequest(IndexName from, IndexName to, Types types)
{
this.To = to;
this.From = from;
this.Type = type;
this.Types = types;
}
}

public class ReindexDescriptor<T> : DescriptorBase<ReindexDescriptor<T>, IReindexRequest>, IReindexRequest where T : class
{
bool IReindexRequest.OmitIndexCreation { get; set; }
IndexName IReindexRequest.To { get; set; }
IndexName IReindexRequest.From { get; set; }
Types IReindexRequest.Type { get; set; }
Types IReindexRequest.Types { get; set; }
Time IReindexRequest.Scroll { get; set; }
int? IReindexRequest.Size { get; set; }
QueryContainer IReindexRequest.Query { get; set; }
ICreateIndexRequest IReindexRequest.CreateIndexRequest { get; set; }
IPutMappingRequest IReindexRequest.PutMappingRequest { get; set; }

public ReindexDescriptor(IndexName from, IndexName to)
{
Assign(a => a.From = from)
.Assign(a => a.To = to)
.Assign(a => a.Type = typeof(T));
.Assign(a => a.Types = typeof(T));
}

/// <summary>
/// A search request can be scrolled by specifying the scroll parameter. The scroll parameter is a time value parameter (for example: scroll=5m), indicating for how long the nodes that participate in the search will maintain relevant resources in order to continue and support it. This is very similar in its idea to opening a cursor against a database.
/// </summary>
/// <param name="scrollTime">The scroll parameter is a time value parameter (for example: scroll=5m)</param>
/// <returns></returns>
/// <inheritdoc/>
public ReindexDescriptor<T> Scroll(Time scrollTime) => Assign(a => a.Scroll = scrollTime);

/// <summary>
/// The number of hits to return. Defaults to 100. When using scroll search type,
/// size is actually multiplied by the number of shards!
/// </summary>
/// <inheritdoc/>
public ReindexDescriptor<T> Size(int? size) => Assign(a => a.Size = size);

/// <summary>
/// The number of hits to return. Defaults to 100.
/// </summary>
/// <inheritdoc/>
public ReindexDescriptor<T> Take(int? take) => Assign(a => a.Size = take);

/// <summary>
/// A query to optionally limit the documents to use for the reindex operation.
/// </summary>
/// <inheritdoc/>
public ReindexDescriptor<T> OmitIndexCreation(bool omit = true) => Assign(a => a.OmitIndexCreation = true);

/// <inheritdoc/>
public ReindexDescriptor<T> Query(Func<QueryContainerDescriptor<T>, QueryContainer> querySelector) =>
Assign(a => a.Query = querySelector?.Invoke(new QueryContainerDescriptor<T>()));

/// <summary>
/// A query to optionally limit the documents to use for the reindex operation.
/// </summary>
/// <inheritdoc/>
public ReindexDescriptor<T> Query(QueryContainer query) => Assign(a => a.Query = query);

/// <summary>
/// Specify the document types to reindex. By default, will be <typeparamref name="T"/>
/// </summary>
public ReindexDescriptor<T> Type(Types type) => Assign(a => a.Type = type);
public ReindexDescriptor<T> Type(Types type) => Assign(a => a.Types = type);

/// <summary>
/// Reindex all document types.
Expand Down