JensNevens committed Oct 8, 2018
1 parent 83c635f commit 82a1932ef83e8f84f627b72b8e4741fce4803a12
@@ -1 +1 @@
@@ -3,7 +3,9 @@
(defsystem :corpus-processing
:depends-on (:utils :cl-fad :cl-json :trivial-timeout)
(#+:lw(:file "corpus-processing")
(:file "json-corpus-processing")))
((:file "corpus-processing")
(:file "json-corpus-processing")
(:file "json-stream-processing")

@@ -20,7 +20,8 @@
(number-of-threads 4)
(number-of-lines-per-thread 2000))
(number-of-lines-per-thread 2000)
"Applies function to every line in inputfile and writes the result in outputfile.
A higher number-of-threads results in a higher speed (if these threads are available).
A higher number-of-lines-per-thread results in a higher speed, but also in a higher
@@ -54,12 +55,14 @@
(dotimes (n number-of-complete-batches)
(format t "~%Started complete batch ~a/~a..." (+ 1 n) number-of-complete-batches)
(process-batch-multi-threaded function function-kwargs stream
number-of-threads number-of-lines-per-batch outputfile)
number-of-threads number-of-lines-per-batch outputfile
(format t "~%(Finished)"))
(when (> number-of-lines-in-last-batch 0)
(format t "~%Started extra batch...")
(process-batch-multi-threaded function function-kwargs stream
number-of-threads number-of-lines-in-last-batch outputfile)
number-of-threads number-of-lines-in-last-batch outputfile
(format t "~%(Finished)"))))
(let ((finish-time (get-universal-time)))
(multiple-value-bind (h m s)
@@ -68,7 +71,7 @@
(format t "~%~%***************** Finished Corpus Processing *****************~%~%"))

(defun process-batch-multi-threaded (function function-kwargs stream number-of-threads number-of-lines outputfile)
(defun process-batch-multi-threaded (function function-kwargs stream number-of-threads number-of-lines outputfile write-empty-lines-p)
"takes a batch of lines, divides them over threads, applies function on each line and writes
the output to outputfile"
(let ((list-of-thread-batches nil))
@@ -79,9 +82,9 @@
(format t "~% Launching ~a threads with ~a lines and 1 thread with ~a lines. "
(- number-of-threads 1) lines-per-thread (+ lines-per-thread lines-last-thread))
(dotimes (n (- number-of-threads 1)) ;; For each thread, except last
(push (cons (+ 1 n) (stream-to-list stream lines-per-thread)) list-of-thread-batches))
(push (cons (+ 1 n) (stream->list stream :number-of-lines lines-per-thread)) list-of-thread-batches))
;; For last thread
(push (cons number-of-threads (stream-to-list stream (+ lines-per-thread lines-last-thread)))
(push (cons number-of-threads (stream->list stream :number-of-lines (+ lines-per-thread lines-last-thread)))

;; process each thread-batch
@@ -98,7 +101,7 @@
(push (mp:mailbox-read mailbox) mailbox-messages))
;; Write batches to outputfile
(dolist (list-of-lines (sort mailbox-messages #'< :key #'car))
(write-to-file outputfile (cdr list-of-lines))))))
(write-to-file outputfile (cdr list-of-lines) write-empty-lines-p)))))

(defun all-threads-dead (thread-list)
"Returns t if all processes in thread-list are of status dead."
@@ -109,26 +112,12 @@
(setf all-dead nil)))

(defun stream-to-list (stream number-of-lines)
"Returns a list with the next number-of-lines lines of stream."
(loop for n from 1 upto number-of-lines
for line = (read-line stream nil nil)
collect line))

(defun write-to-file (output-file list-of-lines)
(defun write-to-file (output-file list-of-lines write-empty-lines-p)
"writes list-of-lines to output-file (appending)"
(with-open-file (stream output-file :direction :output :if-exists :append)
(mapcar #'(lambda (line) (format stream "~a~%" line))

(defun number-of-lines (file)
"Returns the number of lines in a file"
(let ((number-of-lines
(parse-integer (first (split-sequence:split-sequence
(first (exec-and-return "wc" "-l" (format nil "~a" file)))
:remove-empty-subseqs t)))))
(if write-empty-lines-p
(mapcar #'(lambda (line) (format stream "~a~%" line)) list-of-lines)
(mapcar #'(lambda (line) (when line (format stream "~a~%" line))) list-of-lines))))

(defun process-list-of-lines (function function-kwargs list-of-lines mailbox)
"applies function to list-of-lines and returns the processed list-of-lines"
