Author
Immanuel Haffner
immanuel.haffner@bigdata.uni-saarland.de
bigdata.uni-saarland.de/people/haffner.php
Ph.D. Student
Big Data Analytics Group, Saarland Informatics Campus
Advisor:
Prof. Dr. Jens Dittrich
jens.dittrich@bigdata.uni-saarland.de
bigdata.uni-saarland.de/people/dittrich.php
Big Data Analytics Group, Saarland Informatics Campus
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
Copyright 2019 Immanuel Haffner
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
For the sake of completeness, the licence is also provided in the licence file `LICENCE.txt`.
The poster explaining our solution, which was presented at the SIGMOD'19 poster session, can be found here. For the sake of completeness, the poster repository is included here as a submodule in the folder `poster/`.
- Distinguish between data sets that fit entirely into main memory and those that require external sorting.
- Detect dynamically whether the data set is pure ASCII or contains non-ASCII characters.
- Open the output file with `open()` and map it into virtual address space via `mmap()`.
- Assume a uniform distribution of the keys in their respective domain (ASCII vs. non-ASCII).
- Partition records by their first byte (256 distinct values) into buckets.
- Predict the bucket sizes, and thus each bucket's offset in the virtual address space of the mapped output file, according to the key domain and the assumed distribution (see the sketch after this list).
- Read and simultaneously partition records from the input file into the mapped output file. Save potential overflow in main memory.
- Fix overflow after partitioning.
- Finish the 256 buckets by concurrently sorting them with American Flag Sort, which falls back to `std::sort` if the length of the sequence to sort is below a fixed threshold.
- Need not explicitly write data to disk, as the kernel will take care of the dirty mapped pages ;)
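As an illustration of the partitioning steps above, a minimal sketch of predicting each bucket's start offset in the mapped output file, assuming the first key byte is uniformly distributed over a contiguous range [lo, hi] (e.g. 0 to 255 for binary keys). The function and its parameters are hypothetical, not the actual implementation:

```cpp
#include <array>
#include <cstddef>

constexpr std::size_t RECORD_SIZE = 100;

/* Hypothetical sketch: predict the byte offset at which each of the 256
 * first-byte buckets starts inside the mapped output file, assuming the first
 * key byte is uniformly distributed over [lo, hi].  offsets[256] is a sentinel
 * marking the end of the file. */
std::array<std::size_t, 257> predict_bucket_offsets(std::size_t num_records,
                                                    unsigned lo, unsigned hi)
{
    std::array<std::size_t, 257> offsets{};
    const double domain = double(hi) - double(lo) + 1.0;
    for (unsigned b = 0; b <= 256; ++b) {
        /* Fraction of the key domain that falls before bucket b. */
        double before = 0.0;
        if (b > lo)
            before = (b > hi ? domain : double(b) - double(lo)) / domain;
        offsets[b] = std::size_t(before * double(num_records)) * RECORD_SIZE;
    }
    return offsets;
}
```

Records that do not fit between their bucket's predicted offset and the next bucket's offset are the potential overflow that is kept in main memory and fixed up after partitioning.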
- Read the first 28 GiB into main memory.
- Start sorting the in-memory data in a separate thread.
- Read the remaining data from the input file and simultaneously partition it by writing records to 256 bucket files on the output disk.
- Wait for sorting and partitioning to complete.
- Build a priority queue of buckets, sorted by the bucket size in ascending order.
- Merge the buckets on disk with the in-memory data, bucket by bucket, according to the priority queue. This happens in a decoupled pipeline, where bucket i+2 is read while bucket i+1 is sorted while bucket i is merged and written to the output file (a simplified per-bucket merge is sketched after this list).
- For the remaining 28 GiB of the file, don't explicitly write the data to the file but instead write it to the mapped memory. Lock the pages in memory on fault. This trades write performance (~477 MiB/s) for read performance (~515 MiB/s).
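To illustrate the per-bucket merge referenced above, a simplified sketch that merges one sorted on-disk bucket with the corresponding, already sorted in-memory range via `std::merge`; all names are illustrative and the pipelining and threading are omitted:

```cpp
#include <algorithm>
#include <cstring>
#include <vector>

struct record { char key[10]; char payload[90]; };

/* Compare two 100-byte records by their 10-byte key. */
inline bool key_less(const record &a, const record &b) {
    return std::memcmp(a.key, b.key, 10) < 0;
}

/* Hypothetical per-bucket merge step: `disk_bucket` holds the sorted records of
 * one bucket file, [mem_first, mem_last) is the range of in-memory records with
 * the same first key byte, and `out` points to the bucket's position in the
 * mapped output file.  Returns the end of the written range. */
record *merge_bucket(const std::vector<record> &disk_bucket,
                     const record *mem_first, const record *mem_last,
                     record *out)
{
    return std::merge(disk_bucket.begin(), disk_bucket.end(),
                      mem_first, mem_last, out, key_less);
}
```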
"Modern and efficient C++ Thread Pool Library" available on Github
Licence: Apache v2
(Internally, one of the two implementations uses boost's lock free queue.)
"Processor Counter Monitor (PCM) is an application programming interface (API) and a set of tools based on the API to monitor performance and energy metrics of Intel® Core™, Xeon®, Atom™ and Xeon Phi™ processors."
Available on Github
Licence: custom open source licence (see Github repository)
The `asmlib` subroutine library by Agner Fog.
Licence: GNU GPL v3
- Indy sort: 100 byte records with 10 bytes key and 90 bytes payload
- ASCII or binary, always 100 byte records
Generate data with checksum:
$ ./gensort -c <NUM_RECORDS> <FILE.bin> 2> <FILE.sum>
A journey on fast reading and writing in Linux.
- To read a file sequentially, use buffered I/O from glibc with a custom, large buffer (64 KiB?)
  - open the file with `fopen(path, "rb")`
  - set the buffer with `setvbuf()` and `mode` to `_IOFBF` (fully buffered)
  - read the file via `fread()` (granularity doesn't really matter here, since it is buffered)
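A minimal sketch of such a buffered sequential read, assuming a 64 KiB stream buffer; error handling is mostly omitted:

```cpp
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <vector>

/* Sketch: read a file sequentially through glibc's buffered I/O with a custom,
 * large stream buffer (64 KiB here). */
void read_sequential(const char *path)
{
    std::FILE *file = std::fopen(path, "rb");
    if (!file) { std::perror("fopen"); std::exit(EXIT_FAILURE); }

    std::vector<char> stream_buf(64 * 1024);
    std::setvbuf(file, stream_buf.data(), _IOFBF, stream_buf.size()); // fully buffered

    char chunk[100 * 64]; // a few records at a time; the granularity barely matters
    std::size_t n;
    while ((n = std::fread(chunk, 1, sizeof chunk, file)) != 0) {
        /* ... process the n bytes just read ... */
    }
    std::fclose(file);
}
```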
- On SSDs with concurrent operations (multiple lanes), concurrent reading is necessary to achieve maximum throughput
  - get the file stats (size, preferred block size, etc.) with `fstat()`
  - divide the file into as many consecutive chunks as you have reader threads
  - make sure to align the chunks to the preferred block size (or multiples thereof, called slabs)
  - each thread reads its chunk, a slab at a time
  - use `pread()` to read the file at a particular offset; from the man page:
    "The pread() and pwrite() system calls are especially useful in multithreaded applications. They allow multiple threads to perform I/O on the same file descriptor without being affected by changes to the file offset by other threads."
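A sketch of this concurrent, chunked reading with `pread()`, assuming the slab size is taken directly from `st_blksize`; the function name and thread handling are illustrative:

```cpp
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <thread>
#include <vector>

/* Sketch: divide the file into one consecutive chunk per thread, align chunks
 * to the preferred block size ("slabs"), and let each thread pread() its chunk
 * slab by slab into the destination buffer. */
void read_concurrent(const char *path, char *dst, unsigned num_threads)
{
    int fd = open(path, O_RDONLY);
    struct stat st;
    fstat(fd, &st);                    // file size and preferred block size
    const off_t slab = st.st_blksize;  // or a multiple thereof
    const off_t size = st.st_size;

    /* Chunk size per thread, rounded up to whole slabs. */
    const off_t chunk = ((size + num_threads - 1) / num_threads + slab - 1) / slab * slab;

    std::vector<std::thread> threads;
    for (unsigned t = 0; t != num_threads; ++t) {
        threads.emplace_back([=]() {
            off_t offset = off_t(t) * chunk;
            const off_t end = std::min(offset + chunk, size);
            while (offset < end) {
                /* pread() does not touch the shared file offset, so all threads
                 * can safely read from the same file descriptor. */
                ssize_t n = pread(fd, dst + offset, std::min(slab, end - offset), offset);
                if (n <= 0) break;     // error or unexpected EOF
                offset += n;
            }
        });
    }
    for (auto &th : threads) th.join();
    close(fd);
}
```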
- To write a file sequentially without thrashing the page cache, follow Linus' advice from the Linux Kernel Developer mailing list
  - write the file in large chunks (he uses 8 MiB)
  - after issuing the write-out of chunk n, request to sync the write-out of chunk n and wait for the sync of chunk n-1 to disk using `sync_file_range()` (blocks until the sync is done)
  - Linus further explains and recommends the use of `posix_fadvise()` to mark pages in the cache that were just synced for removal; this relaxes cache pressure; NOTE that `posix_fadvise()` gives a hint to the OS about the current state and not the future
  - briefly summarized on StackOverflow
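A sketch of that write pattern, assuming 8 MiB chunks as in Linus' example; `sync_file_range()` and its flags are Linux-specific (glibc exposes them with `_GNU_SOURCE`), and error handling is omitted:

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <algorithm>
#include <cstddef>

constexpr std::size_t CHUNK = 8u * 1024 * 1024; // 8 MiB, as in Linus' example

/* Sketch: write `size` bytes from `src` to `fd` chunk by chunk, kick off the
 * write-out of the chunk just written, and wait for the previous chunk to be
 * on disk before continuing; then drop the synced pages from the page cache. */
void write_sequential(int fd, const char *src, std::size_t size)
{
    for (std::size_t off = 0; off < size; off += CHUNK) {
        const std::size_t n = std::min(CHUNK, size - off);
        write(fd, src + off, n);

        /* Start asynchronous write-out of the current chunk. */
        sync_file_range(fd, off, n, SYNC_FILE_RANGE_WRITE);

        if (off >= CHUNK) {
            /* Wait until the previous chunk has reached the disk ... */
            sync_file_range(fd, off - CHUNK, CHUNK,
                            SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
                            SYNC_FILE_RANGE_WAIT_AFTER);
            /* ... and tell the kernel we will not need these pages again. */
            posix_fadvise(fd, off - CHUNK, CHUNK, POSIX_FADV_DONTNEED);
        }
    }
    /* A final fdatasync() would flush the last chunk as well. */
}
```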
- If a file must be written randomly, it is worthwhile to issue the writes concurrently.
- Concurrently writing an entire file (that could very well be written sequentially) is not beneficial.
- Final write of the result output file can be delayed. (This really feels like cheating.)
  - Create a fresh output file with `open(path, O_CREAT|O_TRUNC|O_RDWR, 0644)`
  - Allocate space for the file: `if (fallocate(fd, mode, offset, len) == -1 && errno == EOPNOTSUPP) ftruncate(fd, len);`
    - prefer `fallocate()`, but if not supported fall back to `ftruncate()`
    - avoid `posix_fallocate()`; the man page says:
      "If the underlying filesystem does not support fallocate(2), then the operation is emulated with the following caveats: * The emulation is inefficient."
  - `mmap()` the output file with `PROT_READ|PROT_WRITE` and `MAP_SHARED`
    - the `prot` settings allow us to read and write to the mapped memory region
    - the flag `MAP_SHARED` tells the OS that changes should be carried through to the underlying file
  - To "write" the file eventually, just issue `msync(MS_ASYNC)`
    - quoting the man page: "Since Linux 2.6.19, MS_ASYNC is in fact a no-op, since the kernel properly tracks dirty pages and flushes them to storage as necessary."
  - This strategy leaves it to the kernel to properly write the `mmap()`'d file from the page cache to disk. The program is free to terminate (or crash); the kernel will write the most recent state of the memory to the file on disk.
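Putting these pieces together, a minimal sketch of creating, allocating, and mapping the output file and handing the final write-out over to the kernel; `fallocate()` is Linux-specific, and error handling is omitted:

```cpp
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <cerrno>
#include <cstddef>

/* Sketch: create the output file, allocate its final size, and map it.
 * fallocate() is Linux-specific (glibc exposes it with _GNU_SOURCE). */
char *map_output(const char *path, std::size_t size)
{
    int fd = open(path, O_CREAT|O_TRUNC|O_RDWR, 0644);

    /* Prefer fallocate(); fall back to ftruncate() if the filesystem does not support it. */
    if (fallocate(fd, /* mode = */ 0, /* offset = */ 0, size) == -1 && errno == EOPNOTSUPP)
        ftruncate(fd, size);

    void *addr = mmap(nullptr, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd); // the mapping keeps the file referenced
    return static_cast<char*>(addr);
}

/* "Write" the file: effectively a no-op since Linux 2.6.19, because the kernel
 * already tracks the dirty pages of the shared mapping and flushes them to
 * disk on its own, even after the process exits. */
void finish_output(char *addr, std::size_t size)
{
    msync(addr, size, MS_ASYNC);
}
```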
Linux implements the POSIX asynchronous I/O (aio) interface.
- To satisfy functional completeness of the test harness, for testing, and for simplicity, use STL's `std::sort` on an array of records.
  - horribly slow
- To exploit the multiple CPU cores with low effort, use GNU's parallel mode
  - fire & forget with `__gnu_parallel::sort()`
  - exhausts all cores, but still slow
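For illustration, both baselines on an in-memory array of records, assuming a plain 100-byte record struct; the parallel version needs GCC's libstdc++ and `-fopenmp`:

```cpp
#include <algorithm>
#include <cstring>
#include <parallel/algorithm>   // GNU libstdc++ parallel mode, compile with -fopenmp
#include <vector>

struct record { char key[10]; char payload[90]; };

inline bool key_less(const record &a, const record &b) {
    return std::memcmp(a.key, b.key, 10) < 0;
}

/* Single-threaded baseline: functionally complete, but horribly slow. */
void sort_records_serial(std::vector<record> &records)
{
    std::sort(records.begin(), records.end(), key_less);
}

/* Fire & forget multi-core baseline: exhausts all cores, but still slow. */
void sort_records_parallel(std::vector<record> &records)
{
    __gnu_parallel::sort(records.begin(), records.end(), key_less);
}
```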
Records are 100 bytes long, composed of 10 bytes key and 90 bytes payload. Moving the payload together with the key induces heavy copying during the sort. We can reduce the copy overhead by creating an index into the records array and sorting the index (sometimes called tag sort).
The largest file is 100 GiB, approx. 100 * 1024³ bytes or 1024³ = 2³⁰ records. Therefore, a 4 byte integer suffices as index to connect key with payload and makes a nice 14 byte key + index package. To index the entire 100 GiB file one needs approximately 100 GiB * (14 B / 100 B) = 14 GiB, which nicely fits into main memory and still leaves approx. 16 GiB memory for other usage.
Such a tag sort requires reconstruction of the tuples. Do the index creation and the sorting of the index outperform a regular sort? How much memory is required, and is it even feasible in our setting? For the tag sort, we must store the payloads (90 B), the key-index pairs (14 B), and the reconstructed output data (100 B). This means we need more than 2x the input data as memory.
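A sketch of such a tag sort, assuming the 4-byte index discussed above; the struct and function names are illustrative:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <vector>

struct record { char key[10]; char payload[90]; };

/* 14-byte "tag": the 10-byte key plus a 4-byte index into the records array. */
#pragma pack(push, 1)
struct tag { char key[10]; std::uint32_t idx; };
#pragma pack(pop)

std::vector<record> tag_sort(const std::vector<record> &records)
{
    /* Build the index: 14 B per record instead of 100 B. */
    std::vector<tag> tags(records.size());
    for (std::uint32_t i = 0; i != records.size(); ++i) {
        std::memcpy(tags[i].key, records[i].key, 10);
        tags[i].idx = i;
    }

    /* Sort the tags: only 14-byte objects are moved around. */
    std::sort(tags.begin(), tags.end(), [](const tag &a, const tag &b) {
        return std::memcmp(a.key, b.key, 10) < 0;
    });

    /* Reconstruct the tuples; this third buffer is why we need more than
     * 2x the input size in memory. */
    std::vector<record> out(records.size());
    for (std::size_t i = 0; i != tags.size(); ++i)
        out[i] = records[tags[i].idx];
    return out;
}
```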
Some popular and practically efficient sorting algorithms (like Timsort) exploit existent sorted sequences, or runs, in the input. Since in our scenario data is generated randomly (uniformly or skewed), it is very unlikely to expect long runs. Therefore, it is not in our interest to exploit existent runs.
Radix sort algorithms are generally used to sort long objects, such as strings, where comparisons are expensive. In our scenario, the key is a 10 byte sequence.
To reduce end-to-end times of the application, we should reduce the latency between the first read and sorting, and exploit the computational power of the system. When only reading data, the CPU sits mostly idle, waiting for the I/O of the drive. It seems promising to interleave reading with sorting somehow.
We can fully interleave reading with sorting by performing a tournament sort as every next record is read. The locations of the runs must then be saved in some auxiliary data structure, and a (recurrent and multi-way) merge must be performed to get the fully sorted list.
We can interleave reading with sorting by inserting every record, as it is read, in sorted order into a sequence. These sequences can have an upper length limit, beyond which insertion sort becomes subpar; once the limit is reached, a new sequence is started. This way, all sequences have the same fixed length.
Instead of fully interleaving the read and the sort, we can perform reading and sorting block-wise and overlap the two operations. After reading a block of records, a sort for this block is initiated while the next block is read from the input. Eventually, we have a sequence of fixed-length blocks of sorted records. Again, we must merge them eventually.
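A rough sketch of this block-wise overlap, assuming buffered reads and one sorter thread per block; names and the block size handling are illustrative, and the final merge is not shown:

```cpp
#include <algorithm>
#include <cstdio>
#include <cstring>
#include <deque>
#include <thread>
#include <vector>

struct record { char key[10]; char payload[90]; };

inline bool key_less(const record &a, const record &b) {
    return std::memcmp(a.key, b.key, 10) < 0;
}

/* Sketch: read fixed-size blocks of records and sort each block in a background
 * thread while the next block is being read.  The sorted blocks still have to
 * be merged afterwards. */
std::deque<std::vector<record>> read_and_sort_blocks(const char *path,
                                                     std::size_t records_per_block)
{
    std::FILE *file = std::fopen(path, "rb");
    std::deque<std::vector<record>> blocks;  // deque: elements never move on push_back
    std::vector<std::thread> sorters;

    for (;;) {
        std::vector<record> block(records_per_block);
        const std::size_t n = std::fread(block.data(), sizeof(record), records_per_block, file);
        if (n == 0) break;
        block.resize(n);
        blocks.push_back(std::move(block));

        /* Sort the block we just read while the next one is being read. */
        record *data = blocks.back().data();
        sorters.emplace_back([data, n] { std::sort(data, data + n, key_less); });
    }

    for (auto &t : sorters) t.join();
    std::fclose(file);
    return blocks;
}
```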
When writing data out to disk, we can merge multiple sorted sequences that are kept in memory into one. The write performance is significantly lower than the throughput at which we can merge the sequences.
For inputs that fit entirely into main memory, it is likely better to fully sort them and to delay and delegate the write-out to the kernel.
Some thoughts from Boost on radix sort
American Flag Sort is an in-place variant of MSD radix sort. It works by iteratively distributing items into buckets. A histogram generation pass is used to compute the locations of the buckets, such that items can directly be moved to their correct position.
American Flag Sort follows the divide-and-conquer strategy, and hence exposes a high degree of task parallelism. This makes it very suitable for exploiting the many cores of the target system.
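A condensed, illustrative sketch of American Flag Sort on the 10-byte keys (single-threaded, with the `std::sort` fall-back for short sequences); the threshold value is made up and the code is not the tuned implementation from this repository:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <utility>

struct record { char key[10]; char payload[90]; };

constexpr std::size_t FALLBACK_THRESHOLD = 32; // made-up value, tune as needed

inline bool key_less(const record &a, const record &b) {
    return std::memcmp(a.key, b.key, 10) < 0;
}

/* Sketch of American Flag Sort: distribute the records in place into 256
 * buckets by the key byte at position `digit`, then recurse into each bucket
 * on the next byte.  Falls back to std::sort for short sequences. */
void american_flag_sort(record *first, record *last, unsigned digit = 0)
{
    const std::size_t n = last - first;
    if (n <= FALLBACK_THRESHOLD || digit == 10) {
        std::sort(first, last, key_less);
        return;
    }

    /* Histogram pass: count the records per bucket. */
    std::size_t count[256] = {};
    for (record *p = first; p != last; ++p)
        ++count[(unsigned char) p->key[digit]];

    /* Prefix sums: start (head) and end (tail) of every bucket. */
    std::size_t head[256], tail[256], offset = 0;
    for (unsigned b = 0; b != 256; ++b) {
        head[b] = offset;
        offset += count[b];
        tail[b] = offset;
    }

    /* Permutation pass: move every record directly to its bucket. */
    for (unsigned b = 0; b != 256; ++b) {
        while (head[b] != tail[b]) {
            record &r = first[head[b]];
            const unsigned target = (unsigned char) r.key[digit];
            if (target == b)
                ++head[b];                           // already in its bucket
            else
                std::swap(r, first[head[target]++]); // send it to its bucket's head
        }
    }

    /* Recurse into each bucket on the next key byte. */
    std::size_t begin = 0;
    for (unsigned b = 0; b != 256; ++b) {
        american_flag_sort(first + begin, first + begin + count[b], digit + 1);
        begin += count[b];
    }
}
```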
A Least Significant Digit (LSD) Radix Sort sorts items by iteratively stable-sorting them by a single digit, starting with the least significant digit and advancing until the items have been sorted by the most significant digit. Because the per-digit sort is stable, sorting by digit n preserves the relative order of items that are equal in digit n, and hence the ordering established by digits 0...n-1. Because every step of the LSD radix sort is stable, the entire sort is stable, too.
LSD Radix Sort performs independently of the data distribution. Its performance is solely determined by the data set size and the key length. Since there is no best or worst case for LSD Radix Sort, it fails to exploit significant structure in the data. However, in our scenario, data is randomly generated with either uniform or skewed distribution; it is unlikely that there is much structure to exploit. On the other hand, the 10 byte, fixed-length keys and the large data set size make LSD an interesting contestant. The fact that LSD Radix Sort works out-of-place and requires additional memory linear in the size of the input could be a dealbreaker.
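For comparison, an illustrative sketch of a byte-wise LSD Radix Sort over the 10-byte keys; note the out-of-place buffer as large as the input:

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

struct record { char key[10]; char payload[90]; };

/* Sketch of LSD Radix Sort: a stable counting sort per key byte, from the least
 * significant byte (key[9]) to the most significant (key[0]).  Works
 * out-of-place and therefore needs a second buffer as large as the input. */
void lsd_radix_sort(std::vector<record> &records)
{
    std::vector<record> buffer(records.size());
    for (int digit = 9; digit >= 0; --digit) {
        /* Counting pass. */
        std::size_t count[256] = {};
        for (const record &r : records)
            ++count[(unsigned char) r.key[digit]];

        /* Exclusive prefix sum: the start offset of each bucket. */
        std::size_t offset = 0;
        for (unsigned b = 0; b != 256; ++b) {
            const std::size_t c = count[b];
            count[b] = offset;
            offset += c;
        }

        /* Stable distribution pass into the buffer. */
        for (const record &r : records)
            buffer[count[(unsigned char) r.key[digit]]++] = r;

        records.swap(buffer);
    }
}
```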