diff --git a/README.md b/README.md
index 68d6260..f0c1e2b 100644
--- a/README.md
+++ b/README.md
@@ -11,10 +11,10 @@ the cosine similarity distance of each individual document to the first n princi
 components.
 
 This first version requires that the number of clusters and the reduced rank be
-supplied by the user. There are two modes to the algorithm - local and distributed.
-In local mode, matrix algebra is performed in memory therefore only
-small document sets will work. In distributed mode, decomposition is done using Apache
-Mahout. This mode is more appropriate for large document sets.
+supplied by the user. Decomposition is performed using the DistributedLanczosSolver
+from Apache Mahout on a Hadoop cluster. After decomposition of the term-document
+matrix, the reduced rank document vectors are clustered using k-means clustering, also
+from Apache Mahout.
 
 Development goals include determining the optimal number of clusters, optimizing
 the reduced rank, etc.
@@ -43,7 +43,6 @@ classloader include:
 arpack-combo-0.1.jar
 clojure-1.2.0.jar
 clojure-contrib-1.2.0-master-20100122.191106-1.jar
- incanter-full-1.0.0.jar
 apache-solr-clustering-3.1-dev.jar
 parallelcolt-0.7.2.jar
 lsa4solr.jar
@@ -107,6 +106,16 @@ to get decent results:
 
+Hadoop Setup
+-----------------
+
+In order to use lsa4solr with Hadoop, make sure that the mahout-math-0.4.jar is
+in the Hadoop lib directory. This is a dependency of the mahout-core-0.4.jar which
+contains the distributed job. Put the core-site.xml and mapred-site.xml files from
+the resources directory into Solr's webapp/WEB-INF/classes directory and configure
+them to point to your Hadoop setup.
+
+
 Using
 -----
@@ -124,18 +133,6 @@ where
 
 The cluster information will be at the bottom of the response.
 
-Using with Hadoop
------------------
-
-In order to use lsa4solr with Hadoop, make sure that the mahout-math-0.4.jar is
-in the Hadoop lib directory. This is a dependency of the mahout-core-0.4.jar which
-contains the distributed job. Put the core-site.xml and mapred-site.xml files from
-the resources directory into Solr's webapp/WEB-INF/classes directory and configure
-them to point to your Hadoop setup. 
Finally, access lsa4solr-distributed by appending -"mode=distributed" to the URL: - - http://localhost:8983/solr/lsa4solr?nclusters=2&q=Summary:.*&rows=100&k=10&mode=distributed - Testing ------- diff --git a/project.clj b/project.clj index 5099b23..4012e26 100644 --- a/project.clj +++ b/project.clj @@ -1,11 +1,9 @@ (defproject lsa4solr "1.0.0-SNAPSHOT" :description "Clustering component for Solr based on Latent Semantic Analysis" :namespaces :all - :repositories {"incanter" "http://repo.incanter.org" - "apache" "https://repository.apache.org/"} + :repositories {"apache" "https://repository.apache.org/"} :dependencies [[org.clojure/clojure "1.2.0-master-SNAPSHOT"] [org.clojure/clojure-contrib "1.2.0-master-SNAPSHOT"] - [incanter/incanter "1.2.1-SNAPSHOT"] [org.apache.mahout/mahout-core "0.4-SNAPSHOT" :exclusions [org.apache.lucene/lucene-core org.apache.lucene/lucene-analyzers]] diff --git a/src/lsa4solr/cluster.clj b/src/lsa4solr/cluster.clj index 6f865dd..af0e0f2 100644 --- a/src/lsa4solr/cluster.clj +++ b/src/lsa4solr/cluster.clj @@ -1,10 +1,6 @@ (ns lsa4solr.cluster (:use [clojure.contrib.seq-utils :only [indexed]] - [lsa4solr core clustering-protocol] - [incanter.core] - [incanter.stats]) - (:import (cern.colt.matrix.tdouble.algo.decomposition DoubleSingularValueDecomposition) - (incanter Matrix))) + [lsa4solr core clustering-protocol])) (gen-class :name lsa4solr.cluster/LSAClusteringEngine @@ -31,7 +27,7 @@ {(keyword text) { :df df - :idf (log2 (/ numdocs df)) + :idf (java.lang.Math/log (/ numdocs df)) :idx (counter) } }) @@ -63,20 +59,15 @@ k num-clusters] (let [doc-seq (iterator-seq (.iterator doc-list)) - m (get-frequency-matrix clustering-protocol reader field terms doc-seq) - svd-factorization (svd clustering-protocol k m) - clusters (cluster-docs clustering-protocol reader doc-seq svd-factorization k num-clusters id-field)] - {:clusters clusters - :svd svd-factorization})) + clusters (cluster-docs clustering-protocol reader terms doc-seq k num-clusters field id-field)] + {:clusters clusters})) (defn -cluster [this query doc-list solr-request] - (let [algorithm (.get (.getParams solr-request) "mode") - engine (cond (= "distributed" algorithm) (DistributedLSAClusteringEngine) - :else (LocalLSAClusteringEngine)) + (let [engine (DistributedLSAClusteringEngine) result (cluster-dispatch engine (:reader @(.state this)) (:narrative-field @(.state this)) diff --git a/src/lsa4solr/clustering_protocol.clj b/src/lsa4solr/clustering_protocol.clj index f78615e..3ecc888 100644 --- a/src/lsa4solr/clustering_protocol.clj +++ b/src/lsa4solr/clustering_protocol.clj @@ -1,17 +1,17 @@ (ns lsa4solr.clustering-protocol (:use [clojure.contrib.seq-utils :only [indexed]] - [lsa4solr core hadoop-utils lucene-utils] - [incanter.core] - [incanter.stats]) - (:import (cern.colt.matrix.tdouble.algo.decomposition DoubleSingularValueDecomposition) - (incanter Matrix))) + [lsa4solr core hadoop-utils lucene-utils mahout-matrix]) + (:import (org.apache.hadoop.conf Configuration) + (org.apache.hadoop.fs FileSystem Path) + (org.apache.hadoop.io Text SequenceFile$Reader) + (org.apache.hadoop.fs.permission FsPermission) + (org.apache.mahout.clustering.kmeans RandomSeedGenerator KMeansDriver))) (defprotocol LSAClusteringEngineProtocol (get-mapper [self terms vec-ref ndocs]) (init-frequency-vector [self n]) (get-frequency-matrix [self reader field terms hits]) - (svd [self k m]) - (cluster-docs [self reader doc-seq svd-factorization k num-clusters id-field])) + (cluster-docs [self reader terms doc-seq k num-clusters 
narrative-field id-field])) (defn get-mapper-common [terms vec-ref ndocs update-ref] (proxy [org.apache.lucene.index.TermVectorMapper] @@ -24,151 +24,64 @@ nil))) -(deftype LocalLSAClusteringEngine - [] - LSAClusteringEngineProtocol - (get-mapper [self terms vec-ref ndocs] - (get-mapper-common terms vec-ref ndocs - (fn [vec-ref idx weight] - (alter vec-ref assoc idx weight)))) - - (init-frequency-vector [self n] - (ref (vec (repeat n 0)))) - - (get-frequency-matrix [self reader field terms hits] - (trans (matrix (extract-frequency-vectors - reader - (fn [n] (init-frequency-vector self n)) - (fn [terms vec-ref ndocs] - (get-mapper self - terms - vec-ref - ndocs)) - field - terms - hits)))) - (svd [self k m] - (let [svd-result (DoubleSingularValueDecomposition. m)] - {:U (Matrix. (.getU svd-result)) - :S (Matrix. (.getS svd-result)) - :V (Matrix. (.getV svd-result))})) - - (cluster-docs [self reader doc-seq svd-factorization k num-clusters id-field] - (let [U (:U svd-factorization) - S (:S svd-factorization) - V (:V svd-factorization) - VS (mmult (sel V :cols (range 0 k)) - (sel (sel S :cols (range 0 k)) :rows (range 0 k))) - pca (principal-components VS) - pcs (sel (:rotation pca) :cols (range 0 num-clusters)) - sims (map (fn [docvec] - (sort-by #(second %) - (map (fn [pc] - [(first pc) (cosine-similarity docvec (second pc))]) - (indexed (trans pcs))))) - VS) - labels (clojure.contrib.seq-utils/indexed (map #(first (last %)) sims)) - clusters (reduce #(merge %1 %2) - {} - (map (fn [x] {(keyword (str x)) - (map #(get-docid reader - id-field - (nth doc-seq %)) - (map first - (filter #(= (second %) x) - labels)))}) - (range 0 num-clusters)))] - clusters)) - ) - (deftype DistributedLSAClusteringEngine [] LSAClusteringEngineProtocol (get-mapper [self terms vec-ref ndocs] (get-mapper-common terms vec-ref ndocs (fn [vec-ref idx weight] - (.setQuick @vec-ref idx weight)))) + (set-value @vec-ref idx weight)))) (init-frequency-vector [self n] - (ref (new org.apache.mahout.math.RandomAccessSparseVector n))) + (ref (create-vector n))) + + (get-frequency-matrix + [self reader field terms hits] + (matrix (extract-frequency-vectors + reader + (fn [n] (init-frequency-vector self n)) + (fn [terms vec-ref ndocs] + (get-mapper self + terms + vec-ref + ndocs)) + field + terms + hits))) - (get-frequency-matrix [self reader field terms hits] - (let [rows (to-array-of org.apache.mahout.math.Vector - (extract-frequency-vectors - reader - (fn [n] (init-frequency-vector self n)) - (fn [terms vec-ref ndocs] - (get-mapper self - terms - vec-ref - ndocs)) - field - terms - hits))] - (.transpose (new org.apache.mahout.math.SparseRowMatrix - (int-array [(count rows) (count terms)]) - rows)))) - - (svd [self k m] - (let [hadoop-conf (new org.apache.hadoop.conf.Configuration) - fs (org.apache.hadoop.fs.FileSystem/get hadoop-conf) - base-path (org.apache.hadoop.fs.Path. 
(str "/doc-clustering/" (java.lang.System/nanoTime))) - mkdirs-result (org.apache.hadoop.fs.FileSystem/mkdirs fs - base-path - (org.apache.hadoop.fs.permission.FsPermission/getDefault)) - m-path (str (.toString base-path) "/mtosvd") - writer (write-matrix hadoop-conf m m-path) - dm (doto (new org.apache.mahout.math.hadoop.DistributedRowMatrix - m-path - (str (.toString base-path) "/svdout") - (.numRows m) - (.numCols m)) - (.configure (new org.apache.hadoop.mapred.JobConf hadoop-conf))) - eigenvalues (new java.util.ArrayList) - eigenvectors (new org.apache.mahout.math.DenseMatrix (+ k 2) (.numCols m)) - decomposer (doto (new org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver) - (.solve dm (+ k 2) eigenvectors eigenvalues false))] - {:eigenvectors eigenvectors - :eigenvalues eigenvalues - :U nil - :S (diag (map #(sqrt %) (reverse (take-last k eigenvalues)))) - :V (trans - (matrix (to-array (map (fn [vec] (map #(.get %1) - (iterator-seq (.iterateAll (.vector vec))))) - (take k eigenvectors)))))})) - (cluster-docs [self reader + terms doc-seq - svd-factorization k num-clusters + narrative-field id-field] - (let [hadoop-conf (new org.apache.hadoop.conf.Configuration) - fs (org.apache.hadoop.fs.FileSystem/get hadoop-conf) - base-path (org.apache.hadoop.fs.Path. (str "/kmeans-clustering/" (java.lang.System/nanoTime))) - mkdirs-result (org.apache.hadoop.fs.FileSystem/mkdirs fs - base-path - (org.apache.hadoop.fs.permission.FsPermission/getDefault)) + (let [fm (transpose (get-frequency-matrix self + reader + narrative-field + terms + doc-seq)) + svd-factorization (decompose-svd fm k) + hadoop-conf (Configuration.) + fs (FileSystem/get hadoop-conf) + base-path (Path. (str "/lsa4solr/kmeans-clustering/" (java.lang.System/nanoTime))) + mkdirs-result (FileSystem/mkdirs fs + base-path + (FsPermission/getDefault)) U (:U svd-factorization) S (:S svd-factorization) V (:V svd-factorization) - m (trans - (mmult (sel S :cols (range 0 k) :rows (range 0 k)) - (trans (mmult (sel V :cols (range 0 k)))))) - srm (doto (org.apache.mahout.math.SparseRowMatrix. (int-array (dim m))) - ((fn [sparse-row-matrix] - (doall - (for [i (range 0 (count m)) - j (range 0 (count (sel m :rows 0)))] - (.setQuick sparse-row-matrix i j (sel m :rows i :cols j))))))) +;; reduced-fm (mmult U (mmult S (transpose V))) + reduced-fm (mmult V S) reduced-m-path (str (.toString base-path) "/reducedm") - writer (lsa4solr.hadoop-utils/write-matrix hadoop-conf srm reduced-m-path) - initial-centroids (org.apache.mahout.clustering.kmeans.RandomSeedGenerator/buildRandom reduced-m-path - (str (.toString base-path) "/centroids") - num-clusters) +;; writer (write-matrix hadoop-conf (transpose reduced-fm) reduced-m-path) + writer (write-matrix hadoop-conf reduced-fm reduced-m-path) + initial-centroids (RandomSeedGenerator/buildRandom reduced-m-path + (str (.toString base-path) "/centroids") + num-clusters) cluster-output-path (str (.toString base-path) "/clusterout") - job (org.apache.mahout.clustering.kmeans.KMeansDriver/runJob + job (KMeansDriver/runJob reduced-m-path (.toString initial-centroids) cluster-output-path @@ -176,19 +89,23 @@ 0.00000001 k num-clusters) - tkey (org.apache.hadoop.io.Text.) - tval (org.apache.hadoop.io.Text.) + tkey (Text.) + tval (Text.) groups (clojure.contrib.seq-utils/flatten - (map (fn [path-string] (let [path (org.apache.hadoop.fs.Path. path-string) - seq-reader (org.apache.hadoop.io.SequenceFile$Reader. fs path hadoop-conf) + (map (fn [path-string] (let [path (Path. 
path-string) + seq-reader (SequenceFile$Reader. fs path hadoop-conf) valseq (take-while (fn [v] (.next seq-reader tkey tval)) (repeat [tkey tval]))] (map #(.toString (second %)) valseq))) (map #(str cluster-output-path "/points/part-0000" %) (range 0 8)))) clusters (apply merge-with #(into %1 %2) (map #(hash-map (keyword (second %)) - (list (lsa4solr.lucene-utils/get-docid reader "id" (nth doc-seq (first %1))))) + (list (get-docid reader "id" (nth doc-seq (first %1))))) (indexed groups)))] - clusters)) - + {:groups groups + :clusters clusters + :U U + :S S + :V V + :reduced-fm reduced-fm})) ) diff --git a/src/lsa4solr/hadoop_utils.clj b/src/lsa4solr/hadoop_utils.clj index b815296..2bf18b0 100644 --- a/src/lsa4solr/hadoop_utils.clj +++ b/src/lsa4solr/hadoop_utils.clj @@ -1,20 +1,23 @@ -(ns lsa4solr.hadoop-utils) +(ns lsa4solr.hadoop-utils + (:import (org.apache.mahout.math VectorWritable) + (org.apache.hadoop.io IntWritable) + (org.apache.hadoop.fs FileSystem Path) + (org.apache.hadoop.io SequenceFile$Writer))) (defn write-vectors [writer m] - (doall (map #(.append writer %1 (new org.apache.mahout.math.VectorWritable (.vector %2))) - (map #(new org.apache.hadoop.io.IntWritable %) + (doall (map #(.append writer %1 (VectorWritable. (.vector %2))) + (map #(IntWritable. %) (range 0 (.numRows m))) (iterator-seq (.iterator m))))) (defn write-matrix [hadoop-conf m path-string] - (let [fs (org.apache.hadoop.fs.FileSystem/get hadoop-conf) - path (new org.apache.hadoop.fs.Path path-string)] - (doto (new org.apache.hadoop.io.SequenceFile$Writer - fs - hadoop-conf - path - org.apache.hadoop.io.IntWritable - org.apache.mahout.math.VectorWritable) + (let [fs (FileSystem/get hadoop-conf) + path (Path. path-string)] + (doto (SequenceFile$Writer. fs + hadoop-conf + path + IntWritable + VectorWritable) (write-vectors m) (.close)))) \ No newline at end of file diff --git a/src/lsa4solr/lucene_utils.clj b/src/lsa4solr/lucene_utils.clj index cc894ec..875f9a9 100644 --- a/src/lsa4solr/lucene_utils.clj +++ b/src/lsa4solr/lucene_utils.clj @@ -5,8 +5,9 @@ (defn extract-frequency-vectors [reader init-frequency-vector get-mapper field terms hits] - (pmap #(let [m (init-frequency-vector (count terms)) - mapper (get-mapper terms m (count hits))] - (do (. reader getTermFreqVector (int %1) field mapper) - @m)) - hits)) \ No newline at end of file + (map #(let [m (init-frequency-vector (count terms)) + mapper (get-mapper terms m (count hits))] + (do (. reader getTermFreqVector (int %1) field mapper) + @m)) + hits)) + diff --git a/src/lsa4solr/mahout_matrix.clj b/src/lsa4solr/mahout_matrix.clj new file mode 100644 index 0000000..c5ea826 --- /dev/null +++ b/src/lsa4solr/mahout_matrix.clj @@ -0,0 +1,113 @@ +(ns lsa4solr.mahout-matrix + (:import (org.apache.mahout.math SparseMatrix RandomAccessSparseVector VectorWritable Matrix DenseMatrix) + (org.apache.mahout.math.hadoop DistributedRowMatrix) + (org.apache.mahout.math.hadoop.decomposer DistributedLanczosSolver) + (org.apache.mahout.math.function UnaryFunction) + (org.apache.hadoop.fs Path FileSystem) + (org.apache.hadoop.fs.permission FsPermission) + (org.apache.hadoop.conf Configuration) + (org.apache.hadoop.mapred JobConf) + (org.apache.hadoop.io IntWritable SequenceFile$Writer))) + +(defn create-vector + [data] + (cond + (coll? data) (doto (RandomAccessSparseVector. (count data)) + ((fn [vec] (map #(.setQuick vec %1 %2) + (range 0 (count data)) + data)))) + (integer? data) (doto (RandomAccessSparseVector. 
data)))) + + + +(defn set-value + ([#^RandomAccessSparseVector vector index value] (.setQuick vector index value))) + +(defn matrix + [vec-iterator] + (let [hadoop-conf (Configuration.) + fs (FileSystem/get hadoop-conf) + base-path (Path. (str "/lsa4solr/matrix/" (java.lang.System/nanoTime))) + mkdirs-result (FileSystem/mkdirs fs + base-path + (FsPermission/getDefault)) + m-path (str (.toString base-path) "/m") + tmp-path (str (.toString base-path) "/tmp") + nrows (count vec-iterator) + ncols (.size (first vec-iterator)) + writer (doto (SequenceFile$Writer. fs + hadoop-conf + (Path. m-path) + IntWritable + VectorWritable) + ((fn [wrt] + (doall + (map #(.append wrt + (IntWritable. %1) + (VectorWritable. %2)) + (range 0 nrows) + vec-iterator)))) + (.close))] + (doto + (DistributedRowMatrix. m-path + tmp-path + nrows + ncols) + (.configure (JobConf. hadoop-conf))))) + +(defmulti mmult (fn [A & B] (type A))) + +(defmethod mmult DistributedRowMatrix [A B] + (let [num-rows (.numRows A) + num-cols (second (int-array (.size B)))] + (doto (DenseMatrix. num-rows num-cols) + ((fn [m] (doall (pmap #(.assignColumn m % (.times A (.getColumn B %))) + (range 0 num-cols)))))))) +(defmethod mmult :default [A B] + (.times A B)) + +(defn diag + [vals] + (doto (SparseMatrix. (int-array [(count vals) (count vals)])) + ((fn [m] (doall (map #(.setQuick m %1 %2 %3) + (range 0 (count vals)) + (range 0 (count vals)) + vals)))))) + +(defn invert-diagonal + [mat] + (.assign mat + (proxy [UnaryFunction] + [] + (apply [arg1] (if (= arg1 0) 0 (/ 1 arg1)))))) + + +(defn transpose + [mat] + (.transpose mat)) + +(defn normalize-matrix-columns + [mat] + (let [num-rows (.numRows mat) + num-cols (.numCols mat)] + (doto (DenseMatrix. num-rows num-cols) + ((fn [m] (doall (pmap #(.assignColumn m % (.normalize (.getColumn mat %))) + (range 0 num-cols)))))))) + +(defn decompose-svd + [mat k] + (let [eigenvalues (new java.util.ArrayList) + eigenvectors (DenseMatrix. (+ k 2) (.numCols mat)) + decomposer (doto (DistributedLanczosSolver.) + (.solve mat (+ k 2) eigenvectors eigenvalues false)) + V (normalize-matrix-columns (.viewPart (.transpose eigenvectors) + (int-array [0 0]) + (int-array [(.numCols mat) k]))) + U (mmult mat V) + S (diag (take k (reverse eigenvalues)))] + {:U U + :S S + :V V})) + + +
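For readers experimenting with the new lsa4solr.mahout-matrix namespace, the following REPL sketch (not part of the patch) shows how its pieces are intended to compose: build sparse document vectors, write them to HDFS as a DistributedRowMatrix, transpose to term-document orientation, run the Lanczos SVD, and form the reduced V*S document matrix that cluster-docs hands to k-means. It assumes a reachable Hadoop cluster configured as described in the README's Hadoop Setup section; the mm alias and the toy term counts are illustrative only, and a real corpus is needed for the solver to produce meaningful factors.

    (require '[lsa4solr.mahout-matrix :as mm])

    ;; Three toy documents over a four-term vocabulary (illustrative data only).
    (def doc-vectors [(mm/create-vector [1.0 0.0 2.0 0.0])
                      (mm/create-vector [0.0 3.0 0.0 1.0])
                      (mm/create-vector [1.0 1.0 0.0 0.0])])

    ;; Write the document rows to HDFS, wrap them as a DistributedRowMatrix,
    ;; then transpose to the term-document orientation fed to the solver,
    ;; as cluster-docs does with the frequency matrix.
    (def term-doc (mm/transpose (mm/matrix doc-vectors)))

    ;; Rank-2 decomposition via the DistributedLanczosSolver; returns {:U :S :V}.
    (def svd (mm/decompose-svd term-doc 2))

    ;; The reduced document representation clustered by k-means is V * S,
    ;; mirroring reduced-fm in cluster-docs of lsa4solr.clustering-protocol.
    (def reduced-docs (mm/mmult (:V svd) (:S svd)))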