diff --git a/entries/abouchez/README.md b/entries/abouchez/README.md index e5b4bdb..584be6b 100644 --- a/entries/abouchez/README.md +++ b/entries/abouchez/README.md @@ -52,22 +52,22 @@ Here are the main ideas behind this implementation proposal: - The entire 16GB file is `memmap`ed at once into memory - it won't work on 32-bit OS, but avoid any `read` syscall or memory copy; - File is processed in parallel using several threads - configurable via the `-t=` switch, default being the total number of CPUs reported by the OS; - Input is fed into each thread as 8MB chunks (see also the `-c` command line switch): because thread scheduling is unbalanced, it is inefficient to pre-divide the size of the whole input file into the number of threads; -- Each thread manages its own `fStation[]` data, so there is no lock until the thread is finished and data is consolidated; -- Each `fStation[]` information is packed into a record of 12 bytes, with no external pointer/string, to leverage the CPU L1 cache size (64 bytes) for efficiency; -- A shared O(1) hash table is maintained for the name lookup, with crc32c perfect hash function - no name comparison nor storage is needed with a perfect hash (see below); -- On Intel/AMD/AARCH64 CPUs, *mORMot* offers hardware SSE4.2 opcodes for this crc32c computation; +- Each thread manages its own `fStation[]` data and hash table, so there is no lock until the thread is finished and data is consolidated; +- Each `fStation[]` entry is packed into a record of 16 bytes, with no external pointer/string, to leverage the CPU L1 cache line size (64 bytes) for efficiency; +- Each thread maintains its own hash table for the name lookup, with `crc32c` as perfect hash function - no name comparison nor storage is required with a perfect hash (see below); +- On Intel/AMD/AARCH64 CPUs, *mORMot* offers hardware SSE4.2 or ARMv8 opcodes for this `crc32c` computation, and we wrote a dedicated `asm` function on Linux x86_64; - The hash table does not directly store the 
`fStation[]` data, but use a shared `fHash[]` lookup array of 16-bit indexes (as our `TDynArray` does) to leverage the CPU caches; -- This name hash table is shared between threads to favor CPU cache locality, and contention only occurs at startup, the first time a station name is encountered; - Values are stored as 16-bit or 32-bit integers, as temperature multiplied by 10; - Temperatures are parsed with a dedicated code (expects single decimal input values); - The station names are stored as UTF-8 pointers to the `memmap`ed location where they appear first, in `fNameText[]`, to be emitted eventually for the final output, not during temperature parsing; - No memory allocation (e.g. no transient `string` or `TBytes`) nor any syscall is done during the parsing process to reduce contention and ensure the process is only CPU-bound and RAM-bound (we checked this with `strace` on Linux); - Pascal code was tuned to generate the best possible asm output on FPC x86_64 (which is our target) - perhaps making it less readable, because we used pointer arithmetics when it matters (I like to think as such low-level pascal code as [portable assembly](https://sqlite.org/whyc.html#performance) similar to "unsafe" code in managed languages); -- We even tried an optimized SSE2 asm sub-function for searching the name `';'` delimiter - which is a O(n) part of the process, and in practice... it was slower than a slightly unrolled pure pascal inlined loop; +- We even tried an optimized SSE2 asm sub-function for searching the name `';'` delimiter - which is a O(n) part of the process, and in practice... 
it was slower than a naive pure pascal `while` loop; +- We tried to share the name hash table between the threads: it was faster on Intel CPUs, but not on the benchmark AMD Zen 3 hardware (see below); - It can optionally output timing statistics and resultset hash value on the console to debug and refine settings (with the `-v` command line switch); - It can optionally set each thread affinity to a single core (with the `-a` command line switch). -If you are not convinced by the "perfect hash" trick, you can define the `NOPERFECTHASH` conditional, which forces full name comparison, but is noticeably slower. Our algorithm is safe with the official dataset, and gives the expected final result - which was the goal of this challenge: compute the right data reduction with as little time as possible, with all possible hacks and tricks. A "perfect hash" is a well known hacking pattern, when the dataset is validated in advance. And since our CPUs offers `crc32c` which is perfect for our dataset... let's use it! https://en.wikipedia.org/wiki/Perfect_hash_function ;) +If you are not convinced by the "perfect hash" trick, you can define the `NOPERFECTHASH` conditional, which forces full name comparison, but is noticeably slower. Our algorithm is safe with the official dataset, and gives the expected final result - which was the goal of this challenge: compute the right data reduction with as little time as possible, with all possible hacks and tricks. A "perfect hash" is a well known hacking pattern, when the dataset is validated in advance. We can imagine that if a new weather station appears, we can check for any collision. And since our CPUs offer `crc32c` which is perfect for our dataset... let's use it! 
https://en.wikipedia.org/wiki/Perfect_hash_function ;) ## Why L1 Cache Matters @@ -77,13 +77,13 @@ The L1 cache is well known in the performance hacking litterature to be the main Count and Sum values can fit in 32-bit `integer` fields (with a range of about 2 billions signed values), whereas min/max values have been reduced as 16-bit `smallint` - resulting in temperature range of -3276.7..+3276.8 celsius grads. It seems fair on our galaxy according to the IPCC. ;) -As a result, each `fStation[]` entry takes only 12 bytes, so we can fit exactly several entries in a single CPU L1 cache line. To be accurate, if we put some more data into the record (e.g. use `Int64` instead of `smallint`/`integer`), the performance degrades only for a few percents. The main fact seems to be that the entry is likely to fit into a single cache line, even if filling two cache lines may be sometimes needed for misaligned data. +As a result, each `fStation[]` entry takes only 16 bytes, so we can fit exactly four entries in a single CPU L1 cache line. To be accurate, if we put some more data into the record (e.g. use `Int64` instead of `smallint`/`integer`), the performance degrades only by a few percent. The main fact seems to be that the entry is likely to fit into a single cache line, even if filling two cache lines may sometimes be needed for misaligned data. In our first attempt (see "Old Version" below), we stored the name into the `Station[]` array, so that each entry is 64 bytes long exactly. But since `crc32c` is a perfect hash function for our dataset, it is enough to just store the 32-bit hash instead in a shared `fNameHash[]` array, and not the actual name. Less data would mean less cache size involved. -We tried to remove the `fHash[]` array of `word` lookup table. It made one data read less, but performed almost three times slower. Data locality and cache pollution prevails on absolute number of memory reads. 
It is faster to access twice the memory, if this memory could remain in the CPU caches. Only profiling and timing would show this. The shortest code is not the fastest with modern CPUs. +We tried to remove the `fHash[]` lookup table (an array of `word`). It saved one data read, but ran several times slower. Data locality and cache pollution prevail over the absolute number of memory reads. It is faster to access twice the memory, if this memory could remain in the CPU caches. Only profiling and timing would show this. The shortest code is not the fastest with modern CPUs. -Note that if we reduce the number of stations from 41343 to 400 (as other languages 1brc projects do), the performance is higher, also with a 16GB file as input. My guess is that since 400x12 = 4800, each dataset could fit entirely in each core L1 cache. No slower L2/L3 cache is involved, therefore performance is better. +Note that if we reduce the number of stations from 41343 to 400 (as 1brc projects in other languages do), the performance is higher, also with a 16GB file as input. My guess is that since 400x16 = 6400 bytes, each dataset could fit entirely in each core L1 cache. No slower L2/L3 cache is involved, therefore performance is better. Once again, some reference material is available at https://en.algorithmica.org/hpc/cpu-cache/ including some mind-blowing experiment [about cache associativity](https://en.algorithmica.org/hpc/cpu-cache/associativity/). I told you CPUs were complex! :D @@ -165,14 +165,14 @@ Affinity may help on Ryzen 9, because its Zen 3 architecture is made of identica The `-v` verbose mode makes such testing easy. The `hash` value can quickly check that the generated output is correct, and that it is valid `utf8` content (as expected). 
-To be accurate, here is the current state of our code on my local machine: +To be accurate, here are the results of our code on my local machine, when we share the hash table: ``` ab@dev:~/dev/github/1brc-ObjectPascal/bin$ ./abouchez data.csv -v Processing data.csv with 20 threads, 8MB chunks and affinity=0 result hash=85614446, result length=1139413, stations count=41343, valid utf8=1 done in 1.82s 8.6 GB/s ``` -So we were able to make some progress between iterations - especially by sharing the hash table between threads. +So sharing the hash table was really faster on my Intel PC. But it made no difference on the AMD CPU and its 64MB L3 cache, so [we eventually disabled it for this challenge](#why-l3-cache-matters). ## Benchmark Integration @@ -237,6 +237,32 @@ So it sounds like if we could just run the benchmark with the `-t=32` option, an The Ryzen CPU has 16 cores with 32 threads, and it makes sense that each thread only have to manage a small number of data per item (a 16 bytes `Station[]` item), so we could leverage all cores and threads. +## Why L3 Cache Matters + +We tried to share the hash table between the threads, to reduce the cache pollution. In the same `src` sub-folder, you will find this particular version, as `brcmormotoldsharedht.lpr`. + +On our Intel computer, it was noticeably faster. But on the reference AMD computer used by this benchmark... it was not. + +Theoretically speaking, it may come from cache size differences: on Zen 3 the cache is bigger, so the hash table can stay in cache without being shared. Whereas on our Intel CPUs, a smaller cache means sharing the hash table has a benefit. Let's verify. 
+ +In fact, if we compare the two machines: + +Ryzen 9 5950X caches are https://www.techpowerup.com/cpu-specs/ryzen-9-5950x.c2364 +- L1 64KB per core, L2 512KB per core, L3 64MB shared + +Intel Core i5 13500 caches are https://www.techpowerup.com/cpu-specs/core-i5-13500.c2851 +- P-cores: L1 80KB per core, L2 1.25MB per core, L3 24MB shared +- E-cores: L1 96KB per core, L2 2MB per module + +The L1/L2 caches seem too small for our process on both CPUs, so the L3 cache seems to be the bottleneck. The L3 cache is bigger on the AMD, and all our data is likely to fit in it. +- Without sharing, the per-thread temperature data adds up to 16 bytes * 41343 stations * 32 threads = 21 MB. +- With sharing, the per-thread temperature data adds up to 12 bytes * 41343 stations * 32 threads = 16 MB. +- If we include the 16-bit jumpers, and cache pollution due to its huge size (there are void slots), we can easily add 4-8 MB of polluted cache, per hash table. + +The work dataset seems to fit within the 64MB L3 cache of the Ryzen CPU, whether we share the hash table or not. On our Intel CPUs, however, we are likely to saturate the L3 cache if we don't share the hash table. + +My initial guess was that maintaining a hash table for each thread could be a good idea. And it seems it was on the Ryzen CPU, but not on Intel CPUs. +So we just disabled this shared name hash table for now. 
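
The size estimate above can be reproduced with a few lines of Free Pascal. This is only a sketch of the arithmetic, not code taken from the entry: the program, constant and function names are hypothetical, the 41343 stations and 32 threads are the figures quoted in this README, and one MB is counted as 10^6 bytes.

```pascal
program l3estimate;

{$mode objfpc}

const
  // figures quoted in the README; the names are illustrative only
  STATIONS = 41343;            // stations count reported by the -v output
  THREADS  = 32;               // hardware threads of the Ryzen 9 5950X
  L3_AMD   = 64 * 1000000;     // 64MB shared L3, with 1 MB = 10^6 bytes
  L3_INTEL = 24 * 1000000;     // 24MB shared L3 of the Core i5 13500

// total size in bytes of the station records of all threads
function StationBytes(recordSize: Int64): Int64;
begin
  result := recordSize * STATIONS * THREADS;
end;

begin
  // 16-byte records (hash kept per thread) versus 12-byte records (shared hash)
  WriteLn('per-thread hash tables: ', StationBytes(16), ' bytes');
  WriteLn('shared hash table:      ', StationBytes(12), ' bytes');
  // L3 bytes left for the lookup arrays and everything else
  WriteLn('left in AMD L3:   ', L3_AMD - StationBytes(16), ' bytes');
  WriteLn('left in Intel L3: ', L3_INTEL - StationBytes(16), ' bytes');
end.
```

With 16-byte records the station data alone is already close to the 24MB Intel L3, before counting the `word` lookup arrays, which is consistent with the guess that only the Intel CPU benefits from sharing.
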
## Notes about the "Old" Version diff --git a/entries/abouchez/src/brcmormot.lpi b/entries/abouchez/src/brcmormot.lpi index bc69589..7b0d1a1 100644 --- a/entries/abouchez/src/brcmormot.lpi +++ b/entries/abouchez/src/brcmormot.lpi @@ -62,7 +62,6 @@ - diff --git a/entries/abouchez/src/brcmormot.lpr b/entries/abouchez/src/brcmormot.lpr index c41ba1f..f566456 100644 --- a/entries/abouchez/src/brcmormot.lpr +++ b/entries/abouchez/src/brcmormot.lpr @@ -1,9 +1,6 @@ /// MIT code (c) Arnaud Bouchez, using the mORMot 2 framework program brcmormot; -{.$define NOPERFECTHASH} -// you can define this conditional to force name comparison (slower) - {$I mormot.defines.inc} {$ifdef OSWINDOWS} @@ -23,33 +20,39 @@ mormot.core.data; type - // a weather station info, using 12 bytes + // a weather station info, using 1/4rd of a CPU L1 cache line (64/4=16 bytes) TBrcStation = packed record + NameHash: cardinal; // crc32c perfect hash of the name Sum, Count: integer; // we ensured no overflow occurs with 32-bit range Min, Max: SmallInt; // 16-bit (-32767..+32768) temperatures * 10 end; PBrcStation = ^TBrcStation; - TBrcStations = array of TBrcStation; + + TBrcList = record + public + StationHash: array of word; // store 0 if void, or Station[] index + 1 + Station: array of TBrcStation; + StationName: array of PUtf8Char; // directly point to input memmap file + Count: PtrInt; + procedure Init(max: integer); + function Add(h32: PtrUInt; name: PUtf8Char; h: PtrUInt): PBrcStation; + function Search(name: pointer; h32: cardinal): PBrcStation; inline; + end; TBrcMain = class protected - fSafe: TOSLightLock; // TLightLock seems to make no difference - fHash: array of word; // store 0 if void, or fStation[] index + 1 - fNameHash: array of cardinal; // crc32c perfect hash of the name - fNameText: array of PUtf8Char; // directly point to the input memmaped file - fStation: TBrcStations; // aggregated storage - fCount: PtrInt; // = length(fNameHash/fNameText/fStation) - fEvent: TSynEvent; // to wake 
up the main thread when finished - fRunning, fCapacity: integer; - fCurrentChunk: PByteArray; + fSafe: TOSLightLock; + fEvent: TSynEvent; + fRunning, fMax: integer; + fCurrentChunk: PUtf8Char; fCurrentRemain, fChunkSize: PtrUInt; + fList: TBrcList; fMem: TMemoryMap; - function GetChunk(out start, stop: PByteArray): boolean; - function Search(name: pointer; namelen: PtrUInt): PtrInt; - procedure Aggregate(const another: TBrcStations); + procedure Aggregate(const another: TBrcList); + function GetChunk(out start, stop: PUtf8Char): boolean; public - constructor Create(const datafile: TFileName; - threads, chunkmb, capacity: integer; affinity: boolean); + constructor Create(const fn: TFileName; threads, chunkmb, max: integer; + affinity: boolean); destructor Destroy; override; procedure WaitFor; function SortedText: RawUtf8; @@ -58,91 +61,176 @@ TBrcMain = class TBrcThread = class(TThread) protected fOwner: TBrcMain; - fStation: TBrcStations; // per-thread storage + fList: TBrcList; // each thread work on its own list procedure Execute; override; public constructor Create(owner: TBrcMain); end; -{$ifdef FPC_CPUX64_disabled_slower} -function NameLen(p: PUtf8Char): PtrInt; assembler; nostackframe; +{ optimized crc32c-based hash function } + +const + HASHSIZE = 1 shl 17; // slightly oversized to avoid most collisions + +{$ifdef OSLINUXX64} + +function NameHash(buf, bufend: pointer): PtrUInt; nostackframe; assembler; asm - lea rdx, qword ptr [p + 2] - movaps xmm0, oword ptr [rip + @chr] - movups xmm1, oword ptr [rdx] // check first 16 bytes - pcmpeqb xmm1, xmm0 - pmovmskb eax, xmm1 - bsf eax, eax - jnz @found -@by16: add rdx, 16 - movups xmm1, oword ptr [rdx] // next 16 bytes - pcmpeqb xmm1, xmm0 - pmovmskb eax, xmm1 - bsf eax, eax - jz @by16 -@found: add rax, rdx // point to exact match - sub rax, p // return position - ret - align 16 -@chr: dq $3b3b3b3b3b3b3b3b // xmm0 of ';' - dq $3b3b3b3b3b3b3b3b + // rdi=buf rsi=bufend + xor eax, eax + lea rdx, [rsi - 8] // rdx = last 
8 bytes (may overlap) + sub rsi, rdi + mov ecx, esi // esi = ecx = buflen + shr esi, 3 + jz @less8 + crc32 rax, qword ptr [rdi] + crc32 rax, qword ptr [rdx] // branchless for 8..16 bytes + dec esi + ja @more8 +@end8: ret +@more8: crc32 rax, qword ptr [rdi + 8] // 17..23 bytes + dec esi + jbe @end8 + crc32 rax, qword ptr [rdi + 16] // 24..xx bytes + ret +@less8: test cl, 4 + jz @less4 + crc32 eax, dword ptr [rdi] + crc32 eax, dword ptr [rdx + 4] // 4..7 bytes + ret +@less4: crc32 eax, word ptr [rdi] + crc32 eax, word ptr [rdx + 6] // 2..3 bytes end; + {$else} -function NameLen(p: PUtf8Char): PtrInt; inline; + +function NameHash(buf, bufend: pointer): PtrUInt; inline; +begin + result := crc32c(0, buf, PAnsiChar(bufend) - buf); // sse4.2/armv8 mormot code +end; + +{$endif OSLINUXX64} + + +{ TBrcList } + +procedure TBrcList.Init(max: integer); +begin + assert(max <= high(StationHash[0])); + SetLength(Station, max); + SetLength(StationHash, HASHSIZE); + SetLength(StationName, max); +end; + +function TBrcList.Add(h32: PtrUInt; name: PUtf8Char; h: PtrUInt): PBrcStation; +begin + assert(Count < length(Station)); + StationName[Count] := name; + result := @Station[Count]; + inc(Count); + StationHash[h] := Count; + result^.NameHash := h32; + result^.Min := high(result^.Min); + result^.Max := low(result^.Max); +end; + +function TBrcList.Search(name: pointer; h32: cardinal): PBrcStation; +var + h, x: PtrUInt; +begin + h := h32; + repeat + h := h and (HASHSIZE - 1); + x := StationHash[h]; + if x = 0 then + break; // void slot + result := @Station[x - 1]; + if result^.NameHash = h32 then + exit; // found this perfect hash = found this station name + inc(h); // hash modulo collision: linear probing + until false; + result := Add(h32, name, h); +end; + + +{ TBrcThread } + +constructor TBrcThread.Create(owner: TBrcMain); +begin + fOwner := owner; + FreeOnTerminate := true; + fList.Init(fOwner.fMax); + InterlockedIncrement(fOwner.fRunning); + inherited Create({suspended=}false); 
+end; + +procedure TBrcThread.Execute; +var + start, stop: PUtf8Char; + p: PByteArray; + s: PBrcStation; + neg: PtrInt; + v: integer; begin - result := 2; - while true do - if p[result] <> ';' then - if p[result + 1] <> ';' then - if p[result + 2] <> ';' then - if p[result + 3] <> ';' then - if p[result + 4] <> ';' then - if p[result + 5] <> ';' then - inc(result, 6) - else - exit(result + 5) - else - exit(result + 4) - else - exit(result + 3) - else - exit(result + 2) + while fOwner.GetChunk(start, stop) do + begin + // parse this thread chunk + p := @start[2]; + repeat + // parse the name; and retrieve the corresponding station slot + while p[0] <> ord(';') do + inc(PByte(p)); + s := fList.Search(start, NameHash(start, p)); + inc(PByte(p)); + // parse the temperature (as -12.3 -3.4 5.6 78.9 patterns) into value * 10 + neg := 1; + if p[0] = ord('-') then + begin + inc(PByte(p)); + neg := -1; + end; + if p[2] = ord('.') then // xx.x + begin + v := (p[0] * 100 + p[1] * 10 + p[3] - (ord('0') * 111)) * neg; + inc(PByte(p), 8); + end else - exit(result + 1) - else - exit; - // this small (unrolled) inlined loop is faster than a SSE2 function :) + begin + v := (p[0] * 10 + p[2] - (ord('0') * 11)) * neg; // x.x + inc(PByte(p), 7); + end; + start := PUtf8Char(p) - 2; + // store the value + inc(s^.Sum, v); + inc(s^.Count); + if v < s^.Min then + s^.Min := v; // branchless cmovg/cmovl is not better + if v > s^.Max then + s^.Max := v; + until PUtf8Char(p) >= stop; + end; + // aggregate this thread values into the main list + fOwner.Aggregate(fList); end; -{$endif FPC_CPUX64} { TBrcMain } -const - HASHSIZE = 1 shl 19; // slightly oversized to avoid most collisions - -constructor TBrcMain.Create(const datafile: TFileName; - threads, chunkmb, capacity: integer; affinity: boolean); +constructor TBrcMain.Create(const fn: TFileName; threads, chunkmb, max: integer; + affinity: boolean); var i, cores, core: integer; one: TBrcThread; begin - // init thread-safety markers - 
fSafe.Init; fEvent := TSynEvent.Create; - // map the file into memory and prepare memory chunks - if not fMem.Map(datafile) then - raise ESynException.CreateUtf8('Impossible to find %', [datafile]); + fSafe.Init; + if not fMem.Map(fn) then + raise ESynException.CreateUtf8('Impossible to find %', [fn]); + fMax := max; fChunkSize := chunkmb shl 20; fCurrentChunk := pointer(fMem.Buffer); fCurrentRemain := fMem.Size; - // initialize the stations name hash table - fCapacity := capacity; - SetLength(fHash, HASHSIZE); - SetLength(fNameHash, capacity); - SetLength(fNameText, capacity); - // launch the threads with optional thread affinity core := 0; cores := SystemInfo.dwNumberOfProcessors; for i := 0 to threads - 1 do @@ -161,49 +249,11 @@ destructor TBrcMain.Destroy; begin inherited Destroy; fMem.UnMap; - fEvent.Free; fSafe.Done; + fEvent.Free; end; -function TBrcMain.Search(name: pointer; namelen: PtrUInt): PtrInt; -var - h32: cardinal; - h: PtrUInt; -begin - h32 := crc32c(0, name, namelen); - h := h32; - repeat - repeat - h := h and (HASHSIZE - 1); - result := fHash[h]; - if result = 0 then - break; // void slot - dec(result); - if fNameHash[result] = h32 then - {$ifdef NOPERFECTHASH} - if MemCmp(pointer(fNameText[result]), name, namelen + 1) = 0 then - {$endif NOPERFECTHASH} - exit; // found this perfect hash = found this name - inc(h); // hash modulo collision: linear probing - until false; - // void slot: try to add now - fSafe.Lock; - if fHash[h] <> 0 then - result := -1 // race condition - else - begin - result := fCount; - assert(result < fCapacity); - inc(fCount); - fNameHash[result] := h32; - fNameText[result] := name; - fHash[h] := fCount; // should be last - end; - fSafe.UnLock; - until result >= 0; -end; - -function TBrcMain.GetChunk(out start, stop: PByteArray): boolean; +function TBrcMain.GetChunk(out start, stop: PUtf8Char): boolean; var chunk: PtrUInt; begin @@ -215,14 +265,14 @@ function TBrcMain.GetChunk(out start, stop: PByteArray): boolean; start := 
fCurrentChunk; if chunk > fChunkSize then begin - stop := pointer(GotoNextLine(pointer(@start[fChunkSize]))); - chunk := PAnsiChar(stop) - PAnsiChar(start); + stop := GotoNextLine(start + fChunkSize); + chunk := stop - start; end else begin stop := @start[chunk]; - while PAnsiChar(stop)[-1] <= ' ' do - dec(PByte(stop)); // ensure final stop at meaningful char + while stop[-1] <= ' ' do + dec(stop); // ensure final chunk stops at meaningful char end; dec(fCurrentRemain, chunk); fCurrentChunk := @fCurrentChunk[chunk]; @@ -231,37 +281,44 @@ function TBrcMain.GetChunk(out start, stop: PByteArray): boolean; fSafe.UnLock; end; -procedure TBrcMain.Aggregate(const another: TBrcStations); +function NameEnd(p: PUtf8Char): PUtf8Char; inline; +begin + result := p; + repeat + inc(result); + until result^ = ';'; +end; + +procedure TBrcMain.Aggregate(const another: TBrcList); var n: integer; s, d: PBrcStation; + p: PPUtf8Char; begin fSafe.Lock; // several TBrcThread may finish at the same time - if fStation = nil then - fStation := another + if fList.Count = 0 then + fList := another else begin - n := fCount; - s := pointer(another); - d := pointer(fStation); + n := another.Count; + s := pointer(another.Station); + p := pointer(another.StationName); repeat - if s^.Count <> 0 then - begin - inc(d^.Count, s^.Count); - inc(d^.Sum, s^.Sum); - if s^.Max > d^.Max then - d^.Max := s^.Max; - if s^.Min < d^.Min then - d^.Min := s^.Min; - end; + d := fList.Search(p^, NameHash(p^, NameEnd(p^))); + inc(d^.Count, s^.Count); + inc(d^.Sum, s^.Sum); + if s^.Max > d^.Max then + d^.Max := s^.Max; + if s^.Min < d^.Min then + d^.Min := s^.Min; inc(s); - inc(d); + inc(p); dec(n); until n = 0; end; fSafe.UnLock; if InterlockedDecrement(fRunning) = 0 then - fEvent.SetEvent; // all threads finished: release main console thread + fEvent.SetEvent; // all threads finished: release WaitFor method end; procedure TBrcMain.WaitFor; @@ -329,10 +386,10 @@ function TBrcMain.SortedText: RawUtf8; tmp: 
TTextWriterStackBuffer; begin // compute the sorted-by-name indexes of all stations - c := fCount; + c := fList.Count; assert(c <> 0); DynArraySortIndexed( - pointer(fNameText), SizeOf(PUtf8Char), c, ndx, ByStationName); + pointer(fList.StationName), SizeOf(PUtf8Char), c, ndx, ByStationName); try // generate output FastSetString(result, nil, 1200000); // pre-allocate result @@ -343,10 +400,10 @@ function TBrcMain.SortedText: RawUtf8; w.Add('{'); n := ndx.buf; repeat - s := @fStation[n^]; + s := @fList.Station[n^]; assert(s^.Count <> 0); - p := fNameText[n^]; - w.AddNoJsonEscape(p, NameLen(p)); + p := fList.StationName[n^]; + w.AddNoJsonEscape(p, NameEnd(p) - p); AddTemp(w, '=', s^.Min); AddTemp(w, '/', ceil(s^.Sum / s^.Count)); // average AddTemp(w, '/', s^.Max); @@ -370,80 +427,6 @@ function TBrcMain.SortedText: RawUtf8; end; end; - -{ TBrcThread } - -constructor TBrcThread.Create(owner: TBrcMain); -begin - fOwner := owner; - SetLength(fStation, fOwner.fCapacity); - InterlockedIncrement(fOwner.fRunning); - FreeOnTerminate := true; - inherited Create({suspended=}false); -end; - -procedure TBrcThread.Execute; -var - p, start, stop: PByteArray; - v, m: integer; - l, neg: PtrInt; - s: PBrcStation; -begin - while fOwner.GetChunk(start, stop) do - begin - // parse this thread chunk - p := start; - repeat - // parse the name; and find the corresponding station data - l := NameLen(pointer(p)); - p := @p[l + 1]; // + 1 to ignore ; - s := @fStation[fOwner.Search(start, l)]; - // parse the temperature (as -12.3 -3.4 5.6 78.9 patterns) into value * 10 - if p[0] = ord('-') then - begin - neg := -1; - p := @p[1]; - end - else - neg := 1; - if p[2] = ord('.') then // xx.x - begin - // note: the PCardinal(p)^ + "shr and $ff" trick is actually slower - v := (p[0] * 100 + p[1] * 10 + p[3] - (ord('0') * 111)) * neg; - p := @p[6]; // also jump ending $13/$10 - end - else - begin - v := (p[0] * 10 + p[2] - (ord('0') * 11)) * neg; // x.x - p := @p[5]; - end; - // store the value - if 
s^.Count = 0 then - begin - s^.Min := v; // new entry - s^.Max := v; - end - else - begin - m := s^.Min; - if v < m then - m := v; // branchless cmovg/cmovl - s^.Min := m; - m := s^.Max; - if v > m then - m := v; - s^.Max := m; - end; - inc(s^.Sum, v); - inc(s^.Count); - start := p; - until p >= stop; - end; - // aggregate this thread values into the main list - fOwner.Aggregate(fStation); -end; - - var fn: TFileName; threads, chunkmb: integer; @@ -452,6 +435,7 @@ procedure TBrcThread.Execute; res: RawUtf8; start, stop: Int64; begin + assert(SizeOf(TBrcStation) = 64 div 4); // 64 = CPU L1 cache line size // read command line parameters Executable.Command.ExeDescription := 'The mORMot One Billion Row Challenge'; if Executable.Command.Arg(0, 'the data source #filename') then @@ -464,7 +448,7 @@ procedure TBrcThread.Execute; ['t', 'threads'], threads, '#number of threads to run', SystemInfo.dwNumberOfProcessors); Executable.Command.Get( - ['c', 'chunk'], chunkmb, 'size in #megabytes used for per-thread chunking', 8); + ['c', 'chunk'], chunkmb, 'size in #megabytes used for per-thread chunking', 16); help := Executable.Command.Option(['h', 'help'], 'display this help'); if Executable.Command.ConsoleWriteUnknown then exit @@ -480,15 +464,15 @@ procedure TBrcThread.Execute; chunkmb, 'MB chunks and affinity=', affinity]); QueryPerformanceMicroSeconds(start); try - main := TBrcMain.Create(fn, threads, chunkmb, {capacity=}45000, affinity); - // note: current stations count = 41343 for 484KB of data per thread + main := TBrcMain.Create(fn, threads, chunkmb, {max=}45000, affinity); + // note: current stations count = 41343 for 2.5MB of data per thread try main.WaitFor; res := main.SortedText; if verbose then ConsoleWrite(['result hash=', CardinalToHexShort(crc32cHash(res)), ', result length=', length(res), - ', stations count=', main.fCount, + ', stations count=', main.fList.Count, ', valid utf8=', IsValidUtf8(res)]) else ConsoleWrite(res); diff --git 
a/entries/abouchez/src/brcmormotsharedht.lpi b/entries/abouchez/src/brcmormotsharedht.lpi new file mode 100644 index 0000000..c2db320 --- /dev/null +++ b/entries/abouchez/src/brcmormotsharedht.lpi @@ -0,0 +1,136 @@ + + + + + + + + + + + + + + <UseAppBundle Value="False"/> + <ResourceType Value="res"/> + </General> + <BuildModes Count="3"> + <Item1 Name="Default" Default="True"/> + <Item2 Name="Debug"> + <CompilerOptions> + <Version Value="11"/> + <Target> + <Filename Value="../../../bin/abouchez"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <UnitOutputDirectory Value="../../../bin/lib/$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Parsing> + <SyntaxOptions> + <IncludeAssertionCode Value="True"/> + </SyntaxOptions> + </Parsing> + <CodeGeneration> + <Checks> + <IOChecks Value="True"/> + <RangeChecks Value="True"/> + <OverflowChecks Value="True"/> + <StackChecks Value="True"/> + </Checks> + <VerifyObjMethodCallValidity Value="True"/> + </CodeGeneration> + <Linking> + <Debugging> + <DebugInfoType Value="dsDwarf3"/> + <UseHeaptrc Value="True"/> + <TrashVariables Value="True"/> + <UseExternalDbgSyms Value="True"/> + </Debugging> + </Linking> + </CompilerOptions> + </Item2> + <Item3 Name="Release"> + <CompilerOptions> + <Version Value="11"/> + <Target> + <Filename Value="../../../bin/abouchez"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <UnitOutputDirectory Value="../../../bin/lib/$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <CodeGeneration> + <SmartLinkUnit Value="True"/> + <TargetProcessor Value="COREAVX2"/> + <Optimizations> + <OptimizationLevel Value="3"/> + </Optimizations> + </CodeGeneration> + <Linking> + <Debugging> + <GenerateDebugInfo Value="False"/> + </Debugging> + <LinkSmart Value="True"/> + </Linking> + </CompilerOptions> + </Item3> + </BuildModes> + <PublishOptions> + <Version Value="2"/> + <UseFileFilters Value="True"/> + </PublishOptions> + <RunParams> + <FormatVersion Value="2"/> + </RunParams> + 
<RequiredPackages Count="1"> + <Item1> + <PackageName Value="mormot2"/> + </Item1> + </RequiredPackages> + <Units Count="1"> + <Unit0> + <Filename Value="brcmormotsharedht.lpr"/> + <IsPartOfProject Value="True"/> + </Unit0> + </Units> + </ProjectOptions> + <CompilerOptions> + <Version Value="11"/> + <Target> + <Filename Value="../../../bin/abouchez"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <UnitOutputDirectory Value="../../../bin/lib/$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Parsing> + <SyntaxOptions> + <IncludeAssertionCode Value="True"/> + </SyntaxOptions> + </Parsing> + <CodeGeneration> + <Optimizations> + <OptimizationLevel Value="3"/> + </Optimizations> + </CodeGeneration> + <Linking> + <Debugging> + <DebugInfoType Value="dsDwarf3"/> + </Debugging> + </Linking> + </CompilerOptions> + <Debugging> + <Exceptions Count="3"> + <Item1> + <Name Value="EAbort"/> + </Item1> + <Item2> + <Name Value="ECodetoolError"/> + </Item2> + <Item3> + <Name Value="EFOpenError"/> + </Item3> + </Exceptions> + </Debugging> +</CONFIG> diff --git a/entries/abouchez/src/brcmormotsharedht.lpr b/entries/abouchez/src/brcmormotsharedht.lpr new file mode 100644 index 0000000..c41ba1f --- /dev/null +++ b/entries/abouchez/src/brcmormotsharedht.lpr @@ -0,0 +1,511 @@ +/// MIT code (c) Arnaud Bouchez, using the mORMot 2 framework +program brcmormot; + +{.$define NOPERFECTHASH} +// you can define this conditional to force name comparison (slower) + +{$I mormot.defines.inc} + +{$ifdef OSWINDOWS} + {$apptype console} +{$endif OSWINDOWS} + +uses + {$ifdef UNIX} + cthreads, + {$endif UNIX} + classes, + sysutils, + mormot.core.base, + mormot.core.os, + mormot.core.unicode, + mormot.core.text, + mormot.core.data; + +type + // a weather station info, using 12 bytes + TBrcStation = packed record + Sum, Count: integer; // we ensured no overflow occurs with 32-bit range + Min, Max: SmallInt; // 16-bit (-32767..+32768) temperatures * 10 + end; + PBrcStation = ^TBrcStation; 
+ TBrcStations = array of TBrcStation; + + TBrcMain = class + protected + fSafe: TOSLightLock; // TLightLock seems to make no difference + fHash: array of word; // store 0 if void, or fStation[] index + 1 + fNameHash: array of cardinal; // crc32c perfect hash of the name + fNameText: array of PUtf8Char; // directly point to the input memmaped file + fStation: TBrcStations; // aggregated storage + fCount: PtrInt; // = length(fNameHash/fNameText/fStation) + fEvent: TSynEvent; // to wake up the main thread when finished + fRunning, fCapacity: integer; + fCurrentChunk: PByteArray; + fCurrentRemain, fChunkSize: PtrUInt; + fMem: TMemoryMap; + function GetChunk(out start, stop: PByteArray): boolean; + function Search(name: pointer; namelen: PtrUInt): PtrInt; + procedure Aggregate(const another: TBrcStations); + public + constructor Create(const datafile: TFileName; + threads, chunkmb, capacity: integer; affinity: boolean); + destructor Destroy; override; + procedure WaitFor; + function SortedText: RawUtf8; + end; + + TBrcThread = class(TThread) + protected + fOwner: TBrcMain; + fStation: TBrcStations; // per-thread storage + procedure Execute; override; + public + constructor Create(owner: TBrcMain); + end; + + +{$ifdef FPC_CPUX64_disabled_slower} +function NameLen(p: PUtf8Char): PtrInt; assembler; nostackframe; +asm + lea rdx, qword ptr [p + 2] + movaps xmm0, oword ptr [rip + @chr] + movups xmm1, oword ptr [rdx] // check first 16 bytes + pcmpeqb xmm1, xmm0 + pmovmskb eax, xmm1 + bsf eax, eax + jnz @found +@by16: add rdx, 16 + movups xmm1, oword ptr [rdx] // next 16 bytes + pcmpeqb xmm1, xmm0 + pmovmskb eax, xmm1 + bsf eax, eax + jz @by16 +@found: add rax, rdx // point to exact match + sub rax, p // return position + ret + align 16 +@chr: dq $3b3b3b3b3b3b3b3b // xmm0 of ';' + dq $3b3b3b3b3b3b3b3b +end; +{$else} +function NameLen(p: PUtf8Char): PtrInt; inline; +begin + result := 2; + while true do + if p[result] <> ';' then + if p[result + 1] <> ';' then + if p[result + 2] 
<> ';' then + if p[result + 3] <> ';' then + if p[result + 4] <> ';' then + if p[result + 5] <> ';' then + inc(result, 6) + else + exit(result + 5) + else + exit(result + 4) + else + exit(result + 3) + else + exit(result + 2) + else + exit(result + 1) + else + exit; + // this small (unrolled) inlined loop is faster than a SSE2 function :) +end; +{$endif FPC_CPUX64} + + +{ TBrcMain } + +const + HASHSIZE = 1 shl 19; // slightly oversized to avoid most collisions + +constructor TBrcMain.Create(const datafile: TFileName; + threads, chunkmb, capacity: integer; affinity: boolean); +var + i, cores, core: integer; + one: TBrcThread; +begin + // init thread-safety markers + fSafe.Init; + fEvent := TSynEvent.Create; + // map the file into memory and prepare memory chunks + if not fMem.Map(datafile) then + raise ESynException.CreateUtf8('Impossible to find %', [datafile]); + fChunkSize := chunkmb shl 20; + fCurrentChunk := pointer(fMem.Buffer); + fCurrentRemain := fMem.Size; + // initialize the stations name hash table + fCapacity := capacity; + SetLength(fHash, HASHSIZE); + SetLength(fNameHash, capacity); + SetLength(fNameText, capacity); + // launch the threads with optional thread affinity + core := 0; + cores := SystemInfo.dwNumberOfProcessors; + for i := 0 to threads - 1 do + begin + one := TBrcThread.Create(self); + if not affinity then + continue; + SetThreadCpuAffinity(one, core); + inc(core, 2); + if core >= cores then + dec(core, cores - 1); // e.g. 0,2,1,3,0,2.. 
with 4 cpus + end; +end; + +destructor TBrcMain.Destroy; +begin + inherited Destroy; + fMem.UnMap; + fEvent.Free; + fSafe.Done; +end; + +function TBrcMain.Search(name: pointer; namelen: PtrUInt): PtrInt; +var + h32: cardinal; + h: PtrUInt; +begin + h32 := crc32c(0, name, namelen); + h := h32; + repeat + repeat + h := h and (HASHSIZE - 1); + result := fHash[h]; + if result = 0 then + break; // void slot + dec(result); + if fNameHash[result] = h32 then + {$ifdef NOPERFECTHASH} + if MemCmp(pointer(fNameText[result]), name, namelen + 1) = 0 then + {$endif NOPERFECTHASH} + exit; // found this perfect hash = found this name + inc(h); // hash modulo collision: linear probing + until false; + // void slot: try to add now + fSafe.Lock; + if fHash[h] <> 0 then + result := -1 // race condition + else + begin + result := fCount; + assert(result < fCapacity); + inc(fCount); + fNameHash[result] := h32; + fNameText[result] := name; + fHash[h] := fCount; // should be last + end; + fSafe.UnLock; + until result >= 0; +end; + +function TBrcMain.GetChunk(out start, stop: PByteArray): boolean; +var + chunk: PtrUInt; +begin + result := false; + fSafe.Lock; + chunk := fCurrentRemain; + if chunk <> 0 then + begin + start := fCurrentChunk; + if chunk > fChunkSize then + begin + stop := pointer(GotoNextLine(pointer(@start[fChunkSize]))); + chunk := PAnsiChar(stop) - PAnsiChar(start); + end + else + begin + stop := @start[chunk]; + while PAnsiChar(stop)[-1] <= ' ' do + dec(PByte(stop)); // ensure final stop at meaningful char + end; + dec(fCurrentRemain, chunk); + fCurrentChunk := @fCurrentChunk[chunk]; + result := true; + end; + fSafe.UnLock; +end; + +procedure TBrcMain.Aggregate(const another: TBrcStations); +var + n: integer; + s, d: PBrcStation; +begin + fSafe.Lock; // several TBrcThread may finish at the same time + if fStation = nil then + fStation := another + else + begin + n := fCount; + s := pointer(another); + d := pointer(fStation); + repeat + if s^.Count <> 0 then + begin + 
inc(d^.Count, s^.Count); + inc(d^.Sum, s^.Sum); + if s^.Max > d^.Max then + d^.Max := s^.Max; + if s^.Min < d^.Min then + d^.Min := s^.Min; + end; + inc(s); + inc(d); + dec(n); + until n = 0; + end; + fSafe.UnLock; + if InterlockedDecrement(fRunning) = 0 then + fEvent.SetEvent; // all threads finished: release main console thread +end; + +procedure TBrcMain.WaitFor; +begin + fEvent.WaitForEver; +end; + +procedure AddTemp(w: TTextWriter; sep: AnsiChar; val: PtrInt); +var + d10: PtrInt; +begin + w.Add(sep); + if val < 0 then + begin + w.Add('-'); + val := -val; + end; + d10 := val div 10; // val as temperature * 10 + w.AddString(SmallUInt32Utf8[d10]); // in 0..999 range + w.Add('.'); + w.Add(AnsiChar(val - d10 * 10 + ord('0'))); +end; + +function ByStationName(const A, B): integer; // = StrComp() but ending with ';' +var + pa, pb: PByte; + c: byte; +begin + result := 0; + pa := pointer(A); + pb := pointer(B); + dec(pa, {%H-}PtrUInt(pb)); + if pa = nil then + exit; + repeat + c := PByteArray(pa)[{%H-}PtrUInt(pb)]; + if c <> pb^ then + break + else if c = ord(';') then + exit; // Str1 = Str2 + inc(pb); + until false; + if (c = ord(';')) or + ((pb^ <> ord(';')) and + (c < pb^)) then + result := -1 + else + result := 1; +end; + +function ceil(x: double): PtrInt; // "official" rounding method +begin + result := trunc(x) + ord(frac(x) > 0); // using FPU is fast enough here +end; + +function TBrcMain.SortedText: RawUtf8; +var + c: PtrInt; + n: PCardinal; + s: PBrcStation; + p: PUtf8Char; + st: TRawByteStringStream; + w: TTextWriter; + ndx: TSynTempBuffer; + tmp: TTextWriterStackBuffer; +begin + // compute the sorted-by-name indexes of all stations + c := fCount; + assert(c <> 0); + DynArraySortIndexed( + pointer(fNameText), SizeOf(PUtf8Char), c, ndx, ByStationName); + try + // generate output + FastSetString(result, nil, 1200000); // pre-allocate result + st := TRawByteStringStream.Create(result); + try + w := TTextWriter.Create(st, @tmp, SizeOf(tmp)); + try + w.Add('{'); + 
n := ndx.buf; + repeat + s := @fStation[n^]; + assert(s^.Count <> 0); + p := fNameText[n^]; + w.AddNoJsonEscape(p, NameLen(p)); + AddTemp(w, '=', s^.Min); + AddTemp(w, '/', ceil(s^.Sum / s^.Count)); // average + AddTemp(w, '/', s^.Max); + dec(c); + if c = 0 then + break; + w.Add(',', ' '); + inc(n); + until false; + w.Add('}'); + w.FlushFinal; + FakeLength(result, w.WrittenBytes); + finally + w.Free; + end; + finally + st.Free; + end; + finally + ndx.Done; + end; +end; + + +{ TBrcThread } + +constructor TBrcThread.Create(owner: TBrcMain); +begin + fOwner := owner; + SetLength(fStation, fOwner.fCapacity); + InterlockedIncrement(fOwner.fRunning); + FreeOnTerminate := true; + inherited Create({suspended=}false); +end; + +procedure TBrcThread.Execute; +var + p, start, stop: PByteArray; + v, m: integer; + l, neg: PtrInt; + s: PBrcStation; +begin + while fOwner.GetChunk(start, stop) do + begin + // parse this thread chunk + p := start; + repeat + // parse the name; and find the corresponding station data + l := NameLen(pointer(p)); + p := @p[l + 1]; // + 1 to ignore ; + s := @fStation[fOwner.Search(start, l)]; + // parse the temperature (as -12.3 -3.4 5.6 78.9 patterns) into value * 10 + if p[0] = ord('-') then + begin + neg := -1; + p := @p[1]; + end + else + neg := 1; + if p[2] = ord('.') then // xx.x + begin + // note: the PCardinal(p)^ + "shr and $ff" trick is actually slower + v := (p[0] * 100 + p[1] * 10 + p[3] - (ord('0') * 111)) * neg; + p := @p[6]; // also jump ending #13/#10 + end + else + begin + v := (p[0] * 10 + p[2] - (ord('0') * 11)) * neg; // x.x + p := @p[5]; + end; + // store the value + if s^.Count = 0 then + begin + s^.Min := v; // new entry + s^.Max := v; + end + else + begin + m := s^.Min; + if v < m then + m := v; // branchless cmovg/cmovl + s^.Min := m; + m := s^.Max; + if v > m then + m := v; + s^.Max := m; + end; + inc(s^.Sum, v); + inc(s^.Count); + start := p; + until p >= stop; + end; + // aggregate this thread's values into the main list + 
fOwner.Aggregate(fStation); +end; + + +var + fn: TFileName; + threads, chunkmb: integer; + verbose, affinity, help: boolean; + main: TBrcMain; + res: RawUtf8; + start, stop: Int64; +begin + // read command line parameters + Executable.Command.ExeDescription := 'The mORMot One Billion Row Challenge'; + if Executable.Command.Arg(0, 'the data source #filename') then + Utf8ToFileName(Executable.Command.Args[0], fn{%H-}); + verbose := Executable.Command.Option( + ['v', 'verbose'], 'generate verbose output with timing'); + affinity := Executable.Command.Option( + ['a', 'affinity'], 'force thread affinity to a single CPU core'); + Executable.Command.Get( + ['t', 'threads'], threads, '#number of threads to run', + SystemInfo.dwNumberOfProcessors); + Executable.Command.Get( + ['c', 'chunk'], chunkmb, 'size in #megabytes used for per-thread chunking', 8); + help := Executable.Command.Option(['h', 'help'], 'display this help'); + if Executable.Command.ConsoleWriteUnknown then + exit + else if help or + (fn = '') then + begin + ConsoleWrite(Executable.Command.FullDescription); + exit; + end; + // actual process + if verbose then + ConsoleWrite(['Processing ', fn, ' with ', threads, ' threads, ', + chunkmb, 'MB chunks and affinity=', affinity]); + QueryPerformanceMicroSeconds(start); + try + main := TBrcMain.Create(fn, threads, chunkmb, {capacity=}45000, affinity); + // note: current stations count = 41343 for 484KB of data per thread + try + main.WaitFor; + res := main.SortedText; + if verbose then + ConsoleWrite(['result hash=', CardinalToHexShort(crc32cHash(res)), + ', result length=', length(res), + ', stations count=', main.fCount, + ', valid utf8=', IsValidUtf8(res)]) + else + ConsoleWrite(res); + finally + main.Free; + end; + except + on E: Exception do + ConsoleShowFatalException(E); + end; + // optional timing output + if verbose then + begin + QueryPerformanceMicroSeconds(stop); + dec(stop, start); + ConsoleWrite(['done in ', MicroSecToString(stop), ' ', + 
KB((FileSize(fn) * 1000000) div stop), '/s']); + end; +end. +