-
Notifications
You must be signed in to change notification settings - Fork 0
/
views.py
105 lines (69 loc) · 2.63 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Limited ``sqlalchemy`` support for views based on https://github.com/sqlalchemy/sqlalchemy/wiki/Views."""
import functools
import logging
import types
import sqlalchemy as sa
import sqlalchemy.ext.compiler
from .. import _globals
# Public names of this module.
__all__ = ['register_view',
'create_all_views',
'view',
'make_table']
# Maps view name -> zero-argument callable (a ``functools.partial``) that
# returns the selectable of the view; filled by ``register_view()``.
REGISTERED = {}
# Maps view name -> ``(CreateView, DropView)`` pair, or ``(None, None)``
# when the view was registered with ``clear=True``; filled by ``view()``.
DDL = {}
# Namespace with one attribute per view name holding the table object
# returned by ``view()`` (``None`` for cleared views).
TABLES = types.SimpleNamespace()
# Module-level logger.
log = logging.getLogger(__name__)
def register_view(name, **kwargs):
    """Return a decorator registering a selectable factory under ``name``.

    The decorated function is stored in ``REGISTERED`` with ``kwargs``
    pre-bound and is returned unchanged, so it stays directly callable.
    The name must not be registered yet (checked when this function is
    called, before the decorator is applied).
    """
    log.debug('register_view(%r)', name)
    assert name not in REGISTERED

    def _remember(view_func):
        bound_factory = functools.partial(view_func, **kwargs)
        REGISTERED[name] = bound_factory
        return view_func

    return _remember
def create_all_views(*, clear=False):
    """Build every registered view and expose its table on ``TABLES``.

    Each factory in ``REGISTERED`` is called to obtain its selectable,
    which is handed to ``view()`` together with ``clear``; the result is
    stored as the attribute ``TABLES.<name>``.
    """
    log.debug('run create_view() for %d views in REGISTERED', len(REGISTERED))
    for view_name, make_selectable in REGISTERED.items():
        # The factory is invoked even when clearing, exactly once per view.
        selectable = make_selectable()
        view_table = view(view_name, selectable=selectable, clear=clear)
        setattr(TABLES, view_name, view_table)
def view(name, selectable, *, clear=False):
    """Record the CREATE/DROP VIEW DDL for ``selectable`` under ``name``.

    Returns a table object describing the view (see ``make_table()``).
    With ``clear=True`` the DDL entry is reset to ``(None, None)`` and
    ``None`` is returned instead.
    """
    log.debug('view(%r, clear=%r)', name, clear)
    if not clear:
        create_ddl = CreateView(name, selectable)
        drop_ddl = DropView(name)
        DDL[name] = (create_ddl, drop_ddl)
        return make_table(selectable, name=name)

    # Cleared views keep their key but carry no DDL to execute.
    DDL[name] = (None, None)
    return None
def make_table(selectable, *, name='view_table'):
    """Return an ``sa.table()`` named ``name`` mirroring ``selectable``.

    For every column of the aliased selectable a proxy column is created
    and appended to the new table.
    """
    view_table = sa.table(name)
    aliased_columns = selectable.alias().c
    for source_column in aliased_columns:
        # NOTE(review): ``_make_proxy`` is a private SQLAlchemy method;
        # its signature may differ between releases -- confirm against
        # the SQLAlchemy version this project pins.
        _key, proxy_column = source_column._make_proxy(view_table)
        view_table.append_column(proxy_column)
    return view_table
@sa.event.listens_for(_globals.REGISTRY.metadata, 'after_create')
def after_create(target, bind, **kwargs):
    """Run the registered CREATE VIEW DDL on the metadata ``after_create`` event.

    Entries in ``DDL`` without a create element (cleared views) are skipped.
    """
    for view_name, ddl_pair in DDL.items():
        create_ddl, _drop_ddl = ddl_pair
        if create_ddl is None:
            continue
        log.debug('CREATE VIEW %r', view_name)
        create_ddl(target, bind)
@sa.event.listens_for(_globals.REGISTRY.metadata, 'before_drop')
def before_drop(target, bind, **kwargs):
    """Run the registered DROP VIEW DDL on the metadata ``before_drop`` event.

    Views are dropped in the reverse of their order in ``DDL`` (the order
    ``after_create`` uses for creation), so a view that selects from an
    earlier view is dropped before the view it depends on. Entries
    without a drop element (cleared views) are skipped.
    """
    # ``list()`` keeps this working on Python versions where dict views
    # are not reversible.
    for name, (_, drop_view) in reversed(list(DDL.items())):
        if drop_view is not None:
            log.debug('DROP VIEW %r', name)
            drop_view(target, bind)
class CreateView(sa.schema.DDLElement):
    """DDL element carrying the name and selectable of a view to create."""

    def __init__(self, name, selectable):
        self.name, self.selectable = name, selectable
class DropView(sa.schema.DDLElement):
    """DDL element carrying the name of a view to drop."""

    def __init__(self, name):
        # Only the view name is stored on the element.
        self.name = name
@sa.ext.compiler.compiles(CreateView)
def compile_create_view(element, compiler, **kwargs):
    """Render ``CREATE VIEW <name> AS <select>`` for a ``CreateView`` element.

    The selectable is compiled with ``literal_binds=True`` so parameter
    values are rendered inline in the statement. The view name goes
    through the dialect's identifier preparer, which quotes it only when
    required (e.g. reserved words or special characters); plain names
    are rendered unchanged.
    """
    select = compiler.sql_compiler.process(element.selectable,
                                           literal_binds=True)
    # Quote conditionally so the DDL names the view the same way
    # SQLAlchemy renders the ``sa.table(name)`` used to query it.
    name = compiler.preparer.quote(element.name)
    return f'\nCREATE VIEW {name} AS {select}\n'


@sa.ext.compiler.compiles(DropView)
def compile_drop_view(element, compiler, **kwargs):
    """Render ``DROP VIEW <name>`` for a ``DropView`` element.

    The view name is quoted by the dialect's identifier preparer when
    required, matching the quoting applied by ``compile_create_view``.
    """
    name = compiler.preparer.quote(element.name)
    return f'\nDROP VIEW {name}\n'