-
Notifications
You must be signed in to change notification settings - Fork 623
/
IndexReplicationHandler.cs
345 lines (308 loc) · 14.2 KB
/
IndexReplicationHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
using Lucene.Net.Index;
using Lucene.Net.Store;
using Lucene.Net.Util;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using JCG = J2N.Collections.Generic;
using Directory = Lucene.Net.Store.Directory;
namespace Lucene.Net.Replicator
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// <summary>
/// A <see cref="IReplicationHandler"/> for replication of an index. Implements
/// <see cref="RevisionReady"/> by copying the files pointed by the client resolver to
/// the index <see cref="Store.Directory"/> and then touches the index with
/// <see cref="IndexWriter"/> to make sure any unused files are deleted.
/// </summary>
/// <remarks>
/// <b>NOTE:</b> This handler assumes that <see cref="IndexWriter"/> is not opened by
/// another process on the index directory. In fact, opening an
/// <see cref="IndexWriter"/> on the same directory to which files are copied can lead
/// to undefined behavior, where some or all of the files may be deleted, overwrite
/// other files, or simply create a mess. When you replicate an index, it is best
/// if the index is never modified by <see cref="IndexWriter"/>, except the one that is
/// open on the source index, from which you replicate.
/// <para/>
/// This handler notifies the application via a provided <see cref="T:Func{bool?}"/> when an
/// updated index commit was made available for it.
/// <para/>
/// @lucene.experimental
/// </remarks>
public class IndexReplicationHandler : IReplicationHandler
{
/// <summary>
/// The component used to log messages to the <see cref="Util.InfoStream.Default"/>
/// <see cref="Util.InfoStream"/>.
/// </summary>
public const string INFO_STREAM_COMPONENT = "IndexReplicationHandler";
// Directory that receives the replicated files; it is the copy target used by RevisionReady.
private readonly Directory indexDirectory;
// Optional notification hook invoked at the end of RevisionReady. May be null (it is
// null-checked before use) and its return value is not inspected by this class.
private readonly Func<bool?> callback;
// Files of the revision this handler currently reflects. Initialized from the last commit
// when the directory already holds an index, otherwise null until RevisionReady succeeds.
private volatile IDictionary<string, IList<RevisionFile>> currentRevisionFiles;
// Version string that goes with currentRevisionFiles; null under the same conditions.
private volatile string currentVersion;
// Backing field of the InfoStream property; the property setter substitutes
// InfoStream.NO_OUTPUT when it is handed null.
private volatile InfoStream infoStream;
// Note: LUCENENET specific utility method.
// Forwards each message to the configured InfoStream under INFO_STREAM_COMPONENT,
// doing nothing at all when that component is not enabled.
private void WriteToInfoStream(params string[] messages)
{
    if (InfoStream.IsEnabled(INFO_STREAM_COMPONENT))
    {
        for (int i = 0; i < messages.Length; i++)
        {
            InfoStream.Message(INFO_STREAM_COMPONENT, messages[i]);
        }
    }
}
/// <summary>
/// Looks up the most recent <see cref="IndexCommit"/> in the given <see cref="Directory"/>.
/// </summary>
/// <param name="directory">The directory to inspect.</param>
/// <returns>
/// The final entry of the commit list reported for <paramref name="directory"/>, or
/// <c>null</c> when the directory holds no index.
/// </returns>
/// <exception cref="IOException">If reading the directory fails.</exception>
public static IndexCommit GetLastCommit(Directory directory)
{
    try
    {
        if (!DirectoryReader.IndexExists(directory))
        {
            return null;
        }

        IList<IndexCommit> allCommits = DirectoryReader.ListCommits(directory);
        int lastIndex = allCommits.Count - 1;
        return allCommits[lastIndex];
    }
    catch (IndexNotFoundException)
    {
        // An IndexNotFoundException raised by either call above is treated
        // the same as "no commits": report null to the caller.
        return null;
    }
}
/// <summary>
/// Verifies that the last file is segments_N and fails otherwise. It also
/// removes and returns the file from the list, because it needs to be handled
/// last, after all files. This is important in order to guarantee that if a
/// reader sees the new segments_N, all other segment files are already on
/// stable storage.
/// <para/>
/// The reason why the code fails instead of putting segments_N file last is
/// that this indicates an error in the <see cref="IRevision"/> implementation.
/// </summary>
/// <remarks>
/// The list is only modified when validation succeeds; if this method throws,
/// <paramref name="files"/> is left exactly as it was passed in.
/// </remarks>
/// <param name="files">
/// The files of a revision. On success its last entry (the segments_N file) is removed.
/// </param>
/// <param name="allowEmpty">
/// When <c>true</c>, an empty <paramref name="files"/> list yields <c>null</c> instead of an error.
/// </param>
/// <returns>
/// The segments_N file name that was removed from the list, or <c>null</c> if the list
/// is empty and <paramref name="allowEmpty"/> is <c>true</c>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="files"/> is <c>null</c>.</exception>
public static string GetSegmentsFile(IList<string> files, bool allowEmpty)
{
    if (files == null)
    {
        throw new ArgumentNullException(nameof(files));
    }

    if (files.Count == 0)
    {
        if (allowEmpty)
            return null;
        throw IllegalStateException.Create("empty list of files not allowed");
    }

    int lastIndex = files.Count - 1;
    string segmentsFile = files[lastIndex];

    // Validate BEFORE mutating the caller's list, so that a failure does not
    // silently drop the offending entry from it. A null entry is rejected the
    // same way as any other non-segments_N name.
    bool isSegmentsN = segmentsFile != null
        && segmentsFile.StartsWith(IndexFileNames.SEGMENTS, StringComparison.Ordinal)
        && !segmentsFile.Equals(IndexFileNames.SEGMENTS_GEN, StringComparison.Ordinal);
    if (!isSegmentsN)
    {
        throw IllegalStateException.Create(
            string.Format("last file to copy+sync must be segments_N but got {0}; check your Revision implementation!", segmentsFile));
    }

    // Callers rely on this side effect: the remaining list holds only the files
    // that must be copied and synced before segments_N.
    files.RemoveAt(lastIndex);
    return segmentsFile;
}
/// <summary>
/// Removes every one of the given files from the index directory. Intended to be
/// called after a file copy or sync has failed, to undo the partial work.
/// </summary>
/// <param name="directory">The directory to delete from.</param>
/// <param name="files">Names of the files to delete.</param>
public static void CleanupFilesOnFailure(Directory directory, IList<string> files)
{
    // Deletes a single file and deliberately swallows any error: we only get
    // here because the copy already failed, so cleanup is strictly best effort
    // and one failed delete must not stop the remaining ones.
    void DeleteQuietly(string name)
    {
        try
        {
            directory.DeleteFile(name);
        }
        catch
        {
            // intentionally ignored
        }
    }

    foreach (string leftover in files)
    {
        DeleteQuietly(leftover);
    }
}
/// <summary>
/// Deletes index files that the current commit no longer references. The commit
/// is obtained from <see cref="GetLastCommit(Directory)"/>; only if its segments file
/// name equals the expected <paramref name="segmentsFile"/> are unreferenced files removed.
/// </summary>
/// <remarks>
/// <b>NOTE:</b> This is a best effort operation. Every exception raised along the way
/// is suppressed (nothing is reported to the caller), since the cleanup can simply
/// be attempted again on the next revision.
/// </remarks>
/// <param name="directory">The index directory to clean.</param>
/// <param name="segmentsFile">The segments file name the last commit is expected to have.</param>
public static void CleanupOldIndexFiles(Directory directory, string segmentsFile)
{
    try
    {
        IndexCommit lastCommit = GetLastCommit(directory);

        // No commit at all, or a commit other than the one we expect: in either
        // case we cannot tell which files are safe to remove, so do nothing.
        if (lastCommit == null || !lastCommit.SegmentsFileName.Equals(segmentsFile, StringComparison.Ordinal))
        {
            return;
        }

        // Everything the commit references must be kept, and so must segments.gen.
        ISet<string> keep = new JCG.HashSet<string>(lastCommit.FileNames);
        keep.Add(IndexFileNames.SEGMENTS_GEN);

        Regex codecFilePattern = IndexFileNames.CODEC_FILE_PATTERN;
        foreach (string candidate in directory.ListAll())
        {
            if (keep.Contains(candidate))
            {
                continue;
            }

            // Only touch files that look like index files (codec files or segments*);
            // anything else in the directory is left alone.
            bool looksLikeIndexFile = codecFilePattern.IsMatch(candidate)
                || candidate.StartsWith(IndexFileNames.SEGMENTS, StringComparison.Ordinal);
            if (!looksLikeIndexFile)
            {
                continue;
            }

            try
            {
                directory.DeleteFile(candidate);
            }
            catch
            {
                // best effort only: keep going with the remaining files
            }
        }
    }
    catch
    {
        // Swallow whatever went wrong during cleanup; it gets another chance
        // the next time a new revision is applied.
    }
}
/// <summary>
/// Copies each of the listed files from <paramref name="source"/> into
/// <paramref name="target"/>, keeping the file names. Nothing is copied when the two
/// directories compare equal.
/// </summary>
/// <param name="source">The <see cref="Directory"/> to copy from.</param>
/// <param name="target">The <see cref="Directory"/> to copy to.</param>
/// <param name="files">Names of the files to copy.</param>
/// <exception cref="IOException">If copying a file fails.</exception>
public static void CopyFiles(Directory source, Directory target, IList<string> files)
{
    if (!source.Equals(target))
    {
        foreach (string fileName in files)
        {
            source.Copy(target, fileName, fileName, IOContext.READ_ONCE);
        }
    }
}
/// <summary>
/// Brings the <see cref="IndexFileNames.SEGMENTS_GEN"/> file of the directory in line with
/// <paramref name="segmentsFile"/>: when a segments file name is given, its generation is
/// written to segments.gen; when it is <c>null</c>, segments.gen is deleted instead.
/// </summary>
/// <param name="segmentsFile">The segments_N file name to read the generation from, or <c>null</c>.</param>
/// <param name="directory">The directory whose segments.gen is written or deleted.</param>
public static void WriteSegmentsGen(string segmentsFile, Directory directory)
{
    if (segmentsFile == null)
    {
        try
        {
            directory.DeleteFile(IndexFileNames.SEGMENTS_GEN);
        }
        catch
        {
            // failing to delete segments.gen is not treated as an error
        }
        return;
    }

    var generation = SegmentInfos.GenerationFromSegmentsFileName(segmentsFile);
    SegmentInfos.WriteSegmentsGen(directory, generation);
}
/// <summary>
/// Constructor with the given index directory and callback to notify when the
/// indexes were updated.
/// </summary>
/// <remarks>
/// If <paramref name="indexDirectory"/> already contains an index, the handler's current
/// version and revision files are initialized from its last commit; otherwise both
/// stay <c>null</c> until the first revision is applied.
/// </remarks>
/// <param name="indexDirectory">The directory that replicated files are copied into. Must not be <c>null</c>.</param>
/// <param name="callback">
/// Invoked after a revision has been applied. May be <c>null</c>, in which case no
/// notification takes place.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="indexDirectory"/> is <c>null</c>.</exception>
public IndexReplicationHandler(Directory indexDirectory, Func<bool?> callback) // LUCENENET TODO: API - shouldn't this be Action ?
{
    // Fail fast with a descriptive exception instead of a NullReferenceException
    // surfacing from inside the index existence check below.
    if (indexDirectory == null)
    {
        throw new ArgumentNullException(nameof(indexDirectory));
    }

    this.InfoStream = InfoStream.Default;
    this.callback = callback;
    this.indexDirectory = indexDirectory;
    currentVersion = null;
    currentRevisionFiles = null;
    if (DirectoryReader.IndexExists(indexDirectory))
    {
        // Adopt the most recent commit as the handler's starting state.
        IList<IndexCommit> commits = DirectoryReader.ListCommits(indexDirectory);
        IndexCommit commit = commits[commits.Count - 1];
        currentVersion = IndexRevision.RevisionVersion(commit);
        currentRevisionFiles = IndexRevision.RevisionFiles(commit);
        WriteToInfoStream(
            string.Format("constructor(): currentVersion={0} currentRevisionFiles={1}", currentVersion, currentRevisionFiles),
            string.Format("constructor(): commit={0}", commit));
    }
}
/// <summary>
/// Gets the version of the revision this handler currently reflects, or <c>null</c>
/// if none is known yet.
/// </summary>
public virtual string CurrentVersion
{
    get { return currentVersion; }
}
/// <summary>
/// Gets the files of the revision this handler currently reflects, or <c>null</c>
/// if none is known yet.
/// </summary>
public virtual IDictionary<string, IList<RevisionFile>> CurrentRevisionFiles
{
    get { return currentRevisionFiles; }
}
/// <summary>
/// Applies a new revision to the index directory: copies and syncs all regular files
/// first, then copies and syncs the segments_N file, updates the handler's state,
/// rewrites segments.gen, removes index files that are no longer referenced and
/// finally notifies the callback (if one was supplied).
/// </summary>
/// <remarks>
/// If copying or syncing fails, the files that were being copied (including the
/// segments_N file) are deleted from the index directory again and the handler's
/// state is left unchanged.
/// <para/>
/// <b>NOTE:</b> the file list taken from <paramref name="copiedFiles"/> is modified: its
/// last entry (the segments_N file) is removed, and only added back when the copy fails.
/// </remarks>
/// <param name="version">The version of the revision; becomes <see cref="CurrentVersion"/> on success.</param>
/// <param name="revisionFiles">
/// The files of the revision per source; becomes <see cref="CurrentRevisionFiles"/> on
/// success. Must not contain more than one source.
/// </param>
/// <param name="copiedFiles">
/// The names of the files to copy, per source. Only the first entry is used, and the
/// last name in its list must be the segments_N file.
/// </param>
/// <param name="sourceDirectory">
/// The directory to copy from, per source. Only the first entry is used.
/// </param>
/// <exception cref="ArgumentException"><paramref name="revisionFiles"/> contains more than one source.</exception>
/// <exception cref="IOException">
/// Copying or syncing failed, or the callback threw (in which case the original
/// exception is wrapped).
/// </exception>
public virtual void RevisionReady(string version,
    IDictionary<string, IList<RevisionFile>> revisionFiles,
    IDictionary<string, IList<string>> copiedFiles,
    IDictionary<string, Directory> sourceDirectory)
{
    if (revisionFiles.Count > 1)
    {
        // Format the keys explicitly: formatting the key collection itself would
        // only print its type name, not the offending source names.
        string sources = "[" + string.Join(", ", revisionFiles.Keys) + "]";
        throw new ArgumentException(string.Format("this handler handles only a single source; got {0}", sources));
    }

    Directory clientDirectory = sourceDirectory.Values.First();
    IList<string> files = copiedFiles.Values.First();
    // Splits off the segments_N file; 'files' now holds everything that must be
    // on stable storage before segments_N becomes visible.
    string segmentsFile = GetSegmentsFile(files, false);
    bool success = false;
    try
    {
        // copy files from the client to index directory
        CopyFiles(clientDirectory, indexDirectory, files);
        // fsync all copied files (except segmentsFile)
        indexDirectory.Sync(files);
        // now copy and fsync segmentsFile
        clientDirectory.Copy(indexDirectory, segmentsFile, segmentsFile, IOContext.READ_ONCE);
        indexDirectory.Sync(new[] { segmentsFile });
        success = true;
    }
    finally
    {
        if (!success)
        {
            files.Add(segmentsFile); // add it back so it gets deleted too
            CleanupFilesOnFailure(indexDirectory, files);
        }
    }

    // all files have been successfully copied + sync'd. update the handler's state
    currentRevisionFiles = revisionFiles;
    currentVersion = version;
    WriteToInfoStream(string.Format("revisionReady(): currentVersion={0} currentRevisionFiles={1}", currentVersion, currentRevisionFiles));

    // update the segments.gen file
    WriteSegmentsGen(segmentsFile, indexDirectory);

    // Cleanup the index directory from old and unused index files.
    // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
    // side-effects, e.g. if it hits sudden IO errors while opening the index
    // (and can end up deleting the entire index). It is not our job to protect
    // against those errors, app will probably hit them elsewhere.
    CleanupOldIndexFiles(indexDirectory, segmentsFile);

    // successfully updated the index, notify the callback that the index is
    // ready.
    if (callback != null)
    {
        try
        {
            callback.Invoke();
        }
        catch (Exception e)
        {
            // Surface callback failures as IOException, keeping the original as InnerException.
            throw new IOException(e.ToString(), e);
        }
    }
}
/// <summary>
/// Gets or sets the <see cref="Util.InfoStream"/> that receives this handler's log
/// messages. Assigning <c>null</c> selects <see cref="Util.InfoStream.NO_OUTPUT"/>.
/// </summary>
public virtual InfoStream InfoStream
{
    get { return infoStream; }
    set
    {
        if (value is null)
        {
            infoStream = InfoStream.NO_OUTPUT;
        }
        else
        {
            infoStream = value;
        }
    }
}
}
}