11using System . Diagnostics ;
22using System . IO . MemoryMappedFiles ;
3- using System . Numerics ;
43using System . Runtime . CompilerServices ;
54using System . Runtime . InteropServices ;
65using System . Runtime . Intrinsics ;
1110
1211namespace _1brc
1312{
14- public enum ProcessMode
15- {
16- Default ,
17- MmapSingle ,
18- MmapSingleSharedPos ,
19- MmapViewPerChunk ,
20- MmapViewPerChunkRandom ,
21- RandomAccess ,
22- RandomAccessAsync
23- }
24-
2513 public class App : IDisposable
2614 {
27- private readonly ProcessMode _processMode ;
2815 private readonly FileStream _fileStream ;
2916 private readonly MemoryMappedFile _mmf ;
3017 private readonly MemoryMappedViewAccessor ? _va ;
@@ -41,6 +28,7 @@ public class App : IDisposable
4128 private int _threadsFinished ;
4229 private readonly SafeFileHandle _fileHandle ;
4330 private readonly List < ( long start , long length ) > _chunks ;
31+ private readonly bool _useMmap ;
4432
4533 private const byte LF = ( byte ) '\n ' ;
4634
@@ -52,13 +40,10 @@ public class App : IDisposable
5240
5341 public string FilePath { get ; }
5442
55- public unsafe App ( string filePath , int ? threadCount = null , ProcessMode processMode = ProcessMode . Default )
43+ public unsafe App ( string filePath , int ? threadCount = null , bool ? useMmap = null )
5644 {
57- if ( processMode == default )
58- processMode = RuntimeInformation . IsOSPlatform ( OSPlatform . Windows ) ? ProcessMode . RandomAccessAsync : ProcessMode . MmapSingleSharedPos ;
59-
60- _processMode = processMode ;
61-
45+ _useMmap = useMmap ?? ! RuntimeInformation . IsOSPlatform ( OSPlatform . Windows ) ;
46+
6247 _threadCount = Math . Max ( 1 , threadCount ?? Environment . ProcessorCount ) ;
6348#if DEBUG
6449 _threadCount = 1 ;
@@ -73,7 +58,7 @@ public unsafe App(string filePath, int? threadCount = null, ProcessMode processM
7358
7459 _leftoverPtr = Marshal . AllocHGlobal ( LEFTOVER_CHUNK_ALLOC ) ;
7560
76- if ( _processMode == ProcessMode . MmapSingle || _processMode == ProcessMode . MmapSingleSharedPos )
61+ if ( _useMmap )
7762 {
7863 byte * ptr = ( byte * ) 0 ;
7964 _va = _mmf . CreateViewAccessor ( 0 , fileLength , MemoryMappedFileAccess . Read ) ;
@@ -160,33 +145,14 @@ public unsafe App(string filePath, int? threadCount = null, ProcessMode processM
160145 /// <summary>
161146 /// PLINQ Entry point
162147 /// </summary>
163- public unsafe FixedDictionary < Utf8Span , Summary > ThreadProcessChunk ( int id , long start , long length )
148+ public unsafe FixedDictionary < Utf8Span , Summary > ThreadProcessChunk ( long start , long length )
164149 {
165150 var threadResult = new FixedDictionary < Utf8Span , Summary > ( ) ;
166151
167- switch ( _processMode )
168- {
169- case ProcessMode . MmapSingle :
170- ProcessChunkMmapSingle ( threadResult , start , length ) ;
171- break ;
172- case ProcessMode . MmapSingleSharedPos :
173- ProcessChunkMmapSingleSharedPos ( threadResult , start , length ) ;
174- break ;
175- case ProcessMode . MmapViewPerChunk :
176- ProcessChunkMmapViewPerChunk ( threadResult , start , length ) ;
177- break ;
178- case ProcessMode . MmapViewPerChunkRandom :
179- ProcessChunkMmapViewPerChunkRandom ( threadResult , start , length , id ) ;
180- break ;
181- case ProcessMode . RandomAccess :
182- ProcessChunkRandomAccess ( threadResult , start , length ) ;
183- break ;
184- case ProcessMode . RandomAccessAsync :
185- ProcessChunkRandomAccessAsync ( threadResult , start , length ) ;
186- break ;
187- default :
188- throw new ArgumentOutOfRangeException ( ) ;
189- }
152+ if ( _useMmap )
153+ ProcessChunkMmap ( threadResult , start , length ) ;
154+ else
155+ ProcessChunkRandomAccess ( threadResult , start , length ) ;
190156
191157#if DEBUG
192158 Console . WriteLine ( $ "Thread { id } finished at: { DateTime . UtcNow : hh:mm:ss.ffffff} ") ;
@@ -205,13 +171,8 @@ public unsafe FixedDictionary<Utf8Span, Summary> ThreadProcessChunk(int id, long
205171
206172 return threadResult ;
207173 }
208-
209- public unsafe void ProcessChunkMmapSingle ( FixedDictionary < Utf8Span , Summary > resultAcc , long start , long length )
210- {
211- ProcessSpan ( resultAcc , new Utf8Span ( ( byte * ) _pointer + start , ( nuint ) length ) ) ;
212- }
213-
214- public unsafe void ProcessChunkMmapSingleSharedPos ( FixedDictionary < Utf8Span , Summary > resultAcc , long _ , long __ )
174+
175+ public unsafe void ProcessChunkMmap ( FixedDictionary < Utf8Span , Summary > resultAcc , long _ , long __ )
215176 {
216177 const int SEGMENT_SIZE = 4 * 1024 * 1024 ;
217178
@@ -251,45 +212,6 @@ public unsafe void ProcessChunkMmapSingleSharedPos(FixedDictionary<Utf8Span, Sum
251212 }
252213 }
253214
254- public unsafe void ProcessChunkMmapViewPerChunk( FixedDictionary < Utf8Span , Summary > resultAcc , long start , long length )
255- {
256- using var accessor = _mmf. CreateViewAccessor ( start , length + _leftoverLength , MemoryMappedFileAccess . Read ) ;
257- byte * ptr = default ;
258- accessor. SafeMemoryMappedViewHandle . AcquirePointer ( ref ptr ) ;
259- ProcessSpan( resultAcc , new Utf8Span ( ptr + accessor . PointerOffset , ( nuint ) length ) ) ;
260- accessor. SafeMemoryMappedViewHandle . ReleasePointer ( ) ;
261- }
262-
263- public unsafe void ProcessChunkMmapViewPerChunkRandom( FixedDictionary < Utf8Span , Summary > resultAcc , long start , long length , int id )
264- {
265- var ratio = ( double ) ( id + 1 ) / _chunks . Count ;
266- var delta = ( long ) ( 0.49 * length * ratio ) ;
267- var length0 = length / 2 + delta ;
268- using ( var accessor = _mmf. CreateViewAccessor ( start , length0 + 1024 , MemoryMappedFileAccess . Read ) )
269- {
270- byte * ptr = default ;
271- accessor. SafeMemoryMappedViewHandle. AcquirePointer( ref ptr ) ;
272- ptr += accessor . PointerOffset ;
273-
274- var span = new Span < byte > ( ptr + length0 - 1024 , 1024 ) ;
275- length0 = length0 - 1024 + span . LastIndexOf ( LF ) + 1 ;
276-
277- ProcessSpan( resultAcc , new Utf8Span ( ptr , ( nuint ) length0 ) ) ;
278- accessor. SafeMemoryMappedViewHandle . ReleasePointer ( ) ;
279- }
280-
281- using ( var accessor = _mmf. CreateViewAccessor ( start + length0 , ( length - length0 ) + _leftoverLength , MemoryMappedFileAccess . Read ) )
282- {
283- length0 = ( length - length0) ;
284- byte * ptr = default ;
285- accessor. SafeMemoryMappedViewHandle. AcquirePointer( ref ptr) ;
286- ptr += accessor. PointerOffset;
287-
288- ProcessSpan( resultAcc, new Utf8Span( ptr, ( nuint ) length0) ) ;
289- accessor. SafeMemoryMappedViewHandle. ReleasePointer( ) ;
290- }
291- }
292-
293215 public unsafe void ProcessChunkRandomAccess( FixedDictionary < Utf8Span , Summary > resultAcc , long start , long length )
294216 {
295217 using var fileHandle = File. OpenHandle ( FilePath , FileMode . Open , FileAccess . Read , FileShare . ReadWrite , FileOptions . SequentialScan ) ;
@@ -320,49 +242,6 @@ public unsafe void ProcessChunkRandomAccess(FixedDictionary<Utf8Span, Summary> r
320242 }
321243 }
322244
323- public void ProcessChunkRandomAccessAsync ( FixedDictionary < Utf8Span , Summary > resultAcc , long start , long length )
324- {
325- using var fileHandle = File . OpenHandle ( FilePath , FileMode . Open , FileAccess . Read , FileShare . ReadWrite , FileOptions . Asynchronous | FileOptions . SequentialScan ) ;
326- const int SEGMENT_SIZE = 512 * 1024 ;
327-
328- byte [ ] buffer0 = GC. AllocateArray< byte > ( SEGMENT_SIZE + MAX_LINE_SIZE , pinned : true) ;
329- byte [ ] buffer1 = GC . AllocateArray < byte > ( SEGMENT_SIZE + MAX_LINE_SIZE , pinned : true ) ;
330-
331- Task. Run ( async ( ) =>
332- {
333- var chunkRemaining = length ;
334-
335- ValueTask < int > segmentReadTask = ValueTask . FromResult ( 0 ) ;
336-
337- var awaitedZero = true ;
338-
339- while ( chunkRemaining > 0 )
340- {
341- var bufferReadLen = await segmentReadTask ;
342-
343- Memory < byte > memoryRead = ( awaitedZero ? buffer0 : buffer1 ) . AsMemory ( 0 , bufferReadLen ) ;
344-
345- var segmentConsumed = memoryRead . Span . Slice ( 0 , bufferReadLen ) . LastIndexOf ( LF ) + 1 ;
346- chunkRemaining -= segmentConsumed ;
347- start += segmentConsumed ;
348-
349- int bufferLengthToRead = ( int ) Math . Min ( SEGMENT_SIZE , chunkRemaining ) ;
350-
351- awaitedZero = ! awaitedZero ;
352- var memoryToRead = ( awaitedZero ? buffer0 : buffer1 ) . AsMemory ( 0 , bufferLengthToRead ) ;
353-
354- segmentReadTask = RandomAccess . ReadAsync ( fileHandle , memoryToRead , start ) ;
355-
356- if ( segmentConsumed > 0 )
357- {
358- var remaining = new Utf8Span ( ref memoryRead . Span [ 0 ] , ( uint ) ( segmentConsumed ) ) ;
359- ProcessSpan ( resultAcc , remaining ) ;
360-
361- }
362- }
363- } ) . Wait ( ) ;
364- }
365-
366245 [ MethodImpl ( MethodImplOptions . NoInlining ) ]
367246 public static unsafe void ProcessSpan( FixedDictionary < Utf8Span , Summary > result , Utf8Span remaining )
368247 {
@@ -429,7 +308,7 @@ public FixedDictionary<Utf8Span, Summary> Process()
429308#if DEBUG
430309 . WithDegreeOfParallelism ( 1 )
431310#endif
432- . Select( ( tuple, id ) => ThreadProcessChunk ( id , tuple . start , tuple . length ) )
311+ . Select ( ( tuple ) => ThreadProcessChunk( tuple. start, tuple . length ) )
433312 . Aggregate( ( result, chunk ) =>
434313 {
435314 foreach ( KeyValuePair < Utf8Span , Summary > pair in chunk )
0 commit comments