diff --git a/entries/abouchez/README.md b/entries/abouchez/README.md index 7541028..e5b4bdb 100644 --- a/entries/abouchez/README.md +++ b/entries/abouchez/README.md @@ -51,15 +51,16 @@ Here are the main ideas behind this implementation proposal: - **mORMot** makes cross-platform and cross-compiler support simple - e.g. `TMemMap`, `TDynArray`,`TTextWriter`, `SetThreadCpuAffinity`, `crc32c`, `ConsoleWrite` or command-line parsing; - The entire 16GB file is `memmap`ed at once into memory - it won't work on 32-bit OS, but avoid any `read` syscall or memory copy; - File is processed in parallel using several threads - configurable via the `-t=` switch, default being the total number of CPUs reported by the OS; -- Input is fed into each thread as 4MB chunks (see also the `-c` command line switch): because thread scheduling is unbalanced, it is inefficient to pre-divide the size of the whole input file into the number of threads; -- Each thread manages its own `Station[]` data, so there is no lock until the thread is finished and data is consolidated; -- Each `Station[]` information is packed into a record of exactly 16 bytes, with no external pointer/string, to leverage the CPU L1 cache size (64 bytes) for efficiency; -- A O(1) hash table is maintained for the name lookup, with crc32c perfect hash function - no name comparison nor storage is needed with a perfect hash (see below); +- Input is fed into each thread as 8MB chunks (see also the `-c` command line switch): because thread scheduling is unbalanced, it is inefficient to pre-divide the size of the whole input file into the number of threads; +- Each thread manages its own `fStation[]` data, so there is no lock until the thread is finished and data is consolidated; +- Each `fStation[]` information is packed into a record of 12 bytes, with no external pointer/string, to leverage the CPU L1 cache size (64 bytes) for efficiency; +- A shared O(1) hash table is maintained for the name lookup, with crc32c perfect hash 
function - no name comparison nor storage is needed with a perfect hash (see below); - On Intel/AMD/AARCH64 CPUs, *mORMot* offers hardware SSE4.2 opcodes for this crc32c computation; -- The hash table does not directly store the `Station[]` data, but use a separated `StationHash[]` lookup array of 16-bit indexes (as our `TDynArray` does) to leverage the CPU caches; +- The hash table does not directly store the `fStation[]` data, but use a shared `fHash[]` lookup array of 16-bit indexes (as our `TDynArray` does) to leverage the CPU caches; +- This name hash table is shared between threads to favor CPU cache locality, and contention only occurs at startup, the first time a station name is encountered; - Values are stored as 16-bit or 32-bit integers, as temperature multiplied by 10; - Temperatures are parsed with a dedicated code (expects single decimal input values); -- The station names are stored as UTF-8 pointers to the memmap location where they appear first, in `StationName[]`, to be emitted eventually for the final output, not during temperature parsing; +- The station names are stored as UTF-8 pointers to the `memmap`ed location where they appear first, in `fNameText[]`, to be emitted eventually for the final output, not during temperature parsing; - No memory allocation (e.g. 
no transient `string` or `TBytes`) nor any syscall is done during the parsing process to reduce contention and ensure the process is only CPU-bound and RAM-bound (we checked this with `strace` on Linux); - Pascal code was tuned to generate the best possible asm output on FPC x86_64 (which is our target) - perhaps making it less readable, because we used pointer arithmetics when it matters (I like to think as such low-level pascal code as [portable assembly](https://sqlite.org/whyc.html#performance) similar to "unsafe" code in managed languages); - We even tried an optimized SSE2 asm sub-function for searching the name `';'` delimiter - which is a O(n) part of the process, and in practice... it was slower than a slightly unrolled pure pascal inlined loop; @@ -76,13 +77,13 @@ The L1 cache is well known in the performance hacking litterature to be the main Count and Sum values can fit in 32-bit `integer` fields (with a range of about 2 billions signed values), whereas min/max values have been reduced as 16-bit `smallint` - resulting in temperature range of -3276.7..+3276.8 celsius grads. It seems fair on our galaxy according to the IPCC. ;) -As a result, each `Station[]` entry takes only 16 bytes, so we can fit exactly 4 entries in a single CPU L1 cache line. To be accurate, if we put some more data into the record (e.g. use `Int64` instead of `smallint`/`integer`), the performance degrades only for a few percents. +As a result, each `fStation[]` entry takes only 12 bytes, so we can fit several entries in a single CPU L1 cache line. To be accurate, if we put some more data into the record (e.g. use `Int64` instead of `smallint`/`integer`), the performance degrades only for a few percents. 
The main fact seems to be that the entry is likely to fit into a single cache line, even if filling two cache lines may be sometimes needed for misaligned data. -In our first attempt (see "Old Version" below), we stored the name into the `Station[]` array, so that each entry is 64 bytes long exactly. But since `crc32c` is a perfect hash function for our dataset, it is enough to just store the 32-bit hash instead, and not the actual name. Less data would mean less cache size involved. +In our first attempt (see "Old Version" below), we stored the name into the `Station[]` array, so that each entry is 64 bytes long exactly. But since `crc32c` is a perfect hash function for our dataset, it is enough to just store the 32-bit hash instead in a shared `fNameHash[]` array, and not the actual name. Less data would mean less cache size involved. -We tried to remove the `StationHash[]` array of `word` lookup table. It made one data read less, but performed almost three times slower. Data locality and cache pollution prevails on absolute number of memory reads. It is faster to access twice the memory, if this memory could remain in the CPU caches. Only profiling and timing would show this. The shortest code is not the fastest with modern CPUs. +We tried to remove the `fHash[]` array of `word` lookup table. It made one data read less, but performed almost three times slower. Data locality and cache pollution prevails on absolute number of memory reads. It is faster to access twice the memory, if this memory could remain in the CPU caches. Only profiling and timing would show this. The shortest code is not the fastest with modern CPUs. -Note that if we reduce the number of stations from 41343 to 400 (as other languages 1brc projects do), the performance is much higher, also with a 16GB file as input. My guess is that since 400x16 = 6400, each dataset could fit entirely in each core L1 cache. No slower L2/L3 cache is involved, therefore performance is better. 
+Note that if we reduce the number of stations from 41343 to 400 (as other languages 1brc projects do), the performance is higher, also with a 16GB file as input. My guess is that since 400x12 = 4800, each dataset could fit entirely in each core L1 cache. No slower L2/L3 cache is involved, therefore performance is better. Once again, some reference material is available at https://en.algorithmica.org/hpc/cpu-cache/ including some mind-blowing experiment [about cache associativity](https://en.algorithmica.org/hpc/cpu-cache/associativity/). I told you CPUs were complex! :D @@ -110,14 +111,14 @@ Options: Params: -t, --threads (default 20) number of threads to run - -c, --chunk (default 4) + -c, --chunk (default 8) size in megabytes used for per-thread chunking ``` We will use these command-line switches for local (dev PC), and benchmark (challenge HW) analysis. ## Local Analysis -On my PC, it takes less than 3 seconds to process the 16GB file with 8/10 threads. +On my PC, it takes less than 2 seconds to process the 16GB file. Numbers below were with the initial version of our code. Current trunk is faster. But the analysis is still accurate. Let's compare `abouchez` with a solid multi-threaded entry using file buffer reads and no memory map (like `sbalazs`), using the `time` command on Linux: @@ -138,7 +139,7 @@ We defined 20 threads for both executables, because our PC CPU has 20 threads in Apart from the obvious global "wall" time reduction (`real` numbers), the raw parsing and data gathering in the threads match the number of threads and the running time (`user` numbers), and no syscall is involved by `abouchez` thanks to the memory mapping of the whole file (`sys` numbers, which contain only memory page faults, is much lower). 
-The `memmap()` feature makes the initial/cold `abouchez` call slower, because it needs to cache all measurements data from file into RAM (I have 32GB of RAM, so the whole data file will remain in memory, as on the benchmark hardware): +The `memmap()` feature makes the initial/cold `abouchez` call slower, because it needs to cache all measurements data from file into RAM (I have 64GB of RAM, so the whole data file will remain in memory, as on the benchmark hardware): ``` ab@dev:~/dev/github/1brc-ObjectPascal/bin$ time ./abouchez measurements.txt -t=20 >resmormot.txt @@ -164,6 +165,15 @@ Affinity may help on Ryzen 9, because its Zen 3 architecture is made of identica The `-v` verbose mode makes such testing easy. The `hash` value can quickly check that the generated output is correct, and that it is valid `utf8` content (as expected). +To be accurate, here is the current state of our code on my local machine: +``` +ab@dev:~/dev/github/1brc-ObjectPascal/bin$ ./abouchez data.csv -v +Processing data.csv with 20 threads, 8MB chunks and affinity=0 +result hash=85614446, result length=1139413, stations count=41343, valid utf8=1 +done in 1.82s 8.6 GB/s +``` +So we were able to make some progress between iterations - especially by sharing the hash table between threads. + ## Benchmark Integration Every system is quite unique, especially about its CPU multi-thread abilities. For instance, my Intel Core i5 has both P-cores and E-cores so its threading model is pretty unbalanced. The Zen architecture should be more balanced. @@ -232,7 +242,7 @@ The Ryzen CPU has 16 cores with 32 threads, and it makes sense that each thread In the version same `src` sub-folder, you will find our first attempt of this challenge, as `brcmormotold.lpr`. In respect to the "final/new" version, it did store the name as a "shortstring" within its `Station[]` record, to fill exactly the 64-byte cache line size. 
-It was already very fast, but since `crc32c` is a perfect hash function, we finally decided to just stored the 32-bit hash, and not the name itself. +It was already very fast, but since `crc32c` is a perfect hash function, we finally decided to just store the 32-bit hash, and not the name itself. More recently, we even shared the name hash table across all threads. You could disable our tuned asm in the project source code, and loose about 10% by using general purpose *mORMot* `crc32c()` and `CompareMem()` functions, which already runs SSE2/SSE4.2 tune assembly. No custom asm is needed on the "new" version: we directly use the *mORMot* functions. diff --git a/entries/abouchez/src/brcmormot.lpr b/entries/abouchez/src/brcmormot.lpr index a7c4437..c41ba1f 100644 --- a/entries/abouchez/src/brcmormot.lpr +++ b/entries/abouchez/src/brcmormot.lpr @@ -2,7 +2,7 @@ program brcmormot; {.$define NOPERFECTHASH} -// you can define this conditional to force name comparison (2.5x slower) +// you can define this conditional to force name comparison (slower) {$I mormot.defines.inc} @@ -23,38 +23,33 @@ mormot.core.data; type - // a weather station info, using 1/4rd of a CPU L1 cache line (64/4=16 bytes) + // a weather station info, using 12 bytes TBrcStation = packed record - NameHash: cardinal; // crc32c perfect hash of the name Sum, Count: integer; // we ensured no overflow occurs with 32-bit range Min, Max: SmallInt; // 16-bit (-32767..+32768) temperatures * 10 end; PBrcStation = ^TBrcStation; - - TBrcList = record - public - StationHash: array of word; // store 0 if void, or Station[] index + 1 - Station: array of TBrcStation; - StationName: array of PUtf8Char; // directly point to input memmap file - Count: PtrInt; - procedure Init(max: integer); - function Search(name: pointer; namelen: PtrInt): PBrcStation; - end; + TBrcStations = array of TBrcStation; TBrcMain = class protected - fSafe: TLightLock; - fEvent: TSynEvent; - fRunning, fMax: integer; + fSafe: TOSLightLock; // 
TLightLock seems to make no difference + fHash: array of word; // store 0 if void, or fStation[] index + 1 + fNameHash: array of cardinal; // crc32c perfect hash of the name + fNameText: array of PUtf8Char; // directly point to the input memmaped file + fStation: TBrcStations; // aggregated storage + fCount: PtrInt; // = length(fNameHash/fNameText/fStation) + fEvent: TSynEvent; // to wake up the main thread when finished + fRunning, fCapacity: integer; fCurrentChunk: PByteArray; fCurrentRemain, fChunkSize: PtrUInt; - fList: TBrcList; fMem: TMemoryMap; - procedure Aggregate(const another: TBrcList); function GetChunk(out start, stop: PByteArray): boolean; + function Search(name: pointer; namelen: PtrUInt): PtrInt; + procedure Aggregate(const another: TBrcStations); public - constructor Create(const fn: TFileName; threads, chunkmb, max: integer; - affinity: boolean); + constructor Create(const datafile: TFileName; + threads, chunkmb, capacity: integer; affinity: boolean); destructor Destroy; override; procedure WaitFor; function SortedText: RawUtf8; @@ -63,68 +58,13 @@ TBrcMain = class TBrcThread = class(TThread) protected fOwner: TBrcMain; - fList: TBrcList; // each thread work on its own list + fStation: TBrcStations; // per-thread storage procedure Execute; override; public constructor Create(owner: TBrcMain); end; -{ TBrcList } - -const - HASHSIZE = 1 shl 18; // slightly oversized to avoid most collisions - -procedure TBrcList.Init(max: integer); -begin - assert(max <= high(StationHash[0])); - SetLength(Station, max); - SetLength(StationHash, HASHSIZE); - SetLength(StationName, max); -end; - -function TBrcList.Search(name: pointer; namelen: PtrInt): PBrcStation; -var - h32: cardinal; - h, x: PtrUInt; -begin - h32 := crc32c(0, name, namelen); - h := h32; - repeat - h := h and (HASHSIZE - 1); - x := StationHash[h]; - if x = 0 then - break; // void slot - result := @Station[x - 1]; - if result^.NameHash = h32 then - {$ifdef NOPERFECTHASH} - if 
MemCmp(pointer(StationName[x - 1]), name, namelen + 1) = 0 then - {$endif NOPERFECTHASH} - exit; // found this perfect hash = found this name - inc(h); // hash modulo collision: linear probing - until false; - assert(Count < length(Station)); - StationName[Count] := name; - result := @Station[Count]; - inc(Count); - StationHash[h] := Count; - result^.NameHash := h32; - result^.Min := high(result^.Min); - result^.Max := low(result^.Max); -end; - - -{ TBrcThread } - -constructor TBrcThread.Create(owner: TBrcMain); -begin - fOwner := owner; - FreeOnTerminate := true; - fList.Init(fOwner.fMax); - InterlockedIncrement(fOwner.fRunning); - inherited Create({suspended=}false); -end; - {$ifdef FPC_CPUX64_disabled_slower} function NameLen(p: PUtf8Char): PtrInt; assembler; nostackframe; asm @@ -176,75 +116,33 @@ function NameLen(p: PUtf8Char): PtrInt; inline; end; {$endif FPC_CPUX64} -procedure TBrcThread.Execute; -var - p, start, stop: PByteArray; - v, m: integer; - l, neg: PtrInt; - s: PBrcStation; -begin - while fOwner.GetChunk(start, stop) do - begin - // parse this thread chunk - p := start; - repeat - // parse the name; - l := NameLen(pointer(p)); - p := @p[l + 1]; // + 1 to ignore ; - // parse the temperature (as -12.3 -3.4 5.6 78.9 patterns) into value * 10 - if p[0] = ord('-') then - begin - neg := -1; - p := @p[1]; - end - else - neg := 1; - if p[2] = ord('.') then // xx.x - begin - // note: the PCardinal(p)^ + "shr and $ff" trick is actually slower - v := (p[0] * 100 + p[1] * 10 + p[3] - (ord('0') * 111)) * neg; - p := @p[6]; // also jump ending $13/$10 - end - else - begin - v := (p[0] * 10 + p[2] - (ord('0') * 11)) * neg; // x.x - p := @p[5]; - end; - // store the value - s := fList.Search(start, l); - inc(s^.Sum, v); - inc(s^.Count); - m := s^.Min; - if v < m then - m := v; // branchless cmovg/cmovl - s^.Min := m; - m := s^.Max; - if v > m then - m := v; - s^.Max := m; - start := p; - until p >= stop; - end; - // aggregate this thread values into the main list - 
fOwner.Aggregate(fList); -end; - { TBrcMain } -constructor TBrcMain.Create(const fn: TFileName; threads, chunkmb, max: integer; - affinity: boolean); +const + HASHSIZE = 1 shl 19; // slightly oversized to avoid most collisions + +constructor TBrcMain.Create(const datafile: TFileName; + threads, chunkmb, capacity: integer; affinity: boolean); var i, cores, core: integer; one: TBrcThread; begin + // init thread-safety markers + fSafe.Init; fEvent := TSynEvent.Create; - if not fMem.Map(fn) then - raise ESynException.CreateUtf8('Impossible to find %', [fn]); - fMax := max; + // map the file into memory and prepare memory chunks + if not fMem.Map(datafile) then + raise ESynException.CreateUtf8('Impossible to find %', [datafile]); fChunkSize := chunkmb shl 20; fCurrentChunk := pointer(fMem.Buffer); fCurrentRemain := fMem.Size; + // initialize the stations name hash table + fCapacity := capacity; + SetLength(fHash, HASHSIZE); + SetLength(fNameHash, capacity); + SetLength(fNameText, capacity); + // launch the threads with optional thread affinity core := 0; cores := SystemInfo.dwNumberOfProcessors; for i := 0 to threads - 1 do @@ -264,6 +162,45 @@ destructor TBrcMain.Destroy; inherited Destroy; fMem.UnMap; fEvent.Free; + fSafe.Done; +end; + +function TBrcMain.Search(name: pointer; namelen: PtrUInt): PtrInt; +var + h32: cardinal; + h: PtrUInt; +begin + h32 := crc32c(0, name, namelen); + h := h32; + repeat + repeat + h := h and (HASHSIZE - 1); + result := fHash[h]; + if result = 0 then + break; // void slot + dec(result); + if fNameHash[result] = h32 then + {$ifdef NOPERFECTHASH} + if MemCmp(pointer(fNameText[result]), name, namelen + 1) = 0 then + {$endif NOPERFECTHASH} + exit; // found this perfect hash = found this name + inc(h); // hash modulo collision: linear probing + until false; + // void slot: try to add now + fSafe.Lock; + if fHash[h] <> 0 then + result := -1 // race condition + else + begin + result := fCount; + assert(result < fCapacity); + inc(fCount); + 
fNameHash[result] := h32; + fNameText[result] := name; + fHash[h] := fCount; // should be last + end; + fSafe.UnLock; + until result >= 0; end; function TBrcMain.GetChunk(out start, stop: PByteArray): boolean; @@ -294,30 +231,31 @@ function TBrcMain.GetChunk(out start, stop: PByteArray): boolean; fSafe.UnLock; end; -procedure TBrcMain.Aggregate(const another: TBrcList); +procedure TBrcMain.Aggregate(const another: TBrcStations); var n: integer; s, d: PBrcStation; - p: PPUtf8Char; begin fSafe.Lock; // several TBrcThread may finish at the same time - if fList.Count = 0 then - fList := another + if fStation = nil then + fStation := another else begin - n := another.Count; - s := pointer(another.Station); - p := pointer(another.StationName); + n := fCount; + s := pointer(another); + d := pointer(fStation); repeat - d := fList.Search(p^, NameLen(p^)); - inc(d^.Count, s^.Count); - inc(d^.Sum, s^.Sum); - if s^.Max > d^.Max then - d^.Max := s^.Max; - if s^.Min < d^.Min then - d^.Min := s^.Min; + if s^.Count <> 0 then + begin + inc(d^.Count, s^.Count); + inc(d^.Sum, s^.Sum); + if s^.Max > d^.Max then + d^.Max := s^.Max; + if s^.Min < d^.Min then + d^.Min := s^.Min; + end; inc(s); - inc(p); + inc(d); dec(n); until n = 0; end; @@ -391,10 +329,10 @@ function TBrcMain.SortedText: RawUtf8; tmp: TTextWriterStackBuffer; begin // compute the sorted-by-name indexes of all stations - c := fList.Count; + c := fCount; assert(c <> 0); DynArraySortIndexed( - pointer(fList.StationName), SizeOf(PUtf8Char), c, ndx, ByStationName); + pointer(fNameText), SizeOf(PUtf8Char), c, ndx, ByStationName); try // generate output FastSetString(result, nil, 1200000); // pre-allocate result @@ -405,9 +343,9 @@ function TBrcMain.SortedText: RawUtf8; w.Add('{'); n := ndx.buf; repeat - s := @fList.Station[n^]; + s := @fStation[n^]; assert(s^.Count <> 0); - p := fList.StationName[n^]; + p := fNameText[n^]; w.AddNoJsonEscape(p, NameLen(p)); AddTemp(w, '=', s^.Min); AddTemp(w, '/', ceil(s^.Sum / s^.Count)); // 
average @@ -432,6 +370,80 @@ function TBrcMain.SortedText: RawUtf8; end; end; + +{ TBrcThread } + +constructor TBrcThread.Create(owner: TBrcMain); +begin + fOwner := owner; + SetLength(fStation, fOwner.fCapacity); + InterlockedIncrement(fOwner.fRunning); + FreeOnTerminate := true; + inherited Create({suspended=}false); +end; + +procedure TBrcThread.Execute; +var + p, start, stop: PByteArray; + v, m: integer; + l, neg: PtrInt; + s: PBrcStation; +begin + while fOwner.GetChunk(start, stop) do + begin + // parse this thread chunk + p := start; + repeat + // parse the name; and find the corresponding station data + l := NameLen(pointer(p)); + p := @p[l + 1]; // + 1 to ignore ; + s := @fStation[fOwner.Search(start, l)]; + // parse the temperature (as -12.3 -3.4 5.6 78.9 patterns) into value * 10 + if p[0] = ord('-') then + begin + neg := -1; + p := @p[1]; + end + else + neg := 1; + if p[2] = ord('.') then // xx.x + begin + // note: the PCardinal(p)^ + "shr and $ff" trick is actually slower + v := (p[0] * 100 + p[1] * 10 + p[3] - (ord('0') * 111)) * neg; + p := @p[6]; // also jump ending $13/$10 + end + else + begin + v := (p[0] * 10 + p[2] - (ord('0') * 11)) * neg; // x.x + p := @p[5]; + end; + // store the value + if s^.Count = 0 then + begin + s^.Min := v; // new entry + s^.Max := v; + end + else + begin + m := s^.Min; + if v < m then + m := v; // branchless cmovg/cmovl + s^.Min := m; + m := s^.Max; + if v > m then + m := v; + s^.Max := m; + end; + inc(s^.Sum, v); + inc(s^.Count); + start := p; + until p >= stop; + end; + // aggregate this thread values into the main list + fOwner.Aggregate(fStation); +end; + + var fn: TFileName; threads, chunkmb: integer; @@ -440,7 +452,6 @@ function TBrcMain.SortedText: RawUtf8; res: RawUtf8; start, stop: Int64; begin - assert(SizeOf(TBrcStation) = 64 div 4); // 64 = CPU L1 cache line size // read command line parameters Executable.Command.ExeDescription := 'The mORMot One Billion Row Challenge'; if Executable.Command.Arg(0, 'the 
data source #filename') then @@ -453,7 +464,7 @@ function TBrcMain.SortedText: RawUtf8; ['t', 'threads'], threads, '#number of threads to run', SystemInfo.dwNumberOfProcessors); Executable.Command.Get( - ['c', 'chunk'], chunkmb, 'size in #megabytes used for per-thread chunking', 16); + ['c', 'chunk'], chunkmb, 'size in #megabytes used for per-thread chunking', 8); help := Executable.Command.Option(['h', 'help'], 'display this help'); if Executable.Command.ConsoleWriteUnknown then exit @@ -469,15 +480,15 @@ function TBrcMain.SortedText: RawUtf8; chunkmb, 'MB chunks and affinity=', affinity]); QueryPerformanceMicroSeconds(start); try - main := TBrcMain.Create(fn, threads, chunkmb, {max=}45000, affinity); - // note: current stations count = 41343 for 2.5MB of data per thread + main := TBrcMain.Create(fn, threads, chunkmb, {capacity=}45000, affinity); + // note: current stations count = 41343 for 484KB of data per thread try main.WaitFor; res := main.SortedText; if verbose then ConsoleWrite(['result hash=', CardinalToHexShort(crc32cHash(res)), ', result length=', length(res), - ', stations count=', main.fList.Count, + ', stations count=', main.fCount, ', valid utf8=', IsValidUtf8(res)]) else ConsoleWrite(res);