forked from fotisj/pydelta
-
Notifications
You must be signed in to change notification settings - Fork 9
/
cluster.py
262 lines (221 loc) · 9.87 KB
/
cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# -*- encoding: utf-8 -*-
"""
Clustering of distance matrixes.
:class:`Clustering` represents a hierarchical clustering which can be flattened
using :meth:`Clustering.fcluster`, the flattened clustering is then represented
by :class:`FlatClustering`.
If supported by the installed version of scikit-learn, there is also a
KMedoidsClustering.
"""
import logging
logger = logging.getLogger(__name__)
from pprint import pformat
import numpy as np
import pandas as pd
import scipy.spatial.distance as ssd
import scipy.cluster.hierarchy as sch
from delta.util import Metadata
from delta.deltas import DistanceMatrix
from delta.corpus import Corpus
from sklearn import metrics
class Clustering:
"""
Represents a hierarchical clustering.
Note:
This is subject to refactoring once we implement more clustering
methods
"""
def __init__(self, distance_matrix, method="ward", **kwargs):
self.metadata = Metadata(distance_matrix.metadata,
cluster_method=method, **kwargs)
self.distance_matrix = distance_matrix
self.describer = distance_matrix.document_describer
self.method = method
self.linkage = self._calc_linkage()
def _calc_linkage(self):
if self.method == "ward":
return sch.ward(ssd.squareform(self.distance_matrix,
force="tovector"))
else:
return sch.linkage(ssd.squareform(self.distance_matrix),
method=self.method, metric="euclidean")
def fclustering(self):
"""
Returns a default flat clustering from the hierarchical version.
This method uses the :class:`DocumentDescriber` to determine the
groups, and uses the number of groups as a maxclust criterion.
Returns:
FlatClustering: A properly initialized representation of the flat
clustering.
"""
flat = FlatClustering(self.distance_matrix, metadata=self.metadata,
flattening='maxclust')
flat.set_clusters(sch.fcluster(self.linkage, flat.group_count,
criterion="maxclust"))
return flat
class FlatClustering:
"""
A flat clustering represents a non-hierarchical clustering.
Notes:
FlatClustering uses a data frame field called ``data`` to store the
actual clustering. This field will have the same index as the distance
matrix, and three columns labeled ``Group``, ``GroupID``, and
``Cluster``. ``Group`` will be the group label returned by the
:class:`DocumentDescriber` we use, ``GroupID`` a numerical ID for each
group (to be used as ground truth) and ``Cluster`` the numerical ID of
the actual cluster associated by the clustering algorithm.
As long as FlatClusterings ``initialized`` property is ``False``, the
Clustering is not assigned yet.
"""
def __init__(self, distances, clusters=None, metadata=None, **kwargs):
if isinstance(distances, DistanceMatrix):
self.distances = distances
self.document_describer = self.distances.document_describer
self.documents = self.distances.index
elif isinstance(distances, Corpus):
self.distances = None
self.document_describer = distances.document_describer
self.documents = distances.index
else:
raise ValueError(
"Flat clustering must be initialized from a distance matrix or a corpus.\n"
"(did you want to call Clustering.fcluster() instead?)")
self.distances = distances
self.metadata = Metadata(metadata if metadata is not None else
distances.metadata, **kwargs)
self.data, self.group_count = self._init_data()
self.initialized = False
if clusters is not None:
self.set_clusters(clusters)
def set_clusters(self, clusters):
if self.initialized:
raise Exception("Already initialized")
self.data["Cluster"] = clusters
self.initialized = True
def _init_data(self):
clustering = pd.DataFrame(index=self.distances.index)
dd = self.distances.document_describer
clustering["Group"] = [dd.group_name(doc) for doc in clustering.index]
group_count = len(dd.groups(clustering.index))
group_idx = pd.Series(index=clustering.Group.value_counts().index,
data=range(0, group_count))
clustering["GroupID"] = clustering.Group.map(group_idx)
return clustering, group_count
@staticmethod
def ngroups(df):
"""
With df being a data frame that has a Group column, return the number
of different authors in df.
"""
return len(set(df.Group))
def cluster_errors(self):
"""
Calculates the number of cluster errors by:
1. calculating the total number of different authors in the set
2. calling sch.fcluster to generate at most that many flat clusters
3. for each of those clusters, the cluster errors are the number of
authors in this cluster - 1
4. sum of each cluster's errors = result
"""
return int((self.data.groupby("Cluster")
.agg(self.ngroups).Group-1).sum())
def purity(self):
"""
To compute purity, each cluster is assigned to the class which is most
frequent in the cluster, and then the accuracy of this assignment is
measured by counting the number of correctly assigned documents and
dividing by $N$
"""
def correctly_classified(cluster):
return cluster.Group.value_counts()[0]
return int(self.data.groupby("Cluster")
.agg(correctly_classified)
.Group.sum()) / self.data.index.size
def entropy(self):
"""
Smaller entropy values suggest a better clustering.
"""
classes = self.data.Group.unique().size
def cluster_entropy(cluster):
class_counts = cluster.value_counts()
return float((class_counts / cluster.index.size *
np.log(class_counts / cluster.index.size)).sum() *
(-1)/np.log(classes))
def weighted_cluster_entropy(cluster):
return (cluster.index.size / self.data.index.size) * \
cluster_entropy(cluster)
return self.data.groupby("Cluster") \
.agg(weighted_cluster_entropy).Group.sum()
def adjusted_rand_index(self):
"""
Calculates the Adjusted Rand Index for the given flat clustering
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.adjusted_rand_score.html#sklearn.metrics.adjusted_rand_score
"""
logger.debug("Calculating ARI for %s", self.data)
return metrics.adjusted_rand_score(self.data.GroupID,
self.data.Cluster)
def homogeneity_completeness_v_measure(self):
return metrics.homogeneity_completeness_v_measure(self.data.GroupID,
self.data.Cluster)
def evaluate(self):
"""
Returns:
pandas.Series: All scores for the current clustering
"""
result = pd.Series()
result["Cluster Errors"] = self.cluster_errors()
result["Adjusted Rand Index"] = self.adjusted_rand_index()
result["Homogeneity"], result["Completeness"], result["V Measure"] = \
self.homogeneity_completeness_v_measure()
result["Purity"] = self.purity()
result["Entropy"] = self.entropy()
return result
def clusters(self, labeled=False):
"""
Documents by cluster.
Args:
labeled (bool): If ``True``, represent each document by its *label*
as calculated by the :class:`DocumentDescriber`. This is
typically a human-readable, shortened description
Returns:
dict: Maps each cluster number to a list of documents.
"""
clusters = self.data.groupby("Cluster").groups
if labeled:
dd = self.distances.document_describer
return {n: [dd.label(doc) for doc in docs]
for n, docs in clusters.items()}
else:
return clusters
def describe(self):
"""
Returns a description of the current flat clustering.
"""
clusters = self.clusters(labeled=True)
result = "{} clusters of {} documents (ground truth: {} groups):\n" \
.format(len(clusters), len(self.data.index), self.group_count)
result += pformat(clusters, compact=True) + '\n'
return result
try:
from sklearn.cluster import KMedoids
class KMedoidsClustering_distances(FlatClustering):
def __init__(self, distances, n_clusters=None, metadata=None,
**kwargs):
super().__init__(distances, metadata, **kwargs)
if n_clusters is None:
n_clusters = self.group_count
model = KMedoids(n_clusters=n_clusters)
self.set_clusters(model.fit_predict(distances))
class KMedoidsClustering(FlatClustering):
def __init__(self, corpus, delta, n_clusters=None, extra_args={},
metadata=None, **kwargs):
super().__init__(corpus, metadata, **kwargs)
if n_clusters is None:
n_clusters = self.group_count
model = KMedoids(n_clusters=n_clusters,
distance_metric=delta.metric, **extra_args)
data = delta.prepare(corpus)
self.set_clusters(model.fit_predict(data))
except ImportError:
logger.log(logging.WARNING, "KMedoids clustering not available.\n" \
"You need a patched scikit-learn, see README.txt", exc_info=0)