-
Notifications
You must be signed in to change notification settings - Fork 60
/
watch.py
62 lines (49 loc) · 2.05 KB
/
watch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time
from typing import * # noqa
from watchdog import events, observers
from . import exceptions, registry
from .compilers import BaseCompiler # noqa
class EventHandler(events.FileSystemEventHandler):
    """Recompile static files when they are created or modified.

    Each handler is bound to one scanned directory. Event paths are made
    relative to that directory before being handed to the compilers.
    """

    # noinspection PyShadowingNames
    def __init__(self, scanned_dir, verbosity, compilers):
        # type: (str, int, List[BaseCompiler]) -> None
        """Remember the watched directory, verbosity level and compilers."""
        self.scanned_dir = scanned_dir
        self.verbosity = verbosity
        self.compilers = compilers
        super(EventHandler, self).__init__()

    def on_any_event(self, e):
        """Compile the changed file and every file that depends on it.

        Directory events and event types other than "created" and
        "modified" are ignored. The first compiler that reports the path
        as supported handles it; later compilers are not consulted.
        ``StaticCompilationError`` and ``ValueError`` raised during
        compilation are printed instead of propagated.
        """
        if e.is_directory or e.event_type not in ("created", "modified"):
            return

        # Strip the scanned directory prefix so the compilers receive a path
        # relative to it, without a leading slash.
        path = e.src_path[len(self.scanned_dir):]
        if path.startswith("/"):
            path = path[1:]

        for compiler in self.compilers:
            if not compiler.is_supported(path):
                continue

            if self.verbosity > 1:
                print("Modified: '{0}'".format(path))

            try:
                compiler.compile(path, from_management=True, verbosity=self.verbosity)
                if compiler.supports_dependencies:
                    # Recompile each dependent file itself. The handler has
                    # no ``compile`` method, and the changed file has already
                    # been compiled above.
                    for dependent in compiler.get_dependents(path):
                        compiler.compile(dependent, from_management=True, verbosity=self.verbosity)
            except (exceptions.StaticCompilationError, ValueError) as error:
                print(error)
            break
def watch_dirs(scanned_dirs, verbosity):
    """Watch the given directories and recompile files as they change.

    Prints the list of watched directories, schedules one recursive
    ``EventHandler`` per directory on a single observer, and then sleeps
    in one-second intervals until interrupted with Control+C, at which
    point the observer is stopped and joined.
    """
    print("Watching directories:")
    for directory in scanned_dirs:
        print(directory)
    print("\nPress Control+C to exit.\n")

    available_compilers = registry.get_compilers().values()
    watcher = observers.Observer()
    for directory in scanned_dirs:
        watcher.schedule(
            EventHandler(directory, verbosity, available_compilers),
            path=directory,
            recursive=True,
        )
    watcher.start()

    # Keep the main thread alive; the observer does its work in the background.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()