Skip to content

Commit

Permalink
Rewrote and renamed net.test() to net.evaluate() and net.evaluate_and…
Browse files Browse the repository at this point in the history
…_label()
  • Loading branch information
dsblank committed Sep 13, 2018
1 parent 1c64e0c commit 2b982f6
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 181 deletions.
343 changes: 179 additions & 164 deletions conx/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,15 +875,168 @@ def reset(self, clear=False, **overrides):
self.compile_options.update(overrides)
self.compile_model(**self.compile_options)

def test(self, batch_size=32, show=False, tolerance=None, force=False,
show_inputs=True, show_outputs=True,
filter="all", interactive=True):
def evaluate(self, batch_size=None, show=False, show_inputs=True, show_targets=True,
kverbose=0, sample_weight=None, steps=None, tolerance=None, force=False,
max_col_width=15):
"""
Test a dataset.
Evaluate the train and/or test sets.
If show is True, then it will show details for each
training/test pair, the amount of detail then determined by
show_inputs, and show_outputs.
If force is True, then it will show all patterns, even if there are many.
Set max_col_width to change the maximum width that any one column can be.
"""
if len(self.dataset) == 0:
print("WARNING: nothing to test; use network.dataset.load()",
file=sys.stderr)
return
if tolerance is not None:
self.tolerance = tolerance
print("%s:" % self.name)
if 0 < self.dataset._split <= 1:
size, num_train, num_test = self.dataset._get_split_sizes()
results = self.model.evaluate(self.dataset._inputs, self.dataset._targets,
batch_size=batch_size, verbose=kverbose,
sample_weight=sample_weight, steps=steps)
print("Training Data Results:")
if show:
self._evaluate_range(slice(num_train), show_inputs, show_targets, batch_size,
kverbose, sample_weight, steps, force, max_col_width)
results = self.model.evaluate([self.dataset._inputs[bank][:num_train]
for bank in range(self.num_input_layers)],
[self.dataset._targets[bank][:num_train]
for bank in range(self.num_target_layers)],
batch_size=batch_size, verbose=kverbose,
sample_weight=sample_weight, steps=steps)
for i in range(len(self.model.metrics_names)):
print(" %15s: %10s" % (self.model.metrics_names[i], self.pf(results[i])))
print()
print("Testing Data Results:")
if show:
self._evaluate_range(slice(num_train, num_train + num_test), show_inputs, show_targets, batch_size,
kverbose, sample_weight, steps, force, max_col_width)
results = self.model.evaluate([self.dataset._inputs[bank][num_train:]
for bank in range(self.num_input_layers)],
[self.dataset._targets[bank][num_train:]
for bank in range(self.num_target_layers)],
batch_size=batch_size, verbose=kverbose,
sample_weight=sample_weight, steps=steps)
for i in range(len(self.model.metrics_names)):
print(" %15s: %10s" % (self.model.metrics_names[i], self.pf(results[i])))

else: # all data, no split:
print("All Data Results:")
if show:
self._evaluate_range(slice(len(self.dataset)), show_inputs, show_targets, batch_size,
kverbose, sample_weight, steps, force, max_col_width)
results = self.model.evaluate(self.dataset._inputs, self.dataset._targets,
batch_size=batch_size, verbose=kverbose,
sample_weight=sample_weight, steps=steps)
for i in range(len(self.model.metrics_names)):
print(" %15s: %10s" % (self.model.metrics_names[i], self.pf(results[i])))

def _evaluate_range(self, slice, show_inputs, show_targets, batch_size,
kverbose, sample_weight, steps, force, max_col_width):
def split_heading(name, fill=""):
if "_" not in name:
return [fill, name]
else:
return name.rsplit("_", 1)
column_count = 1 # the row number
if show_inputs:
column_count += self.num_input_layers
if show_targets:
column_count += self.num_target_layers
column_count += len(self.model.metrics_names)
cols = [[] for i in range(column_count)]
for i in range(len(self.dataset))[slice]:
result = self.model.evaluate([self.dataset._inputs[bank][i:i+1]
for bank in range(self.num_input_layers)],
[self.dataset._targets[bank][i:i+1]
for bank in range(self.num_target_layers)],
batch_size=batch_size, verbose=kverbose,
sample_weight=sample_weight, steps=steps)
cols[0].append(str(i))
col = 1
if show_inputs:
for j in range(self.num_input_layers):
cols[col].append(self.pf(self.dataset._inputs[j][i], max_line_width=max_col_width))
col += 1
if show_targets:
for j in range(self.num_target_layers):
cols[col].append(self.pf(self.dataset._targets[j][i], max_line_width=max_col_width))
col += 1
for item in result:
cols[col].append(self.pf(item))
col += 1
headings = [["", "#"]]
if show_inputs:
headings.extend([split_heading(name) for name in self.input_bank_order])
if show_targets:
headings.extend([split_heading(name, "target") for name in self.output_bank_order])
headings.extend([split_heading(name) for name in self.model.metrics_names])
correct = {}
for c in range(len(cols)):
if headings[c][1].endswith("acc"):
correct[c] = 0
for r in range(len(cols[c])):
if cols[c][r] == self.pf(0):
cols[c][r] = "x"
elif cols[c][r] == self.pf(1):
cols[c][r] = "correct"
correct[c] += 1
column_widths = [max([len(c) for c in row]) if row else 0 for row in cols]
column_widths = [max([column_widths[c], len(headings[c][0]), len(headings[c][1])]) for c in range(len(column_widths))]
for c in range(column_count):
print("-" * column_widths[c], end="-+-")
print()
for h in range(len(headings)):
print(("%" + str(column_widths[h]) + "s") % headings[h][0], end=" | ")
print()
for h in range(len(headings)):
print(("%" + str(column_widths[h]) + "s") % headings[h][1], end=" | ")
print()
for c in range(column_count):
print("-" * column_widths[c], end="-+-")
print()
for i in range(len(cols[0])): # at least one column
if not force and i == 15:
print()
print("... skipping some rows ...")
print()
if force or i < 15:
for c in range(column_count):
if len(cols[c]) > i:
print(("%" + str(column_widths[c]) + "s") % cols[c][i], end=" | ")
else:
print(("%" + str(column_widths[c]) + "s") % "", end=" | ")
print() # end of line
## Totals:
for c in range(column_count):
print("-" * column_widths[c], end="-+-")
print()
row = ""
for c in range(column_count):
if c in correct:
row += (("%" + str(column_widths[c]) + "s") % correct[c]) + " | "
else:
row += (("%" + str(column_widths[c]) + "s") % "") + " "
print("Correct:", row[9:])
for c in range(column_count):
if c in correct:
print("-" * column_widths[c], end="-+-")
else:
print("-" * column_widths[c], end="---")
print()

def evaluate_and_label(self, batch_size=32, tolerance=None):
"""
Test the network on the dataset, and categorize the results.
"""
tolerance = tolerance if tolerance is not None else self.tolerance
if len(self.dataset.inputs) == 0:
raise Exception("nothing to test")
length = len(self.dataset.train_targets)
if self.dataset._split == 1.0: ## special case; use entire set
inputs = self.dataset._inputs
Expand All @@ -892,106 +1045,17 @@ def test(self, batch_size=32, show=False, tolerance=None, force=False,
## need to split; check format based on output banks:
targets = [column[:length] for column in self.dataset._targets]
inputs = [column[:length] for column in self.dataset._inputs]
if interactive:
self._test(inputs, targets, "validation dataset", batch_size, show,
tolerance, force, show_inputs, show_outputs, filter, interactive)
else:
results = self._test(inputs, targets, "validation dataset", batch_size, show,
tolerance, force, show_inputs, show_outputs, filter, interactive)
categories = {}
for i in range(length):
label = "%s (%s)" % (self.dataset.labels[i],
"correct" if results[i] else "wrong")
if not label in categories:
categories[label] = []
categories[label].append(self.dataset.inputs[i])
return sorted(categories.items())

def _test(self, inputs, targets, dataset, batch_size=32, show=False,
tolerance=None, force=False,
show_inputs=True, show_outputs=True,
filter="all", interactive=True):
"""
>>> net = Network("Playback Test", 2, 2, 1, activation="sigmoid")
>>> net.compile(error="mse", optimizer="sgd")
>>> net.dataset.load([
... [[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]])
>>> array = net.to_array()
>>> net.from_array(np.zeros(len(array))) ## Zero-out weights
>>> net._test(net.dataset._inputs, net.dataset._targets, "TEST")
========================================================
Testing TEST with tolerance None...
Total count: 4
correct: 0
incorrect: 4
Total percentage correct: 0.0
>>> net._test(net.dataset._inputs, net.dataset._targets, "TEST", show=True)
========================================================
Testing TEST with tolerance None...
# | inputs | targets | outputs | result
---------------------------------------
0 | [[0.00, 0.00]] | [[0.00]] | [0.50] | X
1 | [[0.00, 1.00]] | [[1.00]] | [0.50] | X
2 | [[1.00, 0.00]] | [[1.00]] | [0.50] | X
3 | [[1.00, 1.00]] | [[0.00]] | [0.50] | X
Total count: 4
correct: 0
incorrect: 4
Total percentage correct: 0.0
"""
if interactive:
print("=" * 56)
print("Testing %s with tolerance %.6s..." % (dataset, tolerance))
outputs = self.model.predict(inputs, batch_size=batch_size)
## FYI: outputs not shaped
if self.num_target_layers > 1:
correct = self.compute_correct(outputs, targets, tolerance)
else:
## Warning:
## keras returns outputs as a single column
## conx targets are always multi-column
correct = self.compute_correct([outputs], targets, tolerance)
count = len(correct)
if show:
if show_inputs:
in_formatted = self.pf_matrix(inputs, force)
count = len(in_formatted)
if show_outputs:
targ_formatted = self.pf_matrix(targets, force)
out_formatted = self.pf_matrix(outputs, force)
count = len(out_formatted)
header = "# | "
if show_inputs:
header += "inputs | "
if show_outputs:
header += "targets | outputs | "
header += "result"
print(header)
print("---------------------------------------")
for i in range(count):
show_it = ((filter == "all") or
(filter == "correct" and correct[i]) or
(filter == "incorrect" and not correct[i]))
if show_it:
line = "%d | " % i
if show_inputs:
line += "%s | " % in_formatted[i]
if show_outputs:
line += "%s | %s | " % (targ_formatted[i], out_formatted[i])
line += "correct" if correct[i] else "X"
print(line)
if interactive:
print("Total count:", len(correct))
print(" correct:", len([c for c in correct if c]))
print(" incorrect:", len([c for c in correct if not c]))
print("Total percentage correct:", list(correct).count(True)/len(correct))
else:
return list(correct)

correct = self.compute_correct([outputs], targets, tolerance)
categories = {}
for i in range(length):
label_i = self.dataset.labels[i] if i < len(self.dataset.labels) else "Unlabeled"
label = "%s (%s)" % (label_i, "correct" if correct[i] else "wrong")
if not label in categories:
categories[label] = []
categories[label].append(self.dataset.inputs[i])
return sorted(categories.items())

def compute_correct(self, outputs, targets, tolerance=None):
"""
Both are np.arrays. Return [True, ...].
Expand Down Expand Up @@ -1152,35 +1216,6 @@ def _compute_result_acc(self, results):
else:
raise Exception("attempting to find accuracy in results, but there aren't any")

def evaluate(self, batch_size=32):
"""
Test the network on the train and test data, returning a dict of results.
Example:
>>> net = Network("Evaluate", 2, 2, 1, activation="sigmoid")
>>> net.compile(error='mean_squared_error', optimizer="adam")
>>> ds = [[[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]]
>>> net.dataset.load(ds)
>>> net.evaluate() # doctest: +ELLIPSIS
{'loss': ..., 'acc': ...}
"""
if len(self.dataset.inputs) == 0:
raise Exception("no dataset loaded")
if self.model is None:
raise Exception("need to build and compile network")
if self.model.optimizer is None:
raise Exception("need to compile network")
(train_inputs, train_targets), (test_inputs, test_targets) = self.dataset._split_data()
train_metrics = self.model.evaluate(train_inputs, train_targets, batch_size=batch_size, verbose=0)
results = {k:v for k, v in zip(self.model.metrics_names, train_metrics)}
if len(test_inputs) > 0:
test_metrics = self.model.evaluate(test_inputs, test_targets, batch_size=batch_size, verbose=0)
results.update({"val_"+k: v for k, v in zip(self.model.metrics_names, test_metrics)})
return results

def test_dataset_ranges(self):
"""
Test the dataset ranges to see if in range of activation functions.
Expand Down Expand Up @@ -4018,35 +4053,9 @@ def pp(self, *args, **opts):
vector = args[0]
print(label + self.pf(vector[:20], **opts))

def pf_matrix(self, matrix, force=False, **opts):
"""
Pretty-fromat a matrix. If a list, then that implies multi-bank.
"""
if isinstance(matrix, list): ## multiple output banks
rows = []
for r in range(len(matrix[0])):
row = []
for c in range(len(matrix)):
row.append(self.pf(matrix[c][r], **opts))
if c > 99 and not force:
row.append("...")
rows.append("[" + (",".join(row)) + "]")
if r > 99 and not force:
rows.append("...")
break
return rows
else:
rows = []
for r in range(len(matrix)):
rows.append(self.pf(matrix[r], **opts))
if r > 99 and not force:
rows.append("...")
break
return rows

def pf(self, vector, **opts):
def pf(self, vector, max_line_width=79, **opts):
"""
Pretty-format a vector. Returns string.
Pretty-format a vector or item. Returns string.
Arguments:
vector (list): The first parameter.
Expand All @@ -4070,19 +4079,25 @@ def pf(self, vector, **opts):
>>> net.pf([0]*10000) # doctest: +ELLIPSIS
'[0.00, 0.00, 0.00, ...]'
"""
if isinstance(vector, (int, float)):
vector = np.float16(vector)
if isinstance(vector, collections.Iterable):
vector = list(vector)
if isinstance(vector, (list, tuple)):
vector = np.array(vector)
config = copy.copy(self.config)
config.update(opts)
precision = "{0:.%df}" % config["precision"]
return np.array2string(
retval = np.array2string(
vector,
formatter={'float_kind': precision.format,
'int_kind': precision.format},
separator=", ",
max_line_width=79).replace("\n", "")
max_line_width=max_line_width).replace("\n", "")[:max_line_width]
if len(retval) == max_line_width:
return retval + "..."
else:
return retval

def set_weights(self, weights, layer_name=None):
"""
Expand Down

0 comments on commit 2b982f6

Please sign in to comment.