-
-
Notifications
You must be signed in to change notification settings - Fork 70
/
Copy pathmisc.py
252 lines (212 loc) · 7.49 KB
/
misc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""
Misc functions / classes
"""
from functools import wraps
from contextlib import contextmanager
# note: ensure sage's version of python has nose installed or starting new sessions may hang!
try:
from nose.tools import assert_is, assert_equal, assert_in, assert_not_in, assert_raises, assert_regexp_matches, assert_is_instance, assert_is_not_none, assert_greater
except ImportError:
pass
import sys
import re
from datetime import datetime
class Config(object):
"""
Config file wrapper / handler class
This is designed to make loading and working with an
importable configuration file with options relevant to
multiple classes more convenient.
Rather than re-importing a configuration module whenever
a specific is attribute is needed, a Config object can
be instantiated by some base application and the relevant
attributes can be passed to whatever classes / functions
are needed.
This class tracks both the default and user-specified
configuration files
"""
def __init__(self):
import config_default
self.config = None
self.config_default = config_default
try:
import config
self.config = config
except ImportError:
pass
def get_config(self, attr):
"""
Get a config attribute. If the attribute is defined
in the user-specified file, that is used, otherwise
the default config file attribute is used if
possible. If the attribute is a dictionary, the items
in config and default_config will be merged.
:arg attr str: the name of the attribute to get
:returns: the value of the named attribute, or
None if the attribute does not exist.
"""
default_config_val = self.get_default_config(attr)
config_val = default_config_val
if self.config is not None:
try:
config_val = getattr(self.config, attr)
except AttributeError:
pass
if isinstance(config_val, dict):
config_val = dict(default_config_val.items() + config_val.items())
return config_val
def get_default_config(self, attr):
"""
Get a config attribute from the default config file.
:arg attr str: the name of the attribute toget
:returns: the value of the named attribute, or
None if the attribute does not exist.
"""
config_val = None
try:
config_val = getattr(self.config_default, attr)
except AttributeError:
pass
return config_val
def set_config(self, attr, value):
"""
Set a config attribute
:arg attr str: the name of the attribute to set
:arg value: an arbitrary value to set the named
attribute to
"""
setattr(self.config, attr, value)
def get_attrs(self):
"""
Get a list of all the config object's attributes
This isn't very useful right now, since it includes
__<attr>__ attributes and the like.
:returns: a list of all attributes belonging to
the imported config module.
:rtype: list
"""
return dir(self.config)
def get_db_file(config):
"""
A convenience function to get the correct location of a
database from a config object.
:arg config: a Config object
:returns: the localation of the database file, for the
purposes of instantiating a database.
:rtype: str
"""
db_file = None
db = config.get_config("db")
db_config = config.get_config("db_config")
if db == "sqlalchemy":
db_file = db_config.get("uri")
return db_file
def decorator_defaults(func):
"""
This function allows a decorator to have default arguments.
Normally, a decorator can be called with or without arguments.
However, the two cases call for different types of return values.
If a decorator is called with no parentheses, it should be run
directly on the function. However, if a decorator is called with
parentheses (i.e., arguments), then it should return a function
that is then in turn called with the defined function as an
argument.
This decorator allows us to have these default arguments without
worrying about the return type.
EXAMPLES::
sage: from sage.misc.decorators import decorator_defaults
sage: @decorator_defaults
... def my_decorator(f,*args,**kwds):
... print kwds
... print args
... print f.__name__
...
sage: @my_decorator
... def my_fun(a,b):
... return a,b
...
{}
()
my_fun
sage: @my_decorator(3,4,c=1,d=2)
... def my_fun(a,b):
... return a,b
...
{'c': 1, 'd': 2}
(3, 4)
my_fun
"""
from inspect import isfunction
@wraps(func)
def my_wrap(*args,**kwargs):
if len(kwargs)==0 and len(args)==1 and isfunction(args[0]):
# call without parentheses
return func(*args)
else:
def _(f):
return func(f, *args, **kwargs)
return _
return my_wrap
@contextmanager
def session_metadata(metadata):
# flush any messages waiting in buffers
sys.stdout.flush()
sys.stderr.flush()
session = sys.stdout.session
old_metadata = session.metadata
new_metadata = old_metadata.copy()
new_metadata.update(metadata)
session.metadata = new_metadata
try:
yield None
finally:
sys.stdout.flush()
sys.stderr.flush()
session.metadata = old_metadata
def display_message(data):
session = sys.stdout.session
content = {'data': data, 'source': 'sagecell'}
session.send(sys.stdout.pub_socket, 'display_data', content=content, parent = sys.stdout.parent_header)
def json_default(obj):
if isinstance(obj, datetime):
return obj.isoformat()
else:
raise TypeError, 'Object of type %s with value of %s is not JSON serializable' % (type(obj), repr(obj))
##########################################
## Unit Testing Misc Functions
##########################################
def assert_len(obj,l):
from nose.tools import assert_equal
return assert_equal(len(obj), l, "Object %s should have length %s, but has length %s"%(obj,l,len(obj)))
uuid_re = re.compile('[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}')
def assert_uuid(s):
from nose.tools import assert_regexp_matches
return assert_regexp_matches(s, uuid_re)
# from the attest python package - license is modified BSD
from StringIO import StringIO
@contextmanager
def capture_output(split=False):
"""Captures standard output and error during the context. Returns a
tuple of the two streams as lists of lines, added after the context has
executed.
.. testsetup::
from attest import capture_output
>>> with capture_output() as (out, err):
... print 'Captured'
...
>>> out
['Captured']
"""
stdout, stderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = StringIO(), StringIO()
out, err = [], []
try:
yield out, err
finally:
if split:
out.extend(sys.stdout.getvalue().splitlines())
err.extend(sys.stderr.getvalue().splitlines())
else:
out.append(sys.stdout.getvalue())
err.append(sys.stderr.getvalue())
sys.stdout, sys.stderr = stdout, stderr