Commit 52a59c8
Refactored thin layer around Mahout matrices. Pulled out local mode, only distributed mode against Hadoop cluster now.
algoriffic committed Apr 14, 2010
1 parent 1c31cab commit 52a59c8
Showing 7 changed files with 209 additions and 189 deletions.
31 changes: 14 additions & 17 deletions README.md
@@ -11,10 +11,10 @@ the cosine similarity distance of each individual document to the first n princi
components.

This first version requires that the number of clusters and the reduced rank be
supplied by the user. There are two modes to the algorithm - local and distributed.
In local mode, matrix algebra is performed in memory; therefore, only
small document sets will work. In distributed mode, decomposition is done using Apache
Mahout. This mode is more appropriate for large document sets.
supplied by the user. Decomposition is performed using the DistributedLanczosSolver
from Apache Mahout on a Hadoop cluster. After decomposition of the term-document
matrix, the reduced-rank document vectors are clustered using k-means clustering, also
from Apache Mahout.
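
In outline, the pipeline looks like the following sketch (decompose-svd and
mmult come from the new lsa4solr.mahout-matrix namespace in this commit;
kmeans-cluster is a hypothetical stand-in for the Mahout KMeansDriver call
shown later in the diff):

    ;; Sketch only; kmeans-cluster is hypothetical.
    (defn cluster-documents [term-doc-matrix k num-clusters]
      (let [{:keys [S V]} (decompose-svd term-doc-matrix k) ; rank-k SVD (DistributedLanczosSolver)
            reduced-docs  (mmult V S)]                      ; reduced-rank document vectors
        (kmeans-cluster reduced-docs num-clusters)))        ; k-means with cosine distance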

Development goals include determining the optimal number of clusters, optimizing
the reduced rank, etc.
@@ -43,7 +43,6 @@ classloader include:
arpack-combo-0.1.jar
clojure-1.2.0.jar
clojure-contrib-1.2.0-master-20100122.191106-1.jar
incanter-full-1.0.0.jar
apache-solr-clustering-3.1-dev.jar
parallelcolt-0.7.2.jar
lsa4solr.jar
@@ -107,6 +106,16 @@ to get decent results:
</fieldType>


Hadoop Setup
------------

In order to use lsa4solr with Hadoop, make sure that mahout-math-0.4.jar is in
the Hadoop lib directory; it is a dependency of mahout-core-0.4.jar, which
contains the distributed job. Put the core-site.xml and mapred-site.xml files from
the resources directory into Solr's webapp/WEB-INF/classes directory and configure
them to point to your Hadoop setup.
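
For example, minimal versions of these two files might look like the sketch
below (the namenode and jobtracker host/port values are placeholders, not
values from this commit):

    <!-- core-site.xml: point the Hadoop client at your namenode -->
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://namenode.example.com:9000</value>
      </property>
    </configuration>

    <!-- mapred-site.xml: point at your jobtracker -->
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>jobtracker.example.com:9001</value>
      </property>
    </configuration>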


Using
-----

@@ -124,18 +133,6 @@ where

The cluster information will be at the bottom of the response.
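
For example, a request that clusters the top 100 documents into 2 clusters
using a rank-10 decomposition (mirroring the URL from the removed section
below, minus the mode parameter) might look like:

    http://localhost:8983/solr/lsa4solr?nclusters=2&q=Summary:.*&rows=100&k=10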

Using with Hadoop
-----------------

In order to use lsa4solr with Hadoop, make sure that the mahout-math-0.4.jar is
in the Hadoop lib directory. This is a dependency of the mahout-core-0.4.jar which
contains the distributed job. Put the core-site.xml and mapred-site.xml files from
the resources directory into Solr's webapp/WEB-INF/classes directory and configure
them to point to your Hadoop setup. Finally, access lsa4solr-distributed by appending
"mode=distributed" to the URL:

http://localhost:8983/solr/lsa4solr?nclusters=2&q=Summary:.*&rows=100&k=10&mode=distributed

Testing
-------

4 changes: 1 addition & 3 deletions project.clj
@@ -1,11 +1,9 @@
(defproject lsa4solr "1.0.0-SNAPSHOT"
:description "Clustering component for Solr based on Latent Semantic Analysis"
:namespaces :all
:repositories {"incanter" "http://repo.incanter.org"
"apache" "https://repository.apache.org/"}
:repositories {"apache" "https://repository.apache.org/"}
:dependencies [[org.clojure/clojure "1.2.0-master-SNAPSHOT"]
[org.clojure/clojure-contrib "1.2.0-master-SNAPSHOT"]
[incanter/incanter "1.2.1-SNAPSHOT"]
[org.apache.mahout/mahout-core "0.4-SNAPSHOT"
:exclusions [org.apache.lucene/lucene-core
org.apache.lucene/lucene-analyzers]]
19 changes: 5 additions & 14 deletions src/lsa4solr/cluster.clj
@@ -1,10 +1,6 @@
(ns lsa4solr.cluster
(:use [clojure.contrib.seq-utils :only [indexed]]
[lsa4solr core clustering-protocol]
[incanter.core]
[incanter.stats])
(:import (cern.colt.matrix.tdouble.algo.decomposition DoubleSingularValueDecomposition)
(incanter Matrix)))
[lsa4solr core clustering-protocol]))

(gen-class
:name lsa4solr.cluster/LSAClusteringEngine
@@ -31,7 +27,7 @@
{(keyword text)
{
:df df
:idf (log2 (/ numdocs df))
:idf (java.lang.Math/log (/ numdocs df))
:idx (counter)
}
})
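
The idf weight above switches from Incanter's log2 to the natural log. A
quick worked example, with assumed counts:

    ;; with 1000 documents and a term occurring in 10 of them:
    (java.lang.Math/log (/ 1000 10)) ; => ln 100 ≈ 4.605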
@@ -63,20 +59,15 @@
k
num-clusters]
(let [doc-seq (iterator-seq (.iterator doc-list))
m (get-frequency-matrix clustering-protocol reader field terms doc-seq)
svd-factorization (svd clustering-protocol k m)
clusters (cluster-docs clustering-protocol reader doc-seq svd-factorization k num-clusters id-field)]
{:clusters clusters
:svd svd-factorization}))
clusters (cluster-docs clustering-protocol reader terms doc-seq k num-clusters field id-field)]
{:clusters clusters}))


(defn -cluster [this
query
doc-list
solr-request]
(let [algorithm (.get (.getParams solr-request) "mode")
engine (cond (= "distributed" algorithm) (DistributedLSAClusteringEngine)
:else (LocalLSAClusteringEngine))
(let [engine (DistributedLSAClusteringEngine)
result (cluster-dispatch engine
(:reader @(.state this))
(:narrative-field @(.state this))
195 changes: 56 additions & 139 deletions src/lsa4solr/clustering_protocol.clj
@@ -1,17 +1,17 @@
(ns lsa4solr.clustering-protocol
(:use [clojure.contrib.seq-utils :only [indexed]]
[lsa4solr core hadoop-utils lucene-utils]
[incanter.core]
[incanter.stats])
(:import (cern.colt.matrix.tdouble.algo.decomposition DoubleSingularValueDecomposition)
(incanter Matrix)))
[lsa4solr core hadoop-utils lucene-utils mahout-matrix])
(:import (org.apache.hadoop.conf Configuration)
(org.apache.hadoop.fs FileSystem Path)
(org.apache.hadoop.io Text SequenceFile$Reader)
(org.apache.hadoop.fs.permission FsPermission)
(org.apache.mahout.clustering.kmeans RandomSeedGenerator KMeansDriver)))

(defprotocol LSAClusteringEngineProtocol
(get-mapper [self terms vec-ref ndocs])
(init-frequency-vector [self n])
(get-frequency-matrix [self reader field terms hits])
(svd [self k m])
(cluster-docs [self reader doc-seq svd-factorization k num-clusters id-field]))
(cluster-docs [self reader terms doc-seq k num-clusters narrative-field id-field]))

(defn get-mapper-common [terms vec-ref ndocs update-ref]
(proxy [org.apache.lucene.index.TermVectorMapper]
@@ -24,171 +24,88 @@
nil)))


(deftype LocalLSAClusteringEngine
[]
LSAClusteringEngineProtocol
(get-mapper [self terms vec-ref ndocs]
(get-mapper-common terms vec-ref ndocs
(fn [vec-ref idx weight]
(alter vec-ref assoc idx weight))))

(init-frequency-vector [self n]
(ref (vec (repeat n 0))))

(get-frequency-matrix [self reader field terms hits]
(trans (matrix (extract-frequency-vectors
reader
(fn [n] (init-frequency-vector self n))
(fn [terms vec-ref ndocs]
(get-mapper self
terms
vec-ref
ndocs))
field
terms
hits))))
(svd [self k m]
(let [svd-result (DoubleSingularValueDecomposition. m)]
{:U (Matrix. (.getU svd-result))
:S (Matrix. (.getS svd-result))
:V (Matrix. (.getV svd-result))}))

(cluster-docs [self reader doc-seq svd-factorization k num-clusters id-field]
(let [U (:U svd-factorization)
S (:S svd-factorization)
V (:V svd-factorization)
VS (mmult (sel V :cols (range 0 k))
(sel (sel S :cols (range 0 k)) :rows (range 0 k)))
pca (principal-components VS)
pcs (sel (:rotation pca) :cols (range 0 num-clusters))
sims (map (fn [docvec]
(sort-by #(second %)
(map (fn [pc]
[(first pc) (cosine-similarity docvec (second pc))])
(indexed (trans pcs)))))
VS)
labels (clojure.contrib.seq-utils/indexed (map #(first (last %)) sims))
clusters (reduce #(merge %1 %2)
{}
(map (fn [x] {(keyword (str x))
(map #(get-docid reader
id-field
(nth doc-seq %))
(map first
(filter #(= (second %) x)
labels)))})
(range 0 num-clusters)))]
clusters))
)

(deftype DistributedLSAClusteringEngine
[]
LSAClusteringEngineProtocol
(get-mapper [self terms vec-ref ndocs]
(get-mapper-common terms vec-ref ndocs
(fn [vec-ref idx weight]
(.setQuick @vec-ref idx weight))))
(set-value @vec-ref idx weight))))

(init-frequency-vector [self n]
(ref (new org.apache.mahout.math.RandomAccessSparseVector n)))
(ref (create-vector n)))

(get-frequency-matrix
[self reader field terms hits]
(matrix (extract-frequency-vectors
reader
(fn [n] (init-frequency-vector self n))
(fn [terms vec-ref ndocs]
(get-mapper self
terms
vec-ref
ndocs))
field
terms
hits)))

(get-frequency-matrix [self reader field terms hits]
(let [rows (to-array-of org.apache.mahout.math.Vector
(extract-frequency-vectors
reader
(fn [n] (init-frequency-vector self n))
(fn [terms vec-ref ndocs]
(get-mapper self
terms
vec-ref
ndocs))
field
terms
hits))]
(.transpose (new org.apache.mahout.math.SparseRowMatrix
(int-array [(count rows) (count terms)])
rows))))

(svd [self k m]
(let [hadoop-conf (new org.apache.hadoop.conf.Configuration)
fs (org.apache.hadoop.fs.FileSystem/get hadoop-conf)
base-path (org.apache.hadoop.fs.Path. (str "/doc-clustering/" (java.lang.System/nanoTime)))
mkdirs-result (org.apache.hadoop.fs.FileSystem/mkdirs fs
base-path
(org.apache.hadoop.fs.permission.FsPermission/getDefault))
m-path (str (.toString base-path) "/mtosvd")
writer (write-matrix hadoop-conf m m-path)
dm (doto (new org.apache.mahout.math.hadoop.DistributedRowMatrix
m-path
(str (.toString base-path) "/svdout")
(.numRows m)
(.numCols m))
(.configure (new org.apache.hadoop.mapred.JobConf hadoop-conf)))
eigenvalues (new java.util.ArrayList)
eigenvectors (new org.apache.mahout.math.DenseMatrix (+ k 2) (.numCols m))
decomposer (doto (new org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver)
(.solve dm (+ k 2) eigenvectors eigenvalues false))]
{:eigenvectors eigenvectors
:eigenvalues eigenvalues
:U nil
:S (diag (map #(sqrt %) (reverse (take-last k eigenvalues))))
:V (trans
(matrix (to-array (map (fn [vec] (map #(.get %1)
(iterator-seq (.iterateAll (.vector vec)))))
(take k eigenvectors)))))}))

(cluster-docs [self
reader
terms
doc-seq
svd-factorization
k
num-clusters
narrative-field
id-field]
(let [hadoop-conf (new org.apache.hadoop.conf.Configuration)
fs (org.apache.hadoop.fs.FileSystem/get hadoop-conf)
base-path (org.apache.hadoop.fs.Path. (str "/kmeans-clustering/" (java.lang.System/nanoTime)))
mkdirs-result (org.apache.hadoop.fs.FileSystem/mkdirs fs
base-path
(org.apache.hadoop.fs.permission.FsPermission/getDefault))
(let [fm (transpose (get-frequency-matrix self
reader
narrative-field
terms
doc-seq))
svd-factorization (decompose-svd fm k)
hadoop-conf (Configuration.)
fs (FileSystem/get hadoop-conf)
base-path (Path. (str "/lsa4solr/kmeans-clustering/" (java.lang.System/nanoTime)))
mkdirs-result (FileSystem/mkdirs fs
base-path
(FsPermission/getDefault))
U (:U svd-factorization)
S (:S svd-factorization)
V (:V svd-factorization)
m (trans
(mmult (sel S :cols (range 0 k) :rows (range 0 k))
(trans (mmult (sel V :cols (range 0 k))))))
srm (doto (org.apache.mahout.math.SparseRowMatrix. (int-array (dim m)))
((fn [sparse-row-matrix]
(doall
(for [i (range 0 (count m))
j (range 0 (count (sel m :rows 0)))]
(.setQuick sparse-row-matrix i j (sel m :rows i :cols j)))))))
;; reduced-fm (mmult U (mmult S (transpose V)))
reduced-fm (mmult V S)
reduced-m-path (str (.toString base-path) "/reducedm")
writer (lsa4solr.hadoop-utils/write-matrix hadoop-conf srm reduced-m-path)
initial-centroids (org.apache.mahout.clustering.kmeans.RandomSeedGenerator/buildRandom reduced-m-path
(str (.toString base-path) "/centroids")
num-clusters)
;; writer (write-matrix hadoop-conf (transpose reduced-fm) reduced-m-path)
writer (write-matrix hadoop-conf reduced-fm reduced-m-path)
initial-centroids (RandomSeedGenerator/buildRandom reduced-m-path
(str (.toString base-path) "/centroids")
num-clusters)
cluster-output-path (str (.toString base-path) "/clusterout")
job (org.apache.mahout.clustering.kmeans.KMeansDriver/runJob
job (KMeansDriver/runJob
reduced-m-path
(.toString initial-centroids)
cluster-output-path
"org.apache.mahout.common.distance.CosineDistanceMeasure"
0.00000001
k
num-clusters)
tkey (org.apache.hadoop.io.Text.)
tval (org.apache.hadoop.io.Text.)
tkey (Text.)
tval (Text.)
groups (clojure.contrib.seq-utils/flatten
(map (fn [path-string] (let [path (org.apache.hadoop.fs.Path. path-string)
seq-reader (org.apache.hadoop.io.SequenceFile$Reader. fs path hadoop-conf)
(map (fn [path-string] (let [path (Path. path-string)
seq-reader (SequenceFile$Reader. fs path hadoop-conf)
valseq (take-while (fn [v] (.next seq-reader tkey tval)) (repeat [tkey tval]))]
(map #(.toString (second %)) valseq)))
(map #(str cluster-output-path "/points/part-0000" %) (range 0 8))))
clusters (apply merge-with #(into %1 %2)
(map #(hash-map (keyword (second %))
(list (lsa4solr.lucene-utils/get-docid reader "id" (nth doc-seq (first %1)))))
(list (get-docid reader "id" (nth doc-seq (first %1)))))
(indexed groups)))]
clusters))

{:groups groups
:clusters clusters
:U U
:S S
:V V
:reduced-fm reduced-fm}))
)

25 changes: 14 additions & 11 deletions src/lsa4solr/hadoop_utils.clj
@@ -1,20 +1,23 @@
(ns lsa4solr.hadoop-utils)
(ns lsa4solr.hadoop-utils
(:import (org.apache.mahout.math VectorWritable)
(org.apache.hadoop.io IntWritable)
(org.apache.hadoop.fs FileSystem Path)
(org.apache.hadoop.io SequenceFile$Writer)))

(defn write-vectors [writer
m]
(doall (map #(.append writer %1 (new org.apache.mahout.math.VectorWritable (.vector %2)))
(map #(new org.apache.hadoop.io.IntWritable %)
(doall (map #(.append writer %1 (VectorWritable. (.vector %2)))
(map #(IntWritable. %)
(range 0 (.numRows m)))
(iterator-seq (.iterator m)))))

(defn write-matrix [hadoop-conf m path-string]
(let [fs (org.apache.hadoop.fs.FileSystem/get hadoop-conf)
path (new org.apache.hadoop.fs.Path path-string)]
(doto (new org.apache.hadoop.io.SequenceFile$Writer
fs
hadoop-conf
path
org.apache.hadoop.io.IntWritable
org.apache.mahout.math.VectorWritable)
(let [fs (FileSystem/get hadoop-conf)
path (Path. path-string)]
(doto (SequenceFile$Writer. fs
hadoop-conf
path
IntWritable
VectorWritable)
(write-vectors m)
(.close))))
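
A minimal usage sketch for these helpers (the matrix dimensions and the HDFS
path are assumptions, not values from the commit):

    (let [conf (org.apache.hadoop.conf.Configuration.)
          ;; a small, empty Mahout matrix, purely for illustration
          m    (org.apache.mahout.math.SparseRowMatrix. (int-array [100 50]))]
      (write-matrix conf m "/lsa4solr/example/matrix"))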
