Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 43 additions & 38 deletions entries/abouchez/src/brcmormot.lpr
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,32 @@
Min, Max: SmallInt; // 16-bit (-32767..+32768) temperatures * 10
end;
PBrcStation = ^TBrcStation;
TBrcStations = array of TBrcStation;
TBrcStationDynArray = array of TBrcStation;

TBrcStations = array[word] of TBrcStation;
PBrcStations = ^TBrcStations;

TBrcList = record
public
Station: TBrcStations;
Count: integer;
{$ifdef CUSTOMHASH}
Station: PBrcStations; // perfectly aligned to 64 bytes from StationMem[]
StationHash: array of word; // store 0 if void, or Station[] index + 1
StationMem: TBrcStationDynArray;
function Search(name: pointer; namelen: PtrInt): PBrcStation;
{$else}
Station: TBrcStationDynArray;
Stations: TDynArrayHashed;
function Search(name: PByteArray): PBrcStation;
{$endif CUSTOMHASH}
procedure Init(max: integer);
procedure Init(max: integer; align: boolean);
end;

TBrcMain = class
protected
fSafe: TLightLock;
fEvent: TSynEvent;
fRunning: integer;
fRunning, fMax: integer;
fCurrentChunk: PByteArray;
fCurrentRemain: PtrUInt;
fList: TBrcList;
Expand Down Expand Up @@ -90,10 +95,14 @@ TBrcThread = class(TThread)
const
HASHSIZE = 1 shl 18; // slightly oversized to avoid most collisions

procedure TBrcList.Init(max: integer);
procedure TBrcList.Init(max: integer; align: boolean);
begin
assert(max <= high(StationHash[0]));
SetLength(Station, max);
SetLength(StationMem, max); // RTL won't align by 64 bytes
Station := pointer(StationMem);
if align then
while PtrUInt(Station) and 63 <> 0 do // manual alignment
inc(PByte(Station));
SetLength(StationHash, HASHSIZE);
end;

Expand Down Expand Up @@ -206,10 +215,11 @@ function StationComp(const A, B): integer;
result := MemCmp(@sa.NameLen, @sb.NameLen, sa.NameLen + 1);
end;

procedure TBrcList.Init(max: integer);
procedure TBrcList.Init(max: integer; align: boolean);
begin
// align is just ignored, because TDynArray requires natural alignment
Stations.Init(
TypeInfo(TBrcStations), Station, @StationHash, @StationComp, nil, @Count);
TypeInfo(TBrcStationDynArray), Station, @StationHash, @StationComp, nil, @Count);
Stations.Capacity := max;
end;

Expand All @@ -220,7 +230,7 @@ function TBrcList.Search(name: PByteArray): PBrcStation;
begin
assert(name^[0] < SizeOf(TBrcStation.NameText));
i := Stations.FindHashedForAdding(name^, added);
result := @Station[i]; // in two steps (Station[] may be reallocated if added)
result := @Station[i]; // in two steps (Station[] may be reallocated)
if not added then
exit;
MoveFast(name^, result^.NameLen, name^[0] + 1);
Expand All @@ -237,7 +247,7 @@ constructor TBrcThread.Create(owner: TBrcMain);
begin
fOwner := owner;
FreeOnTerminate := true;
fList.Init(length(fOwner.fList.Station));
fList.Init(fOwner.fMax, {align=}true);
InterlockedIncrement(fOwner.fRunning);
inherited Create({suspended=}false);
end;
Expand Down Expand Up @@ -328,7 +338,8 @@ constructor TBrcMain.Create(const fn: TFileName; threads, max: integer;
fEvent := TSynEvent.Create;
if not fMem.Map(fn) then
raise ESynException.CreateUtf8('Impossible to find %', [fn]);
fList.Init(max);
fMax := max;
fList.Init(fMax, {align=}false); // not aligned for TDynArray.Sort to work
fCurrentChunk := pointer(fMem.Buffer);
fCurrentRemain := fMem.Size;
core := 0;
Expand Down Expand Up @@ -391,30 +402,23 @@ procedure TBrcMain.Aggregate(const another: TBrcList);
n: integer;
begin
fSafe.Lock; // several TBrcThread may finish at the same time
{$ifdef CUSTOMHASH}
if fList.Count = 0 then
fList := another // we can reuse the existing hash table
else
{$endif CUSTOMHASH}
begin
n := another.Count;
s := pointer(another.Station);
repeat
{$ifdef CUSTOMHASH}
d := fList.Search(@s^.NameText, s^.NameLen);
{$else}
d := fList.Search(@s^.NameLen);
{$endif CUSTOMHASH}
inc(d^.Count, s^.Count);
inc(d^.Sum, s^.Sum);
if s^.Max > d^.Max then
d^.Max := s^.Max;
if s^.Min < d^.Min then
d^.Min := s^.Min;
inc(s);
dec(n);
until n = 0;
end;
n := another.Count;
s := pointer(another.Station);
repeat
{$ifdef CUSTOMHASH}
d := fList.Search(@s^.NameText, s^.NameLen);
{$else}
d := fList.Search(@s^.NameLen);
{$endif CUSTOMHASH}
inc(d^.Count, s^.Count);
inc(d^.Sum, s^.Sum);
if s^.Max > d^.Max then
d^.Max := s^.Max;
if s^.Min < d^.Min then
d^.Min := s^.Min;
inc(s);
dec(n);
until n = 0;
fSafe.UnLock;
if InterlockedDecrement(fRunning) = 0 then
fEvent.SetEvent; // all threads finished: release main console thread
Expand Down Expand Up @@ -482,8 +486,8 @@ function TBrcMain.SortedText: RawUtf8;
tmp: TTextWriterStackBuffer;
begin
{$ifdef CUSTOMHASH}
DynArrayFakeLength(pointer(fList.Station), fList.Count);
DynArray(TypeInfo(TBrcStations), fList.Station).Sort(ByStationName);
DynArrayFakeLength(fList.Station, fList.Count);
DynArray(TypeInfo(TBrcStationDynArray), fList.Station).Sort(ByStationName);
{$else}
fList.Stations.Sort(ByStationName);
{$endif CUSTOMHASH}
Expand Down Expand Up @@ -530,7 +534,8 @@ function TBrcMain.SortedText: RawUtf8;
assert(SizeOf(TBrcStation) = 64); // 64 bytes = CPU L1 cache line size
// read command line parameters
Executable.Command.ExeDescription := 'The mORMot One Billion Row Challenge';
fn := Executable.Command.ArgString(0, 'the data source #filename');
if Executable.Command.Arg(0, 'the data source #filename') then
Utf8ToFileName(Executable.Command.Args[0], fn);
verbose := Executable.Command.Option(
['v', 'verbose'], 'generate verbose output with timing');
affinity := Executable.Command.Option(
Expand Down