-
Notifications
You must be signed in to change notification settings - Fork 40
/
prog_factories.py
210 lines (192 loc) · 8.59 KB
/
prog_factories.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"Scripts for generating, solving and sweeping programs"
from time import time
import warnings as pywarnings
import numpy as np
from adce import adnumber
from ..nomials import parse_subs
from ..solution_array import SolutionArray
from ..keydict import KeyDict
from ..small_scripts import maybe_flatten
from ..small_classes import FixedScalar
from ..exceptions import Infeasible
from ..globals import SignomialsEnabled
def evaluate_linked(constants, linked):
"Evaluates the values and gradients of linked variables."
kdc = KeyDict({k: adnumber(maybe_flatten(v), k)
for k, v in constants.items()})
kdc_plain = None
array_calulated = {}
for key in constants: # remove gradients from constants
key.descr.pop("gradients", None)
for v, f in linked.items():
try:
if v.veckey and v.veckey.vecfn:
if v.veckey not in array_calulated:
with SignomialsEnabled(): # to allow use of gpkit.units
vecout = v.veckey.vecfn(kdc)
if not hasattr(vecout, "shape"):
vecout = np.array(vecout)
array_calulated[v.veckey] = vecout
out = array_calulated[v.veckey][v.idx]
else:
with SignomialsEnabled(): # to allow use of gpkit.units
out = f(kdc)
if isinstance(out, FixedScalar): # to allow use of gpkit.units
out = out.value
if hasattr(out, "units"):
out = out.to(v.units or "dimensionless").magnitude
elif out != 0 and v.units:
pywarnings.warn(
"Linked function for %s did not return a united value."
" Modifying it to do so (e.g. by using `()` instead of `[]`"
" to access variables) will reduce errors." % v)
out = maybe_flatten(out)
if not hasattr(out, "x"):
constants[v] = out
continue # a new fixed variable, not a calculated one
constants[v] = out.x
v.descr["gradients"] = {adn.tag: grad
for adn, grad in out.d().items()
if adn.tag}
except Exception as exception: # pylint: disable=broad-except
from .. import settings
if settings.get("ad_errors_raise", None):
raise
if kdc_plain is None:
kdc_plain = KeyDict(constants)
constants[v] = f(kdc_plain)
v.descr.pop("gradients", None)
print("Warning: skipped auto-differentiation of linked variable"
" %s because %s was raised. Set `gpkit.settings"
"[\"ad_errors_raise\"] = True` to raise such Exceptions"
" directly.\n" % (v, repr(exception)))
if ("Automatic differentiation not yet supported for <class "
"'gpkit.nomials.math.Monomial'> objects") in str(exception):
print("This particular warning may have come from using"
" gpkit.units.* in the function for %s; try using"
" gpkit.ureg.* or gpkit.units.*.units instead." % v)
def progify(program, return_attr=None):
"""Generates function that returns a program() and optionally an attribute.
Arguments
---------
program: NomialData
Class to return, e.g. GeometricProgram or SequentialGeometricProgram
return_attr: string
attribute to return in addition to the program
"""
def programfn(self, constants=None, **initargs):
"Return program version of self"
if not constants:
constants, _, linked = parse_subs(self.varkeys, self.substitutions)
if linked:
evaluate_linked(constants, linked)
prog = program(self.cost, self, constants, **initargs)
prog.model = self # NOTE SIDE EFFECTS
if return_attr:
return prog, getattr(prog, return_attr)
return prog
return programfn
def solvify(genfunction):
"Returns function for making/solving/sweeping a program."
def solvefn(self, solver=None, *, verbosity=1, skipsweepfailures=False,
**kwargs):
"""Forms a mathematical program and attempts to solve it.
Arguments
---------
solver : string or function (default None)
If None, uses the default solver found in installation.
verbosity : int (default 1)
If greater than 0 prints runtime messages.
Is decremented by one and then passed to programs.
skipsweepfailures : bool (default False)
If True, when a solve errors during a sweep, skip it.
**kwargs : Passed to solve and program init calls
Returns
-------
sol : SolutionArray
See the SolutionArray documentation for details.
Raises
------
ValueError if the program is invalid.
RuntimeWarning if an error occurs in solving or parsing the solution.
"""
constants, sweep, linked = parse_subs(self.varkeys, self.substitutions)
solution = SolutionArray()
solution.modelstr = str(self)
# NOTE SIDE EFFECTS: self.program and self.solution set below
if sweep:
run_sweep(genfunction, self, solution, skipsweepfailures,
constants, sweep, linked, solver, verbosity, **kwargs)
else:
self.program, progsolve = genfunction(self, **kwargs)
result = progsolve(solver, verbosity=verbosity, **kwargs)
if kwargs.get("process_result", True):
self.process_result(result)
solution.append(result)
solution.to_arrays()
self.solution = solution
return solution
return solvefn
# pylint: disable=too-many-locals,too-many-arguments,too-many-branches,too-many-statements
def run_sweep(genfunction, self, solution, skipsweepfailures,
constants, sweep, linked, solver, verbosity, **kwargs):
"Runs through a sweep."
# sort sweeps by the eqstr of their varkey
sweepvars, sweepvals = zip(*sorted(list(sweep.items()),
key=lambda vkval: vkval[0].eqstr))
if len(sweep) == 1:
sweep_grids = np.array(list(sweepvals))
else:
sweep_grids = np.meshgrid(*list(sweepvals))
N_passes = sweep_grids[0].size
sweep_vects = {var: grid.reshape(N_passes)
for (var, grid) in zip(sweepvars, sweep_grids)}
if verbosity > 0:
print("Sweeping with %i solves:" % N_passes)
tic = time()
self.program = []
last_error = None
for i in range(N_passes):
constants.update({var: sweep_vect[i]
for (var, sweep_vect) in sweep_vects.items()})
if linked:
evaluate_linked(constants, linked)
program, solvefn = genfunction(self, constants, **kwargs)
program.model = None # so it doesn't try to debug
self.program.append(program) # NOTE: SIDE EFFECTS
try:
if verbosity > 1:
print("\nSolve %i:" % i)
result = solvefn(solver, verbosity=verbosity-1, **kwargs)
if kwargs.get("process_result", True):
self.process_result(result)
solution.append(result)
except Infeasible as e:
last_error = e
if not skipsweepfailures:
raise RuntimeWarning(
"Solve %i was infeasible; progress saved to m.program."
" To continue sweeping after failures, solve with"
" skipsweepfailures=True." % i) from e
if verbosity > 0:
print("Solve %i was %s." % (i, e.__class__.__name__))
if not solution:
raise RuntimeWarning("All solves were infeasible.") from last_error
solution["sweepvariables"] = KeyDict()
ksweep = KeyDict(sweep)
for var, val in list(solution["constants"].items()):
if var in ksweep:
solution["sweepvariables"][var] = val
del solution["constants"][var]
elif linked: # if any variables are linked, we check all of them
if hasattr(val[0], "shape"):
differences = ((l != val[0]).any() for l in val[1:])
else:
differences = (l != val[0] for l in val[1:])
if not any(differences):
solution["constants"][var] = [val[0]]
else:
solution["constants"][var] = [val[0]]
if verbosity > 0:
soltime = time() - tic
print("Sweeping took %.3g seconds." % (soltime,))