/
backend_views.py
77 lines (49 loc) · 1.95 KB
/
backend_views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# backend_views - limited sqlalchemy support for views
"""Based on https://github.com/sqlalchemy/sqlalchemy/wiki/Views"""
import logging
import sqlalchemy as sa
import sqlalchemy.ext.compiler
from . import backend as _backend
# Names exported as the public API of this module.
__all__ = ['view', 'make_table']
# Registry of views: maps a view name to its (CreateView, DropView) pair,
# or to (None, None) after the view was cleared via view(..., clear=True).
DDL = {}
# Module-level logger used for debug output of registered/emitted DDL.
log = logging.getLogger(__name__)
class CreateView(sa.schema.DDLElement):
    """DDL element carrying what is needed to render CREATE VIEW."""

    def __init__(self, name, selectable):
        """Remember the view *name* and the *selectable* defining it."""
        self.name, self.selectable = name, selectable
class DropView(sa.schema.DDLElement):
    """DDL element carrying what is needed to render DROP VIEW."""

    def __init__(self, name):
        """Remember the *name* of the view to drop."""
        self.name = name
@sa.ext.compiler.compiles(CreateView)
def compile_create_view(element, compiler, **kwargs):
    """Render a ``CreateView`` element as a CREATE VIEW statement.

    The element's selectable is compiled with ``literal_binds=True`` so
    that bound parameter values are rendered inline in the view body.
    Note that ``element.name`` is inserted as-is (no identifier quoting).
    """
    sql_compiler = compiler.sql_compiler
    select = sql_compiler.process(element.selectable, literal_binds=True)
    return f'\nCREATE VIEW {element.name} AS {select}\n'
@sa.ext.compiler.compiles(DropView)
def compile_drop_view(element, compiler, **kwargs):
    """Render a ``DropView`` element as a DROP VIEW statement.

    Note that ``element.name`` is inserted as-is (no identifier quoting).
    """
    statement = f'\nDROP VIEW {element.name}\n'
    return statement
@sa.event.listens_for(_backend.Model.metadata, 'after_create')
def after_create(target, bind, **kwargs):
    """Execute the CREATE VIEW DDL of every registered view.

    Listener for the ``after_create`` event of the model metadata.
    Views are processed in registration order; entries whose DDL is
    ``None`` (cleared views) are skipped.
    """
    for view_name, ddl_pair in DDL.items():
        create_ddl, _unused = ddl_pair
        if create_ddl is None:
            # Registration was cleared via view(..., clear=True).
            continue
        log.debug('CREATE VIEW %r', view_name)
        create_ddl(target, bind)
@sa.event.listens_for(_backend.Model.metadata, 'before_drop')
def before_drop(target, bind, **kwargs):
    """Execute the DROP VIEW DDL of every registered view.

    Listener for the ``before_drop`` event of the model metadata.
    Entries whose DDL is ``None`` (cleared views) are skipped.

    Views are dropped in reverse registration order. The table returned
    by ``view()`` can be used in the selectable of a later view, so a
    later view may depend on an earlier one. Dropping the dependent view
    first avoids failures on backends that refuse to drop a view that
    another view still references.
    """
    # list() snapshots the registry and makes reversed() work on every
    # Python 3 version (dict views are only reversible from 3.8 on).
    for name, (_, drop_view) in reversed(list(DDL.items())):
        if drop_view is not None:
            log.debug('DROP VIEW %r', name)
            drop_view(target, bind)
def view(name, selectable, *, clear=False):
    """Register (or clear) the CREATE/DROP VIEW DDL for *name*.

    With ``clear=False`` (default), store a ``CreateView``/``DropView``
    pair for *name* in the module registry and return a table construct
    with the columns of *selectable* that can be used to query the view.

    With ``clear=True``, replace the registry entry for *name* with
    ``(None, None)`` so no DDL is emitted for it, and return ``None``;
    *selectable* is not used in that case.
    """
    log.debug('view(%r, clear=%r)', name, clear)
    if not clear:
        DDL[name] = (CreateView(name, selectable), DropView(name))
        return make_table(selectable, name=name)
    DDL[name] = (None, None)
    return None
def make_table(selectable, *, name='view_table'):
    """Return a table construct called *name* mirroring *selectable*'s columns.

    A new ``sa.table(name)`` is created and every column in
    ``selectable.c`` is proxied onto it.
    """
    proxy_table = sa.table(name)
    for column in selectable.c:
        # NOTE(review): _make_proxy() is a private SQLAlchemy API; its
        # behavior may differ between SQLAlchemy versions -- confirm on upgrade.
        column._make_proxy(proxy_table)
    return proxy_table