Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<Version Value="11"/>
<PathDelim Value="\"/>
<Target>
<Filename Value="..\..\..\bin\ghatem-dirty"/>
<Filename Value="..\..\..\bin\ghatem-largerec"/>
</Target>
<SearchPaths>
<IncludeFiles Value="$(ProjOutDir)"/>
Expand Down Expand Up @@ -60,7 +60,7 @@
<Version Value="11"/>
<PathDelim Value="\"/>
<Target>
<Filename Value="..\..\..\bin\ghatem-dirty"/>
<Filename Value="..\..\..\bin\ghatem-largerec"/>
</Target>
<SearchPaths>
<IncludeFiles Value="$(ProjOutDir)"/>
Expand Down Expand Up @@ -89,7 +89,7 @@
<Version Value="11"/>
<PathDelim Value="\"/>
<Target>
<Filename Value="..\..\..\bin\ghatem-dirty"/>
<Filename Value="..\..\..\bin\ghatem-largerec"/>
</Target>
<SearchPaths>
<IncludeFiles Value="$(ProjOutDir)"/>
Expand Down Expand Up @@ -126,7 +126,7 @@
</RequiredPackages>
<Units Count="1">
<Unit0>
<Filename Value="OneBRC-dirty.lpr"/>
<Filename Value="OneBRC-largerec.lpr"/>
<IsPartOfProject Value="True"/>
</Unit0>
</Units>
Expand All @@ -135,7 +135,7 @@
<Version Value="11"/>
<PathDelim Value="\"/>
<Target>
<Filename Value="..\..\..\bin\ghatem-dirty"/>
<Filename Value="..\..\..\bin\ghatem-largerec"/>
</Target>
<SearchPaths>
<IncludeFiles Value="$(ProjOutDir)"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
Baseline.Console;

const
cNumStations = 41343;
cNumStations = 41343; // as per the input file
cDictSize = 248071; // numstations * 6, next prime number
cThreadCount = 32;

Expand Down Expand Up @@ -41,7 +41,7 @@ TOneBRCApp = class(TCustomApplication)
TStationData = packed record
Min: SmallInt;
Max: SmallInt;
Count: UInt16;
Count: UInt32;
Sum: Integer;
end;
PStationData = ^TStationData;
Expand All @@ -59,18 +59,42 @@ TOneBRCApp = class(TCustomApplication)

TBRCDictionary = class
private
// keys/values for a Large dictionary:
// values will be the index where the actual record is stored
FHashes: THashes;
FIndexes: TIndexes;
FStationNames: TStationNames;

// where the actual records are stored:
// - each thread holds its own data
// - for each thread, pre-allocate as much space as needed
// - all threads store their data at the exact same index
FThreadData: array [0..cThreadCount-1] of array [0..cNumStations-1] of TStationData;

// station names are also shared, not lock-protected (in the worst case, the value is written twice)
// stored separately as it is rarely needed
FStationNames: TStationNames;

// points to the next slot in FThreadData where we should fill a newly encountered station
FCounter: TStationCount;

// exclusively to protect FCounter from concurrent-writes
FCS: TCriticalSection;

// searches for a given key: reports whether the key was found, and returns its storage index
// (or, if not found, which index to use next)
procedure InternalFind(const aKey: Cardinal; out aFound: Boolean; out aIndex: THashSize);

public
constructor Create;
destructor Destroy; override;

// simple wrapper to find station-record pointers
function TryGetValue (const aKey: Cardinal; const aThreadNb: TThreadCount; out aValue: PStationData): Boolean; inline;

// multithread-unprotected: adds the data (temp, name) of a station encountered for the first time
procedure Add (const aHashIdx: THashSize; const aThreadNb: TThreadCount; const aTemp: SmallInt; const aStationName: AnsiString); inline;

// multithread-protected: safely assign a slot for a given key
function AtomicRegisterHash (const aKey: Cardinal): THashSize;
end;

Expand All @@ -89,17 +113,27 @@ TOneBRC = class
FThreads: array of TThread;
FDictionary: TBRCDictionary;

// for a line between idx [aStart; aEnd], returns the station-name length, and the integer-value of temperature
procedure ExtractLineData(const aStart: Int64; const aEnd: Int64; out aLength: ShortInt; out aTemp: SmallInt); inline;

public
constructor Create (const aThreadCount: TThreadCount);
destructor Destroy; override;
function mORMotMMF (const afilename: string): Boolean;

// initial thread-spawn
procedure DispatchThreads;

// wait for all threads to complete their work
procedure WaitAll;

// executed by each thread to process data in the given range
procedure ProcessData (aThreadNb: TThreadCount; aStartIdx: Int64; aEndIdx: Int64);

// merge data from all threads
procedure Merge (aLeft: TThreadCount; aRight: TThreadCount);
procedure MergeAll;

procedure GenerateOutput;
property DataSize: Int64 read FDataSize;
end;
Expand Down Expand Up @@ -160,6 +194,9 @@ function TBRCDictionary.AtomicRegisterHash(const aKey: Cardinal): THashSize;
var
vFound: Boolean;
begin
// must call InternalFind again, within the critical-section,
// to ensure the slot was not taken by another thread
// this function should execute only once per station, so at most 41343 times
FCS.Acquire;
try
InternalFind (aKey, vFound, Result);
Expand All @@ -177,14 +214,19 @@ procedure TBRCDictionary.InternalFind(const aKey: Cardinal; out aFound: Boolean;
var vIdx: Integer;
vOffset: Integer;
begin
// Lemire hashing: faster to compute than modulus, but more collisions (as observed in trials)
// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
// thanks Arnaud for the suggestion
vIdx := aKey * cDictSize shr 32;

if FHashes[vIdx] = 0 then begin
// found empty bucket to use
aIndex := vIdx;
aFound := False;
exit;
end;
if FHashes[vIdx] = aKey then begin
// found match
aIndex := vIdx;
aFound := True;
exit;
Expand All @@ -193,7 +235,7 @@ procedure TBRCDictionary.InternalFind(const aKey: Cardinal; out aFound: Boolean;
vOffset := 1;

while True do begin
// quadratic probing, by incrementing vOffset
// linear (not quadratic) probing, with continuous increments to minimize clusters
Inc (vIdx, vOffset);
Inc (vOffset);

Expand Down Expand Up @@ -293,29 +335,35 @@ procedure TOneBRC.ExtractLineData(const aStart: Int64; const aEnd: Int64; out aL
// repeat with the remaining digits, multiplying by 10^x (skip the '.')
// multiply by -1 upon reaching a '-'

//aTemp := (Ord(FData[aEnd]) - c0ascii)
// + 10 *(Ord(FData[aEnd-2]) - c0ascii);
//vDigit := Ord(FData[aEnd-3]);
//if (vDigit >= c0ascii) and (vDigit <= c9ascii) then begin
// aTemp := aTemp + 100*(Ord(FData[aEnd-3]) - c0ascii);
// vDigit := Ord(FData[aEnd-4]);
// if vDigit = cNegAscii then
// aTemp := -aTemp;
//end
//else if vDigit = cNegAscii then
// aTemp := -aTemp;

{
aTemp := (Ord(FData[aEnd]) - c0ascii)
+ 10 *(Ord(FData[aEnd-2]) - c0ascii);
vDigit := Ord(FData[aEnd-3]);
if (vDigit >= c0ascii) and (vDigit <= c9ascii) then begin
aTemp := aTemp + 100*(Ord(FData[aEnd-3]) - c0ascii);
vDigit := Ord(FData[aEnd-4]);
if vDigit = cNegAscii then
aTemp := -aTemp;
end
else if vDigit = cNegAscii then
aTemp := -aTemp;
}

//==========
// entire computation is branchless (for readability, see version above)
// no intermediary results also showed better performance

// 0 if -
// 1 if +
// convert range [0;1] to [-1;1] for branchless negation when needed
// if there is a 3rd digit (*100), add it, otherwise multiply by 0 to cancel it out
vIsNeg := Ord (FData[J+1] <> '-');

aTemp := (
(Ord(FData[aEnd]) - c0ascii)
+ 10 *(Ord(FData[aEnd-2]) - c0ascii)
+ Ord ((J+4 - vIsNeg < aEnd)) * 100*(Ord(FData[aEnd-3]) - c0ascii)
) * (vIsNeg * 2 - 1);
//if (J+4 - vIsNeg < aEnd) then begin
//aTemp := aTemp
//end;
//aTemp := (vIsNeg * 2 - 1) * aTemp;
end;

//---------------------------------------------------
Expand Down Expand Up @@ -350,6 +398,7 @@ procedure TOneBRC.DispatchThreads;
I: TThreadCount;
vRange: Int64;
begin
// distribute input equally across available threads
vRange := Trunc (FDataSize / FThreadCount);

for I := 0 to FThreadCount - 1 do begin
Expand Down Expand Up @@ -380,6 +429,7 @@ procedure TOneBRC.ProcessData (aThreadNb: TThreadCount; aStartIdx: Int64; aEndId
vLenStationName: ShortInt;
vFound: Boolean;
begin
// initialize min/max, else we may get zeroes (due to our Add that fires once per station across all threads)
for I := 0 to cNumStations - 1 do begin
FDictionary.FThreadData[aThreadNb][I].Max := -2000;
FDictionary.FThreadData[aThreadNb][I].Min := 2000;
Expand Down Expand Up @@ -422,7 +472,6 @@ procedure TOneBRC.ProcessData (aThreadNb: TThreadCount; aStartIdx: Int64; aEndId

FDictionary.InternalFind (vHash, vFound, vHashIdx);


if vFound then begin
vData := @FDictionary.FThreadData[aThreadNb][FDictionary.FIndexes[vHashIdx]];
if vTemp < vData^.Min then
Expand All @@ -435,9 +484,11 @@ procedure TOneBRC.ProcessData (aThreadNb: TThreadCount; aStartIdx: Int64; aEndId
else begin
// pre-allocated array of records instead of on-the-go allocation
vHashIdx := FDictionary.AtomicRegisterHash (vHash);

// SetString done only once per station name, for later sorting
SetString(vStation, pAnsiChar(@FData[vLineStart]), vLenStationName);

// data can be safely added at the given index, without locking
FDictionary.Add(vHashIdx, aThreadNb, vTemp, vStation);
end;

Expand All @@ -460,6 +511,7 @@ procedure TOneBRC.Merge(aLeft: TThreadCount; aRight: TThreadCount);
vDataL: PStationData;
I: Integer;
begin
// accumulate data into Left
for I := 0 to cNumStations - 1 do begin
vDataR := @FDictionary.FThreadData[aRight][I];
vDataL := @FDictionary.FThreadData[aLeft][I];
Expand All @@ -478,6 +530,7 @@ procedure TOneBRC.MergeAll;
var
I: TThreadCount;
begin
// all thread-data is accumulated into index 0
for I := 1 to FThreadCount - 1 do begin
Merge (0, I);
end;
Expand All @@ -487,6 +540,8 @@ procedure TOneBRC.MergeAll;

function MyFormatInt (const aIn: SmallInt): AnsiString; inline;
begin
// much faster than FormatFloat
// oddly, IntToStr does not include leading zeroes for both pos and neg numbers
Result := IntToStr(aIn);
Insert ('.', Result, Length(Result));

Expand Down
Loading