-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathImportManager.py
263 lines (233 loc) · 9.45 KB
/
ImportManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""ImportManager
Manages imported modules and protects against concurrent imports.
Keeps lists of all imported Python modules and templates as well as other
config files used by Webware for Python. Modules which are not directly
imported can be monitored using hupper. This can be used to detect changes in
source files, templates or config files in order to reload them automatically.
"""
import sys
import warnings
from os.path import getmtime, isfile, join
from importlib.util import module_from_spec, spec_from_file_location
from importlib.machinery import (
ModuleSpec, SourceFileLoader, SourcelessFileLoader)
class ImportManager:
"""The import manager.
Keeps track of the Python modules and other system files that have been
imported and are used by Webware.
"""
_instance = None
def __new__(cls, *args, **kwargs):
"""Create ImportManager as a singleton."""
if not cls._instance:
cls._instance = super().__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
"""Initialize import manager."""
self._fileList = {}
self._moduleFiles = {}
self._notifyHook = None
self._reloader = self.getReloader()
def getReloader(self):
"""Get the current reloader if the application is monitored."""
reloader = None
with warnings.catch_warnings():
# ignore deprecation warnings from hupper
warnings.filterwarnings('ignore', category=DeprecationWarning)
try:
import hupper
except ImportError:
pass
else:
if hupper.is_active():
try:
reloader = hupper.get_reloader()
except RuntimeError:
pass
if reloader:
print('Application is monitored for reloading.')
print()
return reloader
def moduleFromSpec(self, spec):
"""Load the module with the given module spec."""
if not spec or not isinstance(spec, ModuleSpec):
raise TypeError(f'Invalid spec: {spec!r}')
try:
module = module_from_spec(spec)
except Exception:
# Also record file paths which weren't successfully loaded, which
# may happen due to a syntax error in a servlet, because we also
# want to know when such a file is modified:
if spec.origin:
self.recordFile(spec.origin)
raise
self.recordModule(module)
spec.loader.exec_module(module)
return module
def findSpec(self, name, path, fullModuleName=None):
"""Find the module spec for the given name at the given path."""
if not name or not isinstance(name, str):
raise TypeError(f'Invalid name: {name!r}')
if not path or not isinstance(path, str):
raise TypeError(f'Invalid path: {path!r}')
# find package
packageDirectory = join(path, name)
packageFileName = '__init__.py'
filePath = join(packageDirectory, packageFileName)
if isfile(filePath):
return spec_from_file_location(name, filePath)
# find package as byte code without source
filePath += 'c'
if isfile(filePath):
return spec_from_file_location(name, filePath)
# find module
fileName = f'{name}.py'
filePath = join(path, fileName)
if isfile(filePath):
if fullModuleName:
name = fullModuleName
loader = SourceFileLoader(name, filePath)
return spec_from_file_location(name, filePath, loader=loader)
# find module as optimized byte code without source
filePath += 'c'
if isfile(filePath):
loader = SourcelessFileLoader(name, filePath)
return spec_from_file_location(name, filePath, loader=loader)
raise ImportError(f'No module named {name!r}')
def fileList(self, update=True):
"""Return the list of tracked files."""
if update:
# Update list of files of imported modules
moduleNames = [modname for modname in sys.modules
if modname not in self._moduleFiles]
if moduleNames:
self.recordModules(moduleNames)
return self._fileList
def notifyOfNewFiles(self, hook):
"""Register notification hook.
Called by someone else to register that they'd like to know
when a new file is imported.
"""
self._notifyHook = hook
def watchFile(self, path, moduleName=None, getmtime=getmtime):
"""Add more files to watch without importing them."""
mtime = getmtime(path)
self._fileList[path] = mtime
if moduleName:
self._moduleFiles[moduleName] = path
# send notification that this file was imported
if self._notifyHook:
self._notifyHook(path, mtime)
# let reloader know that this was imported
if self._reloader:
self._reloader.watch_files([path])
def recordModule(self, module, isfile=isfile):
"""Record a module."""
moduleName = getattr(module, '__name__', None)
if not moduleName or moduleName not in sys.modules:
return
fileList = self._fileList
# __orig_file__ is used for PSP and Cheetah templates; we want
# to record the source filenames, not the auto-generated modules:
f = getattr(module, '__orig_file__', None)
if f and f not in fileList:
try:
if isfile(f):
self.watchFile(f, moduleName)
except OSError:
pass
else:
f = getattr(module, '__file__', None)
if f and f not in fileList:
# record the .py file corresponding to each .pyc
if f[-4:].lower() == '.pyc':
f = f[:-1]
try:
if isfile(f):
self.watchFile(f, moduleName)
else:
self.watchFile(join(f, '__init__.py'))
except OSError:
pass
def recordModules(self, moduleNames=None):
"""Record a list of modules (or all modules)."""
if moduleNames is None:
moduleNames = sys.modules
for modname in moduleNames:
mod = sys.modules[modname]
self.recordModule(mod)
def recordFile(self, filename, isfile=isfile):
"""Record a file."""
if isfile(filename):
self.watchFile(filename)
def fileUpdated(self, filename, update=True, getmtime=getmtime):
"""Check whether file has been updated."""
fileList = self.fileList(update)
try:
oldTime = fileList[filename]
except KeyError:
return True
try:
newTime = getmtime(filename)
except OSError:
return True
if oldTime >= newTime:
return False
fileList[filename] = newTime
# Note that the file list could be changed while running this
# method in a monitor thread, so we don't use iteritems() here:
for moduleName, moduleFile in self._moduleFiles.items():
if moduleFile == filename:
module = sys.modules.get(moduleName)
return not module or not getattr(
module, '__donotreload__', False)
return True # it's not a module, we must reload
def updatedFile(self, update=True, getmtime=getmtime):
"""Check whether one of the files has been updated."""
fileList = self.fileList(update)
for filename, oldTime in list(fileList.items()):
try:
newTime = getmtime(filename)
except OSError:
return filename
if oldTime >= newTime:
continue
fileList[filename] = newTime
for moduleName, moduleFile in self._moduleFiles.items():
if moduleFile == filename:
module = sys.modules.get(moduleName)
if module and getattr(module, '__donotreload__', False):
break
return filename # it's a module that needs to be reloaded
else:
return filename # it's not a module, we must reload
def delModules(self, includePythonModules=False, excludePrefixes=None):
"""Delete imported modules.
Deletes all the modules that have been imported unless they are part
of Webware. This can be used to support auto reloading.
"""
moduleFiles = self._moduleFiles
fileList = self._fileList
for modname in moduleFiles:
if modname == __name__:
continue
filename = self._moduleFiles[modname]
if not includePythonModules:
if not filename or filename.startswith(sys.prefix):
continue
for prefix in excludePrefixes or []:
if modname.startswith(prefix):
break
else:
try:
del sys.modules[modname]
except KeyError:
pass
try:
del moduleFiles[modname]
except KeyError:
pass
try:
del fileList[filename]
except KeyError:
pass