-
Notifications
You must be signed in to change notification settings - Fork 3
/
prll.sh
368 lines (331 loc) · 10 KB
/
prll.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# Copyright 2009-2018 Jure Varlec
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# A copy of the GNU General Public License is provided in COPYING.
# If not, see <http://www.gnu.org/licenses/>.
prll_real() {
prll_usage() {
cat <<-EOF
prll version 0.9999
Copyright 2009-2018 Jure Varlec
USAGE: prll [ options ] { fun_name | -s 'fun_string' } fun_args ...
Shell function 'fun_name' will be run for each of 'fun_args'.
Alternatively, using -s, shell code 'fun_string' will be executed
as if it were the body of a function.
Summary of options:
-s str Use string 'str' as shell code to run.
-p Read lines from standard input and use them
instead of 'fun_args'.
-0 Same as -p, but reads null-delimited input.
-B Enable output buffering, which is the default.
Use to override the PRLL_BUFFER env. variable.
-b Disable output buffering.
-c num Set number of parallel tasks to 'num'.
-q Disable progress messages.
-Q Disable all messages except errors.
The number of tasks to be run in parallel can be set with
the PRLL_NRJOBS environment variable or the -c option. If
it is not set, prll will attempt to read the number of CPUs
from /proc/cpuinfo.
Buffering can be disabled by setting PRLL_BUFFER environment
variable to "no" or "0".
See the prll manpage for more information.
EOF
exit ${1:-1}
}
prll_die() {
for prll_i ; do
printf "PRLL: %s\n" "$prll_i" 1>&2
done
exit 1
}
prll_msg() {
case $1 in
'') printf '\n' 1>&2 ;;
# Suppress the newline
-n) shift; printf 'PRLL: %s' "$*" 1>&2 ;;
# Interpret arguments as a printf string
-e) shift; printf "$@" 1>&2 ;;
*) printf 'PRLL: %s\n' "$*" 1>&2 ;;
esac
}
# This path is defined on installation. For developement, it
# is overriden by environment.
PRLL_HELPER_PATH=${PRLL_HELPER_PATH:?}
# This executables are always needed.
command -v "${PRLL_HELPER_PATH}/"prll_qer > /dev/null \
|| prll_die "Missing prll_qer."
command -v "${PRLL_HELPER_PATH}/"prll_bfr > /dev/null \
|| prll_die "Missing prll_bfr."
prll_qer() {
"${PRLL_HELPER_PATH}/"prll_qer "$@"
}
prll_bfr() {
"${PRLL_HELPER_PATH}/"prll_bfr "$@"
}
# Read parameters and environment variables.
prll_unbuffer=no
[ "$PRLL_BUFFER" = "no" -o "$PRLL_BUFFER" = "0" ] && prll_unbuffer=yes
[ -n "$PRLL_NR_CPUS" ] && prll_nr_cpus=$PRLL_NR_CPUS # Backward compat.
[ -n "$PRLL_NRJOBS" ] && prll_nr_cpus=$PRLL_NRJOBS
OPTIND=1
prll_funname='' # Function to execute
prll_read=no # Whether to read standard input
while getopts "s:p0bBc:qQhH?" prll_i
do case $prll_i in
s) eval "prll_str2func() { $OPTARG
}"
prll_funname=prll_str2func ;;
p) prll_read=stdin ;;
0) prll_read=null ;;
b) prll_unbuffer=yes ;;
B) prll_unbuffer=no ;;
c) prll_nr_cpus="$OPTARG" ;;
q) prll_quiet=yes ;;
Q) prll_msg() { : ; } ;;
h) prll_usage 0 ;;
*) prll_usage ;;
esac
done
shift $((OPTIND - 1))
# Function was not given as a string, so the next argument must be
# the name of an external function.
if [ -z "$prll_funname" ] ; then
prll_funname="$1"
shift
fi
# Add an exception for -p and -0 for backwards compatibility
if [ "$1" = '-p' ] ; then
prll_read=stdin
shift
elif [ "$1" = '-0' ] ; then
prll_read=null
shift
fi
# Check whether arguments were supplied
if [ $prll_read = no -a -z "$1" ] ; then
prll_msg -e "Nothing to do...\n\n"
prll_usage
fi
# Number of CPUs was not given, so find it. Also, check for sanity.
if [ -z "$prll_nr_cpus" ] ; then
command -v grep > /dev/null
if [ $? -ne 0 -o ! -e /proc/cpuinfo ] ; then
prll_die \
"The number of CPUs is not set and either the grep" \
"utility is missing or there is no /proc/cpuinfo file." \
"Please set the number of CPUs."
fi
prll_nr_cpus=$(grep "processor :" < /proc/cpuinfo | wc -l)
elif [ $prll_nr_cpus -lt 1 ] ; then
prll_die "The number of CPUs is zero."
elif ! [ $prll_nr_cpus -ge 1 ] ; then
prll_die "Invalid number of CPUs."
fi
# If not reading from stdin, setup positional arguments.
if [ $prll_read = no ] ; then
prll_nr_args=$#
if [ $prll_nr_args -lt $prll_nr_cpus ] ; then
prll_nr_cpus=$prll_nr_args
fi
fi
# Create IPCs.
prll_msg "Using $prll_nr_cpus CPUs"
prll_Qkey="$(prll_qer n)"
if [ $? -ne 0 ] ; then
prll_die "Failed to create message queue."
else
prll_msg "Created message queue with key $prll_Qkey"
fi
prll_Skey="$(prll_bfr n)"
if [ $? -ne 0 ] ; then
prll_die "Failed to create semaphore."
else
prll_msg "Created semaphore with key $prll_Skey"
fi
if [ $prll_read != no ] ; then
prll_Skey2="$(prll_bfr n)"
if [ $? -ne 0 ] ; then
prll_die "Failed to create semaphore."
else
prll_msg "Created semaphore with key $prll_Skey2"
fi
fi
prll_msg "Starting work."
# Start reading stdin.
(
if [ $prll_read != no ] ; then
# This subshell has its own trap.
trap "prll_bfr r $prll_Skey2" INT
if [ $prll_read = stdin ] ; then
prll_bfr w $prll_Skey2
elif [ $prll_read = null ] ; then
prll_bfr W $prll_Skey2
fi
# Signal completion by removing the semaphore.
prll_bfr r $prll_Skey2
else
# Nothing on stdin, so just close stdout and exit the
# subshell.
exec 1>&-
fi
) | (
# Load some tokens into the queue. This will make the main loop
# start a few jobs to run in parallel.
prll_i=1
while [ $prll_i -le $prll_nr_cpus ] ; do
prll_qer c $prll_Qkey 0;
prll_i=$((prll_i + 1))
done
#####################
# FUNCTIONS FOR USERS
# Gracefully abort by inserting a 'quit' message into the queue.
prll_interrupt() {
prll_msg \
"Job $prll_progress interrupting execution." \
"Waiting for unfinished jobs."
prll_qer c $prll_Qkey 1
return 130
}
# A simple substitute for GNU seq.
prll_seq() {
prll_seq_i=1
if [ -n "$2" ] ; then
prll_seq_i=$1
shift
fi
echo $prll_seq_i
while [ $prll_seq_i -lt $1 ] ; do
prll_seq_i=$((prll_seq_i + 1))
echo $prll_seq_i
done
}
# Locking. prll_lockery() shouldn't be called by users.
prll_lockery() {
prll_locknum="$1"
[ -z "$prll_locknum" ] && prll_locknum=0
[ "$prll_locknum" -ge 5 -o "$prll_locknum" -lt 0 ] && \
prll_die "Illegal lock number!"
prll_bfr $2 $prll_Skey $prll_locknum
}
prll_lock() {
prll_lockery "$1" u
}
prll_unlock() {
prll_lockery "$1" U
}
# Argument splitting. Generates prll_arg_X variables, where X
# numbers them from 1 upwards.
prll_splitarg() {
prll_spl_i=1
prll_spl_rest="$prll_jarg"
while [ -n "$prll_spl_rest" ] ; do
prll_spl_varname="prll_arg_$prll_spl_i"
eval "
read $prll_spl_varname prll_spl_rest <<EOF
$prll_spl_rest
EOF
"
prll_spl_i=$((prll_spl_i+1))
done
prll_arg_num=$((prll_spl_i-1))
}
#######################
# END OF USER FUNCTIONS
prll_progress=0 # Counts started jobs
prll_jbfinish=0 # Counts finished jobs
# The main loop. Its iterations are controlled by prll_qer, which
# waits for a message to arrive before starting a new job, unless
# the message tells it to quit.
while prll_qer o $prll_Qkey ; do
# Only start counting finished jobs after the initial batch of
# tokens is exhausted.
[ $prll_progress -ge $prll_nr_cpus ] && \
prll_jbfinish=$((prll_jbfinish + 1))
# The function argument. It gets its value either from
# positional arguments or stdin.
prll_jarg=''
if [ $prll_read = no ] ; then
# If there are no more arguments, break the main loop.
if [ $prll_progress -ge $prll_nr_args ] ; then
break
else
prll_jarg="$1"
shift
fi
else
prll_jarg="$(prll_bfr c $prll_Skey2)"
# If there is nothing more to read, break the main loop.
if [ $? -ne 0 ] ; then
break
fi
fi
# Disable the interrupt trap for the rest of the loop iteration
# to make sure jobs are counted correctly.
trap '' INT
# Spawn subshells that start the job and buffer.
# It is done in a very roundabout way in order to workaround a race
# condition with zsh.
prll_launch_code='
(
prll_jobnr=$prll_progress
$prll_funname "$prll_jarg"
[ -z "$prll_quiet" ] &&
prll_msg "Job number $prll_progress finished. Exit code: $?"
) | \
(
if [ $prll_unbuffer = yes ] ; then
cat
else
prll_bfr b $prll_Skey
fi
prll_qer c $prll_Qkey 0
) &'
[ -n "$ZSH_VERSION" ] && prll_launch_code="${prll_launch_code}!"
eval "$prll_launch_code"
# Print progress
prll_status="Starting job ${prll_progress}, PID $!"
if [ $prll_read = no ] ; then
prll_status="$prll_status Progress:"
prll_status="$prll_status $((prll_progress*100/prll_nr_args))%"
prll_status="$prll_status Arg: $prll_jarg"
fi
[ -z "$prll_quiet" ] && prll_msg "$prll_status"
prll_progress=$((prll_progress + 1))
# Setup the interrupt trap.
trap 'prll_interrupted=1' INT
done
# Cleanup. It sets up its own interrupt trap to notify the user he
# has to do his own cleanup.
trap '' INT
if [ -n "$prll_interrupted" ] ; then
prll_msg "INTERRUPTED!"
fi
prll_msg "Waiting for unfinished jobs."
if [ $prll_progress -lt $prll_nr_cpus ] ; then
prll_progress=$((prll_nr_cpus-1))
fi
while [ $prll_progress -gt $prll_jbfinish ] ; do
prll_qer o $prll_Qkey || break
prll_jbfinish=$((prll_jbfinish + 1))
done
prll_msg "Cleaning up."
prll_qer r $prll_Qkey
prll_bfr r $prll_Skey
[ $prll_read != no ] && prll_bfr r $prll_Skey2
true # No use returning the status of IPC removal
)
return $?
}
prll() {
( prll_real "$@" )
return $?
}