Skip to content

Commit

Permalink
shuf: use reservoir-sampling for large or unknown sized inputs
Browse files Browse the repository at this point in the history
Reservoir sampling optimizes selecting K random lines from large or
unknown-sized input: http://en.wikipedia.org/wiki/Reservoir_sampling
Note this also avoids reading any input when -n0 is specified.

* src/shuf.c (main): Use reservoir-sampling when the number of output
lines is known, and the input size is large or unknown.
(input_size): A new function to get the input size for regular files.
(read_input_reservoir_sampling): New function to read lines from input,
keeping only K lines in memory, replacing lines with decreasing prob.
(write_permuted_output_reservoir): New function to output reservoir.
* tests/misc/shuf-reservoir.sh: An expensive_ test using valgrind to
exercise the reservoir-sampling code.
* tests/local.mk: Reference new test.
* NEWS: Mention the improvement.
  • Loading branch information
agordon authored and pixelb committed Mar 25, 2013
1 parent 4c49dc8 commit 20d7bce
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 7 deletions.
4 changes: 4 additions & 0 deletions NEWS
Expand Up @@ -25,6 +25,10 @@ GNU coreutils NEWS -*- outline -*-
inotify for files on those file systems, rather than the default (for unknown
file system types) of issuing a warning and reverting to polling.

shuf outputs subsets of large inputs much more efficiently.
Reservoir sampling is used to limit memory usage based on the number of
outputs, rather than the number of inputs.

** Build-related

factor now builds on aarch64 based systems [bug introduced in coreutils-8.20]
Expand Down
177 changes: 170 additions & 7 deletions src/shuf.c
Expand Up @@ -25,6 +25,7 @@
#include "error.h"
#include "fadvise.h"
#include "getopt.h"
#include "linebuffer.h"
#include "quote.h"
#include "quotearg.h"
#include "randint.h"
Expand All @@ -38,6 +39,18 @@

#define AUTHORS proper_name ("Paul Eggert")

/* For reservoir-sampling, allocate the reservoir lines in batches. */
enum { RESERVOIR_LINES_INCREMENT = 1024 };

/* reservoir-sampling introduces CPU overhead for small inputs.
So only enable it for inputs >= this limit.
This limit was determined using these commands:
$ for p in $(seq 7); do src/seq $((10**$p)) > 10p$p.in; done
$ for p in $(seq 7); do time shuf-nores -n10 10p$p.in >/dev/null; done
$ for p in $(seq 7); do time shuf -n10 10p$p.in >/dev/null; done .*/
enum { RESERVOIR_MIN_INPUT = 8192 * 1024 };


void
usage (int status)
{
Expand Down Expand Up @@ -135,6 +148,114 @@ next_line (char *line, char eolbyte, size_t n)
return p + 1;
}

/* Return the size of the input if possible or OFF_T_MAX if not. */

static off_t
input_size (void)
{
off_t file_size;

struct stat stat_buf;
if (fstat (STDIN_FILENO, &stat_buf) != 0)
return OFF_T_MAX;
if (usable_st_size (&stat_buf))
file_size = stat_buf.st_size;
else
return OFF_T_MAX;

off_t input_offset = lseek (STDIN_FILENO, 0, SEEK_CUR);
if (input_offset < 0)
return OFF_T_MAX;

file_size -= input_offset;

return file_size;
}

/* Read all lines and store up to K permuted lines in *OUT_RSRV.
Return the number of lines read, up to a maximum of K. */

static size_t
read_input_reservoir_sampling (FILE *in, char eolbyte, size_t k,
struct randint_source *s,
struct linebuffer **out_rsrv)
{
randint n_lines = 0;
size_t n_alloc_lines = MIN (k, RESERVOIR_LINES_INCREMENT);
struct linebuffer *line = NULL;
struct linebuffer *rsrv;

rsrv = xcalloc (n_alloc_lines, sizeof (struct linebuffer));

/* Fill the first K lines, directly into the reservoir. */
while (n_lines < k
&& (line =
readlinebuffer_delim (&rsrv[n_lines], in, eolbyte)) != NULL)
{
n_lines++;

/* Enlarge reservoir. */
if (n_lines >= n_alloc_lines)
{
n_alloc_lines += RESERVOIR_LINES_INCREMENT;
rsrv = xnrealloc (rsrv, n_alloc_lines, sizeof (struct linebuffer));
memset (&rsrv[n_lines], 0,
RESERVOIR_LINES_INCREMENT * sizeof (struct linebuffer));
}
}

/* last line wasn't NULL - so there may be more lines to read. */
if (line != NULL)
{
struct linebuffer dummy;
initbuffer (&dummy); /* space for lines not put in reservoir. */

/* Choose the fate of the next line, with decreasing probability (as
n_lines increases in size).
If the line will be used, store it directly in the reservoir.
Otherwise, store it in dummy space.
With 'struct linebuffer', storing into existing buffer will reduce
re-allocations (will only re-allocate if the new line is longer than
the currently allocated space). */
do
{
randint j = randint_choose (s, n_lines + 1); /* 0 .. n_lines. */
line = (j < k) ? (&rsrv[j]) : (&dummy);
}
while (readlinebuffer_delim (line, in, eolbyte) != NULL && n_lines++);

if (! n_lines)
error (EXIT_FAILURE, EOVERFLOW, _("too many input lines"));

freebuffer (&dummy);
}

/* no more input lines, or an input error. */
if (ferror (in))
error (EXIT_FAILURE, errno, _("read error"));

*out_rsrv = rsrv;
return MIN (k, n_lines);
}

static int
write_permuted_output_reservoir (size_t n_lines, struct linebuffer *lines,
size_t const *permutation)
{
size_t i;

for (i = 0; i < n_lines; i++)
{
const struct linebuffer *p = &lines[permutation[i]];
if (fwrite (p->buffer, sizeof (char), p->length, stdout) != p->length)
return -1;
}

return 0;
}

/* Read data from file IN. Input lines are delimited by EOLBYTE;
silently append a trailing EOLBYTE if the file ends in some other
byte. Store a pointer to the resulting array of lines into *PLINE.
Expand All @@ -152,6 +273,15 @@ read_input (FILE *in, char eolbyte, char ***pline)
size_t i;
size_t n_lines;

/* TODO: We should limit the amount of data read here,
to less than RESERVOIR_MIN_INPUT. I.E. adjust fread_file() to support
taking a byte limit. We'd then need to ensure we handle a line spanning
this boundary. With that in place we could set use_reservoir_sampling
when used==RESERVOIR_MIN_INPUT, and have read_input_reservoir_sampling()
call a wrapper function to populate a linebuffer from the internal pline
or if none left, stdin. Doing that would give better performance by
avoiding the reservoir CPU overhead when reading < RESERVOIR_MIN_INPUT
from a pipe, and allow us to dispense with the input_size() function. */
if (!(buf = fread_file (in, &used)))
error (EXIT_FAILURE, errno, _("read error"));

Expand All @@ -174,15 +304,15 @@ read_input (FILE *in, char eolbyte, char ***pline)
}

static int
write_permuted_output (size_t n_lines, char * const *line, size_t lo_input,
write_permuted_output (size_t n_lines, char *const *line, size_t lo_input,
size_t const *permutation, char eolbyte)
{
size_t i;

if (line)
for (i = 0; i < n_lines; i++)
{
char * const *p = line + permutation[i];
char *const *p = line + permutation[i];
size_t len = p[1] - p[0];
if (fwrite (p[0], sizeof *p[0], len, stdout) != len)
return -1;
Expand All @@ -209,14 +339,17 @@ main (int argc, char **argv)
char *random_source = NULL;
char eolbyte = '\n';
char **input_lines = NULL;
bool use_reservoir_sampling = false;

int optc;
int n_operands;
char **operand;
size_t n_lines;
char **line;
char **line = NULL;
struct linebuffer *reservoir = NULL;
struct randint_source *randint_source;
size_t *permutation;
int i;

initialize_main (&argc, &argv);
set_program_name (argv[0]);
Expand Down Expand Up @@ -341,17 +474,35 @@ main (int argc, char **argv)

fadvise (stdin, FADVISE_SEQUENTIAL);

n_lines = read_input (stdin, eolbyte, &input_lines);
line = input_lines;
if (head_lines != SIZE_MAX && input_size () > RESERVOIR_MIN_INPUT)
{
use_reservoir_sampling = true;
n_lines = SIZE_MAX; /* unknown number of input lines, for now. */
}
else
{
n_lines = read_input (stdin, eolbyte, &input_lines);
line = input_lines;
}
}

head_lines = MIN (head_lines, n_lines);

randint_source = randint_all_new (random_source,
use_reservoir_sampling ? SIZE_MAX :
randperm_bound (head_lines, n_lines));
if (! randint_source)
error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source));

if (use_reservoir_sampling)
{
/* Instead of reading the entire file into 'line',
use reservoir-sampling to store just "head_lines" random lines. */
n_lines = read_input_reservoir_sampling (stdin, eolbyte, head_lines,
randint_source, &reservoir);
head_lines = n_lines;
}

/* Close stdin now, rather than earlier, so that randint_all_new
doesn't have to worry about opening something other than
stdin. */
Expand All @@ -363,8 +514,13 @@ main (int argc, char **argv)

if (outfile && ! freopen (outfile, "w", stdout))
error (EXIT_FAILURE, errno, "%s", quotearg_colon (outfile));
if (write_permuted_output (head_lines, line, lo_input, permutation, eolbyte)
!= 0)

if (use_reservoir_sampling)
i = write_permuted_output_reservoir (n_lines, reservoir, permutation);
else
i = write_permuted_output (head_lines, line, lo_input,
permutation, eolbyte);
if (i != 0)
error (EXIT_FAILURE, errno, _("write error"));

#ifdef lint
Expand All @@ -375,6 +531,13 @@ main (int argc, char **argv)
free (input_lines[0]);
free (input_lines);
}
if (reservoir)
{
size_t j;
for (j = 0; j < n_lines; ++j)
freebuffer (&reservoir[j]);
free (reservoir);
}
#endif

return EXIT_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions tests/local.mk
Expand Up @@ -313,6 +313,7 @@ all_tests = \
tests/misc/shred-passes.sh \
tests/misc/shred-remove.sh \
tests/misc/shuf.sh \
tests/misc/shuf-reservoir.sh \
tests/misc/sort.pl \
tests/misc/sort-benchmark-random.sh \
tests/misc/sort-compress.sh \
Expand Down
69 changes: 69 additions & 0 deletions tests/misc/shuf-reservoir.sh
@@ -0,0 +1,69 @@
#!/bin/sh
# Exercise shuf's reservoir-sampling code
# NOTE:
# These tests do not check valid randomness,
# they just check memory allocation related code.

# Copyright (C) 2013 Free Software Foundation, Inc.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

. "${srcdir=.}/tests/init.sh"; path_prepend_ ./src
print_ver_ shuf
expensive_
require_valgrind_

# Run "shuf" with specific number of input lines and output lines
# Check the output for expected number of lines.
run_shuf_n()
{
INPUT_LINES="$1"
OUTPUT_LINES="$2"

# Critical memory-related bugs will cause a segfault here
# (with varying numbres of input/output lines)
seq "$INPUT_LINES" | valgrind --leak-check=full --error-exitcode=1 \
shuf -n "$OUTPUT_LINES" -o "out_${INPUT_LINES}_${OUTPUT_LINES}" || return 1

EXPECTED_LINES="$OUTPUT_LINES"
test "$INPUT_LINES" -lt "$OUTPUT_LINES" && EXPECTED_LINES="$INPUT_LINES"

# There is no sure way to verify shuffled output (as it is random).
# Ensure we have the correct number of all numeric lines non duplicated lines.
GOOD_LINES=$(grep '^[0-9][0-9]*$' "out_${INPUT_LINES}_${OUTPUT_LINES}" |
sort -un | wc -l) || framework_failure_
LINES=$(wc -l < "out_${INPUT_LINES}_${OUTPUT_LINES}") || framework_failure_

test "$EXPECTED_LINES" -eq "$GOOD_LINES" || return 1
test "$EXPECTED_LINES" -eq "$LINES" || return 1

return 0
}

# Test multiple combinations of input lines and output lines.
# (e.g. small number of input lines and large number of output lines,
# and vice-versa. Also, each reservoir allocation uses a 1024-lines batch,
# so test 1023/1024/1025 and related values).
TEST_LINES="0 1 5 1023 1024 1025 3071 3072 3073"

for IN_N in $TEST_LINES; do
for OUT_N in $TEST_LINES; do
run_shuf_n "$IN_N" "$OUT_N" || {
fail=1
echo "shuf-reservoir-sampling failed with IN_N=$IN_N OUT_N=$OUT_N" >&2;
}
done
done

Exit $fail

0 comments on commit 20d7bce

Please sign in to comment.