Skip to content

Commit

Permalink
Faster filesystem diffs when there are large numbers of source files
Browse files Browse the repository at this point in the history
  • Loading branch information
micha committed Dec 21, 2014
1 parent e86da38 commit ed5d71a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 94 deletions.
17 changes: 8 additions & 9 deletions boot/core/src/boot/core.clj
Expand Up @@ -98,7 +98,7 @@
:asset-paths (user-asset-dirs)}]
(when-let [s (seq (get-env k))]
(binding [file/*hard-link* false]
(apply file/sync :theirs (first d) s)))))
(apply file/sync! :time (first d) s)))))

(defn- set-fake-class-path!
"Sets the fake.class.path system property to reflect all JAR files on the
Expand Down Expand Up @@ -348,7 +348,7 @@
timestamps to decide which version of files to emit to dest. Uses hardlinks
instead of copying file contents. File modification times are preserved."
[dest & srcs]
(apply file/sync :time dest srcs))
(apply file/sync! :time dest srcs))

;; Boot Environment ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

Expand All @@ -364,17 +364,16 @@
(if (empty? dirs)
(constantly true)
(do (pod/require-in @pod/worker-pod "boot.watcher")
(let [q (LinkedBlockingQueue.)
watchers (map file/make-watcher dirs)
paths (into-array String dirs)
k (.invoke @pod/worker-pod "boot.watcher/make-watcher" q paths)]
(let [q (LinkedBlockingQueue.)
watcher (apply file/watcher! :time dirs)
paths (into-array String dirs)
k (.invoke @pod/worker-pod "boot.watcher/make-watcher" q paths)]
(daemon
(loop [ret (util/guard [(.take q)])]
(when ret
(if-let [more (.poll q (or debounce 10) TimeUnit/MILLISECONDS)]
(recur (conj ret more))
(let [changed (->> (map #(%) watchers)
(reduce (partial merge-with set/union)) :time set)]
(let [changed (watcher)]
(when-not (empty? changed) (callback changed))
(recur (util/guard [(.take q)])))))))
#(.invoke @pod/worker-pod "boot.watcher/stop-watcher" k)))))
Expand Down Expand Up @@ -505,7 +504,7 @@
[task-stack]
(let [sync! #(let [tgt (get-env :target-path)]
(binding [file/*hard-link* false]
(apply file/sync :time tgt (output-dirs %)))
(apply file/sync! :time tgt (output-dirs %)))
(file/delete-empty-subdirs! tgt)
(sync-user-dirs!))]
(binding [*warnings* (atom 0)]
Expand Down
20 changes: 9 additions & 11 deletions boot/core/src/boot/task/built_in.clj
Expand Up @@ -135,12 +135,12 @@
(pod/require-in @pod/worker-pod "boot.watcher")
(fn [next-task]
(fn [fileset]
(let [q (LinkedBlockingQueue.)
return (atom fileset)
srcdirs (map (memfn getPath) (core/user-dirs fileset))
watchers (map file/make-watcher srcdirs)
paths (into-array String srcdirs)
k (.invoke @pod/worker-pod "boot.watcher/make-watcher" q paths)]
(let [q (LinkedBlockingQueue.)
return (atom fileset)
srcdirs (map (memfn getPath) (core/user-dirs fileset))
watcher (apply file/watcher! :time srcdirs)
paths (into-array String srcdirs)
k (.invoke @pod/worker-pod "boot.watcher/make-watcher" q paths)]
(core/cleanup (.invoke @pod/worker-pod "boot.watcher/stop-watcher" k))
(when-not quiet (util/info "Starting file watcher (CTRL-C to quit)...\n\n"))
(loop [ret (util/guard [(.take q)])]
Expand All @@ -149,13 +149,11 @@
(recur (conj ret more))
(let [start (System/currentTimeMillis)
etime #(- (System/currentTimeMillis) start)
changed (->> (map #(%) watchers)
(reduce (partial merge-with set/union))
:time (filter (memfn exists)) set)]
changed (watcher)]
(when-not (empty? changed)
(when verbose
(doseq [p (->> changed (map #(.getPath %)))]
(util/info (format "→ %s %s\n" (.lastModified (io/file p)) p)))
(doseq [[op p f] changed]
(util/info (format "→ %s %s %s\n" op (.lastModified f) p)))
(util/info "\n"))
(binding [*out* (if quiet (new java.io.StringWriter) *out*)
*err* (if quiet (new java.io.StringWriter) *err*)]
Expand Down
2 changes: 1 addition & 1 deletion boot/core/src/boot/tmpregistry.clj
Expand Up @@ -102,7 +102,7 @@
(assert (or (not (nil? sortd)) (empty? dests))
"syncs appear to have a cyclic dependency")
(doseq [dest sortd]
(apply file/sync :hash (io/file dest) (map io/file (core/get syncs dest))))))
(apply file/sync! :hash (io/file dest) (map io/file (core/get syncs dest))))))
(-tmpfile? [this f]
(when (file/parent? dir f) f)))

Expand Down
124 changes: 52 additions & 72 deletions boot/pod/src/boot/file.clj
Expand Up @@ -3,7 +3,8 @@
[clojure.java.io :as io]
[clojure.set :as set]
[clojure.data :as data]
[boot.from.digest :as digest])
[boot.from.digest :as digest]
[clojure.core.reducers :as r])
(:import
[java.nio.file Files]
[java.lang.management ManagementFactory])
Expand Down Expand Up @@ -108,53 +109,56 @@
outs (map #(srcdir->outdir % src dest) files)]
(mapv copy-with-lastmod (map io/file files) (map io/file outs)))))

(defn select-keys-by [m pred?]
(select-keys m (filter pred? (keys m))))

(defn dir-set
([dir]
(let [info (juxt #(relative-to dir %) #(.lastModified %))
mapf #(zipmap [:dir :abs :rel :mod] (list* dir % (info %)))
ign? #(and *ignore* (*ignore* %))]
(set (mapv mapf (filter (memfn isFile) (remove ign? (file-seq dir)))))))
([dir1 dir2 & dirs]
(reduce set/union (map dir-set (list* dir1 dir2 dirs)))))

(defn dir-map
[& dirs]
(->>
(apply dir-set (mapv io/file dirs))
(mapv #(vector (.getPath (:rel %)) %))
(into {})))

(defn dir-map-ext
[exts & dirs]
(let [ext #(let [f (name (io/file %))] (subs f (.lastIndexOf f ".")))
ext? #(contains? exts (ext %))]
(select-keys-by (apply dir-map dirs) ext?)))

(defn what-changed
([dst-map src-map] (what-changed dst-map src-map :time))
([dst-map src-map algo]
(let [[created deleted modified]
(data/diff (set (keys src-map)) (set (keys dst-map)))
algos {:hash #(not= (digest/md5 (:abs (src-map %)))
(digest/md5 (:abs (dst-map %))))
:time #(< (:mod (dst-map %)) (:mod (src-map %)))
:theirs #(not= (:mod (dst-map %)) (:mod (src-map %)))}
modified (set (filter (algos algo) modified))]
[(set/union created modified) deleted])))

(defn diff
[algo dst src & srcs]
(let [d (dir-map (io/file dst))
s (->> (cons src srcs)
(map io/file)
(apply dir-map))
[to-cp to-rm] (what-changed d s algo)
cp (map #(vector :cp (:abs (s %)) (io/file dst %)) to-cp)
rm (map #(vector :rm (io/file dst %)) to-rm)]
(concat cp rm)))
(defn files-for [& dirs]
(->> (for [dir dirs]
(let [path (-> (if (string? dir) dir (.getPath dir)) (.replaceAll "/$" ""))
snip (count (str path "/"))]
(->> (file-seq (io/file path))
(filter (memfn isFile))
(reduce #(let [p (subs (.getPath %2) snip)]
(-> (assoc-in %1 [:file p] %2)
(assoc-in [:time p] (.lastModified %2))))
{}))))
(reduce (partial merge-with into) {})))

(def time-diff-memo
(memoize
(fn [bef aft]
((fn [[b a]] [(set/difference b a) a])
(->> (data/diff bef aft) (take 2) (map (comp set keys)))))))

(defn time-diff [before after]
(time-diff-memo (:time before) (:time after)))

(defmulti patch-cp? (fn [pred a b] pred))
(defmethod patch-cp? :default [_ a b] true)
(defmethod patch-cp? :theirs [_ a b] true)
(defmethod patch-cp? :hash [_ a b] (not= (digest/md5 a) (digest/md5 b)))

(defn patch [pred before after]
(let [[rm cp] (time-diff before after)]
(concat
(for [x rm] [:rm x (get-in before [:file x])])
(for [x cp :let [b (get-in before [:file x])
a (get-in after [:file x])]
:when (patch-cp? pred a b)]
[:cp x a]))))

(defn sync! [pred dest & srcs]
(let [before (files-for dest)
after (apply files-for srcs)]
(doseq [[op p x] (patch pred before after)]
(case op
:rm (io/delete-file x true)
:cp (copy-with-lastmod x (io/file dest p))))))

(defn watcher! [pred & dirs]
(let [state (atom (apply files-for dirs))]
(fn []
(let [state' (apply files-for dirs)
patch' (patch pred @state state')]
(reset! state state')
patch'))))

(defn match-filter?
[filters f]
Expand All @@ -165,27 +169,3 @@
(and
(or (empty? include) (match-filter? include f))
(or (empty? exclude) (not (match-filter? exclude f)))))

(defn sync*
[ops]
(let [opfn {:rm #(when *sync-delete* (.delete (nth % 1)))
:cp #(when (keep-filters? *include* *exclude* (nth % 2))
(copy-with-lastmod (nth % 1) (nth % 2)))}]
(doseq [[op s d :as cmd] ops] ((opfn op) cmd))))

(defn sync
[algo dst src & srcs]
(sync* (apply diff algo dst src srcs)))

(defn make-watcher [dir]
(let [prev (atom nil)]
(fn []
(let [only-file #(filter file? %)
make-info #(guard (vector [% (.lastModified %)] [% (digest/md5 %)]))
file-info #(remove nil? (mapcat make-info %))
info (->> dir io/file file-seq only-file file-info set)
mods (set/difference (set/union info @prev) (set/intersection info @prev))
by #(->> %2 (filter (comp %1 second)) (map first) set)]
(reset! prev info)
{:hash (by string? mods) :time (by number? mods)}))))

1 change: 0 additions & 1 deletion boot/worker/src/boot/watcher.clj
Expand Up @@ -86,7 +86,6 @@
(when watch-key
(if-not (.isValid watch-key)
(do (util/dbug "invalid watch key %s\n" (.watchable watch-key))
(Thread/sleep 500)
(recur))
(do (doseq [event (.pollEvents watch-key)]
(let [dir (.toFile (.watchable watch-key))
Expand Down

0 comments on commit ed5d71a

Please sign in to comment.