-
Notifications
You must be signed in to change notification settings - Fork 624
/
CachingCollector.cs
542 lines (473 loc) · 20.7 KB
/
CachingCollector.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
using Lucene.Net.Support;
using System;
using System.Collections.Generic;
namespace Lucene.Net.Search
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using AtomicReaderContext = Lucene.Net.Index.AtomicReaderContext;
using RamUsageEstimator = Lucene.Net.Util.RamUsageEstimator;
/// <summary>
/// Caches all docs, and optionally also scores, coming from
/// a search, and is then able to replay them to another
/// collector. You specify the max RAM this class may use.
/// Once the collection is done, call <see cref="IsCached"/>. If
/// this returns <c>true</c>, you can use <see cref="Replay(ICollector)"/>
/// against a new collector. If it returns <c>false</c>, this means
/// too much RAM was required and you must instead re-run the
/// original search.
///
/// <para/><b>NOTE</b>: this class consumes 4 (or 8 bytes, if
/// scoring is cached) per collected document. If the result
/// set is large this can easily be a very substantial amount
/// of RAM!
///
/// <para/><b>NOTE</b>: this class caches at least 128 documents
/// before checking RAM limits.
///
/// <para>See the Lucene <c>modules/grouping</c> module for more
/// details including a full code example.</para>
///
/// @lucene.experimental
/// </summary>
public abstract class CachingCollector : ICollector
{
// Max out at 512K arrays
// (upper bound on any single cached chunk; growth stops at this size)
private const int MAX_ARRAY_SIZE = 512 * 1024;
// First chunk size — at least this many docs are cached before any RAM/doc
// limit is enforced (see the class-level NOTE about 128 documents).
private const int INITIAL_ARRAY_SIZE = 128;
/// <summary>
/// NOTE: This was EMPTY_INT_ARRAY in Lucene
/// </summary>
private static readonly int[] EMPTY_INT32_ARRAY = Arrays.Empty<int>();
/// <summary>
/// Immutable record of a segment boundary inside the cache: the segment's
/// reader context together with the cache offset at which its docs end.
/// </summary>
private class SegStart
{
    /// <summary>The reader context of the cached segment.</summary>
    public AtomicReaderContext ReaderContext { get; private set; }

    /// <summary>Cache offset (base + upto) one past this segment's last doc.</summary>
    public int End { get; private set; }

    public SegStart(AtomicReaderContext readerContext, int end)
    {
        ReaderContext = readerContext;
        End = end;
    }
}
/// <summary>
/// Minimal <see cref="Scorer"/> used during collection and replay: the outer
/// classes poke <see cref="doc"/> and <see cref="score"/> directly before each
/// <c>Collect</c> call. Iteration members are unsupported by design.
/// </summary>
private sealed class CachedScorer : Scorer
{
    // NOTE: these members are internal (package-private in Lucene) so the
    // outer class can write them directly without accessor overhead on the
    // hot collection path.
    internal int doc;
    internal float score;

    internal CachedScorer()
        : base(null)
    {
    }

    public override int DocID => doc;

    public override float GetScore() => score;

    public override long GetCost() => 1;

    public override int Freq => throw new NotSupportedException();

    public override int NextDoc() => throw new NotSupportedException();

    public override int Advance(int target) => throw new NotSupportedException();
}
/// <summary>
/// A <see cref="CachingCollector"/> which caches scores
/// </summary>
private sealed class ScoreCachingCollector : CachingCollector
{
// Replayable scorer whose doc/score fields are updated in place before each
// delegated Collect call; also what the wrapped collector sees via SetScorer.
private readonly CachedScorer cachedScorer;
// Score chunks; kept parallel to m_cachedDocs (same chunk sizes and order).
private readonly IList<float[]> cachedScores;
// The real scorer supplied by the searcher; scores are read from it on collect.
private Scorer scorer;
// Current score chunk being filled; null once caching has been abandoned.
private float[] curScores;
// Creates a score-caching collector bounded by a RAM budget (in MB).
internal ScoreCachingCollector(ICollector other, double maxRAMMB)
: base(other, maxRAMMB, true)
{
cachedScorer = new CachedScorer();
cachedScores = new List<float[]>();
curScores = new float[INITIAL_ARRAY_SIZE];
cachedScores.Add(curScores);
}
// Creates a score-caching collector bounded by a maximum doc count.
internal ScoreCachingCollector(ICollector other, int maxDocsToCache)
: base(other, maxDocsToCache)
{
cachedScorer = new CachedScorer();
cachedScores = new List<float[]>();
curScores = new float[INITIAL_ARRAY_SIZE];
cachedScores.Add(curScores);
}
// Caches the doc and its score (growing chunks 8x up to MAX_ARRAY_SIZE, or
// abandoning the cache once m_maxDocsToCache would be exceeded), then always
// forwards the doc to the wrapped collector.
public override void Collect(int doc)
{
if (m_curDocs == null)
{
// Cache was too large
cachedScorer.score = scorer.GetScore();
cachedScorer.doc = doc;
m_other.Collect(doc);
return;
}
// Allocate a bigger array or abort caching
if (m_upto == m_curDocs.Length)
{
m_base += m_upto;
// Compute next array length - don't allocate too big arrays
int nextLength = 8 * m_curDocs.Length;
if (nextLength > MAX_ARRAY_SIZE)
{
nextLength = MAX_ARRAY_SIZE;
}
if (m_base + nextLength > m_maxDocsToCache)
{
// try to allocate a smaller array
nextLength = m_maxDocsToCache - m_base;
if (nextLength <= 0)
{
// Too many docs to collect -- clear cache
m_curDocs = null;
curScores = null;
m_cachedSegs.Clear();
m_cachedDocs.Clear();
cachedScores.Clear();
cachedScorer.score = scorer.GetScore();
cachedScorer.doc = doc;
m_other.Collect(doc);
return;
}
}
m_curDocs = new int[nextLength];
m_cachedDocs.Add(m_curDocs);
curScores = new float[nextLength];
cachedScores.Add(curScores);
m_upto = 0;
}
m_curDocs[m_upto] = doc;
cachedScorer.score = curScores[m_upto] = scorer.GetScore();
m_upto++;
cachedScorer.doc = doc;
m_other.Collect(doc);
}
// Walks the cached segment boundaries, feeding each cached doc and its score
// (through cachedScorer) to the target collector in the original order.
public override void Replay(ICollector other)
{
ReplayInit(other);
int curUpto = 0; // position within the current chunk
int curBase = 0; // docs already replayed from earlier chunks
int chunkUpto = 0; // index of the next chunk to switch to
m_curDocs = EMPTY_INT32_ARRAY;
foreach (SegStart seg in m_cachedSegs)
{
other.SetNextReader(seg.ReaderContext);
other.SetScorer(cachedScorer);
while (curBase + curUpto < seg.End)
{
if (curUpto == m_curDocs.Length)
{
// Exhausted the current chunk; advance to the next docs/scores pair.
curBase += m_curDocs.Length;
m_curDocs = m_cachedDocs[chunkUpto];
curScores = cachedScores[chunkUpto];
chunkUpto++;
curUpto = 0;
}
cachedScorer.score = curScores[curUpto];
cachedScorer.doc = m_curDocs[curUpto];
other.Collect(m_curDocs[curUpto++]);
}
}
}
public override void SetScorer(Scorer scorer)
{
// Keep the real scorer for reading scores, but hand the wrapped collector
// the replayable cachedScorer so collection and replay see the same scorer.
this.scorer = scorer;
m_other.SetScorer(cachedScorer);
}
public override string ToString()
{
if (IsCached)
{
return "CachingCollector (" + (m_base + m_upto) + " docs & scores cached)";
}
else
{
return "CachingCollector (cache was cleared)";
}
}
}
/// <summary>
/// A <see cref="CachingCollector"/> which does not cache scores
/// </summary>
private sealed class NoScoreCachingCollector : CachingCollector
{
    // Creates a doc-only caching collector bounded by a RAM budget (in MB).
    internal NoScoreCachingCollector(ICollector other, double maxRAMMB)
        : base(other, maxRAMMB, false)
    {
    }

    // Creates a doc-only caching collector bounded by a maximum doc count.
    internal NoScoreCachingCollector(ICollector other, int maxDocsToCache)
        : base(other, maxDocsToCache)
    {
    }

    // Caches the doc ID (growing chunks 8x up to MAX_ARRAY_SIZE, or abandoning
    // the cache once m_maxDocsToCache would be exceeded), then always forwards
    // the doc to the wrapped collector.
    public override void Collect(int doc)
    {
        if (m_curDocs == null)
        {
            // Cache was too large
            m_other.Collect(doc);
            return;
        }
        // Allocate a bigger array or abort caching
        if (m_upto == m_curDocs.Length)
        {
            m_base += m_upto;
            // Compute next array length - don't allocate too big arrays
            int nextLength = 8 * m_curDocs.Length;
            if (nextLength > MAX_ARRAY_SIZE)
            {
                nextLength = MAX_ARRAY_SIZE;
            }
            if (m_base + nextLength > m_maxDocsToCache)
            {
                // try to allocate a smaller array
                nextLength = m_maxDocsToCache - m_base;
                if (nextLength <= 0)
                {
                    // Too many docs to collect -- clear cache
                    m_curDocs = null;
                    m_cachedSegs.Clear();
                    m_cachedDocs.Clear();
                    m_other.Collect(doc);
                    return;
                }
            }
            m_curDocs = new int[nextLength];
            m_cachedDocs.Add(m_curDocs);
            m_upto = 0;
        }
        m_curDocs[m_upto] = doc;
        m_upto++;
        m_other.Collect(doc);
    }

    // Walks the cached segment boundaries, feeding each cached doc to the
    // target collector in the original order (no scores are replayed).
    public override void Replay(ICollector other)
    {
        ReplayInit(other);
        int curUpto = 0; // position within the current chunk
        // LUCENENET: renamed from "curbase" for camelCase consistency with
        // ScoreCachingCollector.Replay (behavior unchanged).
        int curBase = 0; // docs already replayed from earlier chunks
        int chunkUpto = 0; // index of the next chunk to switch to
        m_curDocs = EMPTY_INT32_ARRAY;
        foreach (SegStart seg in m_cachedSegs)
        {
            other.SetNextReader(seg.ReaderContext);
            while (curBase + curUpto < seg.End)
            {
                if (curUpto == m_curDocs.Length)
                {
                    // Exhausted the current chunk; advance to the next one.
                    curBase += m_curDocs.Length;
                    m_curDocs = m_cachedDocs[chunkUpto];
                    chunkUpto++;
                    curUpto = 0;
                }
                other.Collect(m_curDocs[curUpto++]);
            }
        }
    }

    public override void SetScorer(Scorer scorer)
    {
        // Scores are not cached, so the real scorer is passed straight through.
        m_other.SetScorer(scorer);
    }

    public override string ToString()
    {
        if (IsCached)
        {
            return "CachingCollector (" + (m_base + m_upto) + " docs cached)";
        }
        else
        {
            return "CachingCollector (cache was cleared)";
        }
    }
}
// TODO: would be nice if a collector defined a
// needsScores() method so we can specialize / do checks
// up front. this is only relevant for the ScoreCaching
// version -- if the wrapped Collector does not need
// scores, it can avoid cachedScorer entirely.
// The wrapped collector that receives every collected doc in real time.
protected readonly ICollector m_other;
// Upper bound on total docs to cache, derived from maxRAMMB or given directly.
protected readonly int m_maxDocsToCache;
// Per-segment boundaries: each entry pairs a reader context with the cache
// offset (m_base + m_upto) at which that segment's docs end.
private readonly IList<SegStart> m_cachedSegs = new List<SegStart>();
// Chunks of cached doc IDs; chunk sizes grow geometrically (see Collect).
protected readonly IList<int[]> m_cachedDocs;
// Context of the segment currently being collected; flushed to m_cachedSegs
// on the next SetNextReader call, or by ReplayInit at replay time.
private AtomicReaderContext lastReaderContext;
// Current doc-ID chunk being filled; null means caching was abandoned.
protected int[] m_curDocs;
// Write position within m_curDocs.
protected int m_upto;
// Number of docs cached in chunks preceding m_curDocs.
protected int m_base;
// NOTE(review): appears unused within this file — possibly kept for
// subclasses or back-compat; confirm against the rest of the port before removing.
protected int m_lastDocBase;
/// <summary>
/// Creates a <see cref="CachingCollector"/> which does not wrap another collector.
/// The cached documents and scores can later be replayed (<see cref="Replay(ICollector)"/>).
/// </summary>
/// <param name="acceptDocsOutOfOrder">
/// whether documents are allowed to be collected out-of-order </param>
/// <param name="cacheScores">
/// whether scores should be cached in addition to document IDs </param>
/// <param name="maxRAMMB">
/// the maximum RAM in MB the cache may consume </param>
public static CachingCollector Create(bool acceptDocsOutOfOrder, bool cacheScores, double maxRAMMB)
{
    // Wrap a no-op sink collector: the cache itself is the only consumer here.
    return Create(new CollectorAnonymousClass(acceptDocsOutOfOrder), cacheScores, maxRAMMB);
}
/// <summary>
/// No-op <see cref="ICollector"/> used by <see cref="Create(bool, bool, double)"/>
/// when the caller wants only the cache; it merely reports whether out-of-order
/// collection is acceptable.
/// </summary>
private class CollectorAnonymousClass : ICollector
{
    private readonly bool acceptDocsOutOfOrder;

    public CollectorAnonymousClass(bool acceptDocsOutOfOrder)
    {
        this.acceptDocsOutOfOrder = acceptDocsOutOfOrder;
    }

    public virtual bool AcceptsDocsOutOfOrder => acceptDocsOutOfOrder;

    public virtual void SetScorer(Scorer scorer)
    {
        // no-op: this collector is a pure sink
    }

    public virtual void Collect(int doc)
    {
        // no-op: docs are recorded by the enclosing CachingCollector
    }

    public virtual void SetNextReader(AtomicReaderContext context)
    {
        // no-op
    }
}
/// <summary>
/// Create a new <see cref="CachingCollector"/> that wraps the given collector and
/// caches documents and scores up to the specified RAM threshold.
/// </summary>
/// <param name="other">
/// The <see cref="ICollector"/> to wrap and delegate calls to. </param>
/// <param name="cacheScores">
/// Whether to cache scores in addition to document IDs. Note that
/// this increases the RAM consumed per doc. </param>
/// <param name="maxRAMMB">
/// The maximum RAM in MB to consume for caching the documents and
/// scores. If the collector exceeds the threshold, no documents and
/// scores are cached. </param>
public static CachingCollector Create(ICollector other, bool cacheScores, double maxRAMMB)
{
    if (cacheScores)
    {
        return new ScoreCachingCollector(other, maxRAMMB);
    }
    return new NoScoreCachingCollector(other, maxRAMMB);
}
/// <summary>
/// Create a new <see cref="CachingCollector"/> that wraps the given collector and
/// caches documents and scores up to the specified max docs threshold.
/// </summary>
/// <param name="other">
/// The <see cref="ICollector"/> to wrap and delegate calls to. </param>
/// <param name="cacheScores">
/// Whether to cache scores in addition to document IDs. Note that
/// this increases the RAM consumed per doc. </param>
/// <param name="maxDocsToCache">
/// The maximum number of documents for caching the documents and
/// possibly the scores. If the collector exceeds the threshold,
/// no documents and scores are cached. </param>
public static CachingCollector Create(ICollector other, bool cacheScores, int maxDocsToCache)
{
    if (cacheScores)
    {
        return new ScoreCachingCollector(other, maxDocsToCache);
    }
    return new NoScoreCachingCollector(other, maxDocsToCache);
}
// Prevent extension from non-internal classes
/// <summary>
/// Shared initialization for RAM-bounded caching: seeds the first doc chunk
/// and converts the RAM budget into a doc-count budget.
/// </summary>
private CachingCollector(ICollector other, double maxRAMMB, bool cacheScores)
{
    m_other = other;

    m_cachedDocs = new List<int[]>();
    m_curDocs = new int[INITIAL_ARRAY_SIZE];
    m_cachedDocs.Add(m_curDocs);

    // Per cached doc: one int32 for the doc ID, plus one float if scores are
    // cached too (matches the "4 or 8 bytes per doc" note on the class).
    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT32 + (cacheScores ? RamUsageEstimator.NUM_BYTES_SINGLE : 0);
    m_maxDocsToCache = (int)((maxRAMMB * 1024 * 1024) / bytesPerDoc);
}
/// <summary>
/// Shared initialization for doc-count-bounded caching: seeds the first doc
/// chunk and records the explicit doc limit.
/// </summary>
private CachingCollector(ICollector other, int maxDocsToCache)
{
    m_other = other;
    m_maxDocsToCache = maxDocsToCache;

    m_cachedDocs = new List<int[]>();
    m_curDocs = new int[INITIAL_ARRAY_SIZE];
    m_cachedDocs.Add(m_curDocs);
}
// Out-of-order acceptance is delegated to the wrapped collector.
public virtual bool AcceptsDocsOutOfOrder => m_other.AcceptsDocsOutOfOrder;
/// <summary>
/// Whether the collected docs (and scores) are still cached; <c>false</c> once
/// the RAM/doc budget was exceeded and the cache was cleared (m_curDocs == null).
/// </summary>
public virtual bool IsCached => m_curDocs != null;
// Delegates to the wrapped collector, then records the boundary of the
// previous segment (its end offset in the cache) so Replay can restore
// per-segment reader contexts.
public virtual void SetNextReader(AtomicReaderContext context)
{
m_other.SetNextReader(context);
// Close out the previous segment: its docs end at m_base + m_upto.
if (lastReaderContext != null)
{
m_cachedSegs.Add(new SegStart(lastReaderContext, m_base + m_upto));
}
lastReaderContext = context;
}
// LUCENENET specific - we need to implement these here, since our abstract base class
// is now an interface.
/// <summary>
/// Called before successive calls to <see cref="Collect(int)"/>. Implementations
/// that need the score of the current document (passed-in to
/// <see cref="Collect(int)"/>), should save the passed-in <see cref="Scorer"/> and call
/// <see cref="Scorer.GetScore()"/> when needed.
/// </summary>
public abstract void SetScorer(Scorer scorer);
/// <summary>
/// Called once for every document matching a query, with the unbased document
/// number.
/// <para/>Note: The collection of the current segment can be terminated by throwing
/// a <see cref="CollectionTerminatedException"/>. In this case, the last docs of the
/// current <see cref="AtomicReaderContext"/> will be skipped and <see cref="IndexSearcher"/>
/// will swallow the exception and continue collection with the next leaf.
/// <para/>
/// Note: this is called in an inner search loop. For good search performance,
/// implementations of this method should not call <see cref="IndexSearcher.Doc(int)"/> or
/// <see cref="Lucene.Net.Index.IndexReader.Document(int)"/> on every hit.
/// Doing so can slow searches by an order of magnitude or more.
/// </summary>
public abstract void Collect(int doc);
/// <summary>
/// Precondition checks shared by the specialized inner classes' <c>Replay</c>
/// implementations: verifies the cache is intact and order-compatible with the
/// target collector, then flushes the final segment boundary.
/// </summary>
internal virtual void ReplayInit(ICollector other)
{
    if (!IsCached)
    {
        throw new InvalidOperationException("cannot replay: cache was cleared because too much RAM was required");
    }

    if (!other.AcceptsDocsOutOfOrder && this.m_other.AcceptsDocsOutOfOrder)
    {
        throw new ArgumentException("cannot replay: given collector does not support out-of-order collection, while the wrapped collector does. Therefore cached documents may be out-of-order.");
    }

    //System.out.println("CC: replay totHits=" + (upto + base));
    // Record the end of the last collected segment exactly once.
    if (lastReaderContext != null)
    {
        m_cachedSegs.Add(new SegStart(lastReaderContext, m_base + m_upto));
        lastReaderContext = null;
    }
}
/// <summary>
/// Replays the cached doc IDs (and scores) to the given <see cref="ICollector"/>. If this
/// instance does not cache scores, then <see cref="Scorer"/> is not set on
/// <c>other.SetScorer(Scorer)</c> as well as scores are not replayed.
/// </summary>
/// <exception cref="InvalidOperationException">
/// If this collector is not cached (i.e., if the RAM limits were too
/// low for the number of documents + scores to cache). </exception>
/// <exception cref="ArgumentException">
/// If the given collector does not support out-of-order collection,
/// while the collector passed to the ctor does. </exception>
public abstract void Replay(ICollector other);
}
}