-
Notifications
You must be signed in to change notification settings - Fork 0
/
collection_features.py
129 lines (96 loc) · 4.3 KB
/
collection_features.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import book_classification as bc
from scipy import sparse
from functools import reduce
from collections import defaultdict
import numpy
from scipy import sparse
class CollectionFeatures:
    """Per-book feature mappings, kept together with their source collection."""

    def __init__(self, collection, collection_extractor, features_by_book):
        """Store the three constructor arguments without copying them.

        Args:
            collection: Object returned unchanged by ``collection()``.
            collection_extractor: Kept and forwarded to instances built by
                ``select()``.
            features_by_book: Mapping indexed by book. ``select()`` requires
                each value to provide ``.items()``.
        """
        self._features_by_book = features_by_book
        self._collection_extractor = collection_extractor
        self._collection = collection

    def collection(self):
        """Return the collection given at construction time."""
        return self._collection

    def by_book(self, book):
        """Return the features stored under ``book``."""
        return self._features_by_book[book]

    def select(self, filter_pred):
        """Return a new instance holding only features whose key passes.

        Args:
            filter_pred: Callable invoked with each feature key; a truthy
                result keeps that key/value pair.

        Returns:
            An instance of the same class sharing this collection and
            extractor. Its mapping is a ``defaultdict(dict)``: a book with no
            surviving feature has no stored entry, but looking it up yields
            an empty dict.
        """
        surviving = defaultdict(dict)
        for book, features in self._features_by_book.items():
            kept = {
                key: value
                for key, value in features.items()
                if filter_pred(key)
            }
            # Only touch the defaultdict when something survived, so books
            # without matches do not get an explicit entry.
            if kept:
                surviving[book].update(kept)
        return type(self)(
            self._collection, self._collection_extractor, surviving
        )
class CollectionHierarchialFeatures:
    """Features available at three levels: per book, per author, overall."""

    def __init__(self, by_book, by_author, total):
        """Store the per-book mapping, per-author mapping and overall value."""
        self._total = total
        self._by_author = by_author
        self._by_book = by_book

    def by_book(self, book):
        """Return the features stored for ``book``."""
        return self._by_book[book]

    def by_author(self, author):
        """Return the features stored for ``author``."""
        return self._by_author[author]

    def total(self):
        """Return the features covering the whole collection."""
        return self._total

    @classmethod
    def from_book_collection(cls, collection, extractor):
        """Build all three levels from a collection and a feature extractor.

        Args:
            collection: Must provide ``books()``, ``authors()`` and
                ``books_by(author)``.
            extractor: Must provide ``extract_from(book)``; the returned
                objects must provide ``combine(other)``.

        Returns:
            A new instance whose author-level features are the left-to-right
            ``combine`` of that author's book features, and whose total is
            the ``combine`` of all author-level features.

        Raises:
            TypeError: Raised by ``reduce`` when an author has no books or
                the collection has no authors (no initial value is given).
        """
        def merge(accumulated, following):
            return accumulated.combine(following)

        per_book = {
            book: extractor.extract_from(book) for book in collection.books()
        }
        per_author = {
            author: reduce(
                merge, (per_book[b] for b in collection.books_by(author))
            )
            for author in collection.authors()
        }
        overall = reduce(merge, per_author.values())
        return cls(per_book, per_author, overall)
class DummyCollectionFeaturesEncoder:
    """Stack each book's raw ``_entries`` into one CSR matrix, with debug prints."""

    def encode(self, features):
        """Return a CSR matrix with one row block per book in the collection.

        Args:
            features: Must provide ``collection()`` (supporting ``len()`` and
                ``books()``) and ``by_book(book)`` returning an object with an
                ``_entries`` attribute accepted by ``sparse.csr_matrix``.

        Returns:
            The vertically stacked matrix in CSR format.

        Side effects:
            Prints the collection size, the matrix type and a density summary
            to standard output.
        """
        print(len(features.collection()))

        # Gather every book's entries first, then convert them to sparse rows.
        per_book_entries = [
            features.by_book(book)._entries
            for book in features.collection().books()
        ]
        sparse_rows = list(map(sparse.csr_matrix, per_book_entries))
        stacked = sparse.vstack(sparse_rows, format="csr")

        print(type(stacked))
        cell_count = stacked.shape[0] * stacked.shape[1]
        summary = (
            stacked.shape,
            stacked.nnz,
            cell_count,
            100 * stacked.nnz / cell_count,
        )
        print("matrix of %s, with %s/%s (%s%%) non-zeroes" % summary)
        # NOTE: a dense variant (numpy.vstack over the entries, with
        # numpy.count_nonzero for the summary) existed here as commented-out
        # code; the sparse path above is the one actually in use.
        return stacked
class SparseMatrixRowFeaturesEncoder:
    """Encoder for features that are already an iterable of sparse rows."""

    def encode(self, features):
        """Materialize ``features`` and stack its items vertically.

        Args:
            features: Iterable of blocks accepted by ``sparse.vstack``.

        Returns:
            The matrix produced by ``sparse.vstack`` with default arguments.
        """
        row_blocks = [*features]
        return sparse.vstack(row_blocks)
class FeaturesEncoder:
    """Translate feature keys to numeric indices and back via a NumericIndexer."""

    def __init__(self, vocabulary):
        """Keep ``vocabulary`` and build a ``bc.NumericIndexer`` over it."""
        self._vocabulary = vocabulary
        self._numeric_indexer = bc.NumericIndexer(vocabulary)

    def encode(self, features):
        """Lazily yield ``(index, value)`` for every encodable feature.

        Args:
            features: Object providing ``.items()`` yielding key/value pairs.

        Yields:
            ``(self._numeric_indexer.encode(key), value)`` for each key the
            indexer reports it can encode; other keys are skipped.
        """
        for key, value in features.items():
            if not self._numeric_indexer.can_encode(key):
                continue
            yield self._numeric_indexer.encode(key), value

    def decode(self, items):
        """Lazily yield ``(key, value)`` for every ``(index, value)`` in ``items``."""
        yield from (
            (self._numeric_indexer.decode(index), value)
            for index, value in items
        )

    def vocabulary(self):
        """Return the vocabulary as reported by the numeric indexer."""
        return self._numeric_indexer.vocabulary()
class CollectionFeaturesEncoder:
    """Encode a whole collection's features as a books-by-vocabulary matrix."""

    def __init__(self, encoder):
        """Keep the per-book encoder.

        Args:
            encoder: Must provide ``vocabulary()`` (supporting ``len()``) and
                ``encode(book_features)`` yielding ``(column, value)`` pairs.
        """
        self._encoder = encoder

    def encode(self, features):
        """Return a CSC matrix with one row per book and one column per term.

        Args:
            features: Must provide ``collection()`` (supporting ``len()`` and
                ``books()``) and ``by_book(book)``.

        Returns:
            A sparse matrix of shape
            ``(len(collection), len(encoder.vocabulary()))`` in CSC format.
            Row ``i`` holds the encoded features of the ``i``-th book yielded
            by ``collection.books()``.
        """
        collection = features.collection()
        num_rows = len(collection)
        num_cols = len(self._encoder.vocabulary())

        # DOK supports cheap incremental element assignment; convert once
        # at the end for the returned matrix.
        matrix = sparse.dok_matrix((num_rows, num_cols))
        for i, book in enumerate(collection.books()):
            for j, v in self._encoder.encode(features.by_book(book)):
                matrix[i, j] = v
        return matrix.tocsc()

    def vocabulary(self):
        """Return the vocabulary of the wrapped encoder."""
        # Fixed: this used to read the never-assigned attribute
        # ``self._features_encoder`` and always raised AttributeError.
        return self._encoder.vocabulary()
class CollectionFeaturesMatrixExtractor:
    """Extract a collection's features and encode them in a single call."""

    def __init__(self, extractor, base_collection):
        """Wrap ``extractor`` and obtain an encoder for ``base_collection``.

        Args:
            extractor: Passed to ``bc.CollectionFeaturesExtractor``.
            base_collection: Collection handed to the wrapped extractor's
                ``encoder_for`` (presumably the training set whose vocabulary
                defines the encoding; verify against ``encoder_for``).
        """
        wrapped = bc.CollectionFeaturesExtractor(extractor)
        self._training = base_collection
        self._extractor = wrapped
        self._encoder = wrapped.encoder_for(base_collection)

    def extract_from(self, collection):
        """Return the stored encoder's encoding of ``collection``'s features."""
        extracted = self._extractor.extract_from(collection)
        encoded = self._encoder.encode(extracted)
        return encoded