/
meta.py
165 lines (136 loc) · 5.74 KB
/
meta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# encoding: utf-8
import datetime
from paste.deploy.converters import asbool
from ckan.common import config
"""SQLAlchemy Metadata and Session object"""
from sqlalchemy import MetaData, and_
import sqlalchemy.orm as orm
from sqlalchemy.orm.session import SessionExtension
import extension
import ckan.lib.activity_streams_session_extension as activity
__all__ = ['Session', 'engine_is_sqlite', 'engine_is_pg']
class CkanCacheExtension(SessionExtension):
    ''' This extension checks what tables have been affected by
    database access and allows us to act on them. Currently this is
    used by the page cache to flush the cache when data in the database
    is altered.

    Redis is only used when the ``ckan.page_cache_enabled`` config option
    is true; otherwise ``after_commit`` does not touch Redis at all.
    '''

    def __init__(self, *args, **kw):
        ''' Read the page cache setting and, if enabled, prepare Redis.

        All arguments are passed through to ``SessionExtension``.
        '''
        super(CkanCacheExtension, self).__init__(*args, **kw)
        # Setup Redis support if needed.
        self.use_redis = asbool(config.get('ckan.page_cache_enabled'))
        if self.use_redis:
            # Imported here so redis is only needed when the page cache
            # is switched on.
            import redis
            self.redis = redis
            # No connection yet: after_commit() creates it on first use.
            # (This must be an assignment; a bare ``is None`` comparison
            # raises AttributeError and leaves the attribute undefined.)
            self.redis_connection = None
            self.redis_exception = redis.exceptions.ConnectionError

    def after_commit(self, session):
        ''' Flush the Redis database once a transaction has committed.

        Redis connection errors are swallowed on purpose: flushing the
        page cache is best-effort and must not break the commit.

        :param session: the session that has just been committed.
        '''
        if hasattr(session, '_object_cache'):
            oc = session._object_cache
            # NOTE: update() folds the changed/deleted objects into the
            # cached 'new' set in place.
            oc_list = oc['new']
            oc_list.update(oc['changed'])
            oc_list.update(oc['deleted'])
            # Class names of every object touched by the transaction.
            # Nothing reads this set at the moment.
            objs = set()
            for item in oc_list:
                objs.add(item.__class__.__name__)

        # Flush Redis
        if not self.use_redis:
            return
        if self.redis_connection is None:
            try:
                self.redis_connection = self.redis.StrictRedis()
            except self.redis_exception:
                # Still no connection, so there is nothing to flush;
                # try again on the next commit.
                return
        try:
            self.redis_connection.flushdb()
        except self.redis_exception:
            pass
class CkanSessionExtension(SessionExtension):
    ''' Remembers which objects a transaction touched (in
    ``session._object_cache``) and updates the matching revision table
    rows just before the transaction is committed. '''

    def before_flush(self, session, flush_context, instances):
        ''' Add the new, deleted and modified objects of this flush to
        ``session._object_cache``, creating the cache if it is missing. '''
        if not hasattr(session, '_object_cache'):
            session._object_cache = {
                'new': set(),
                'deleted': set(),
                'changed': set(),
            }

        # Only dirty objects that really carry modifications count as
        # changed.
        modified = []
        for candidate in session.dirty:
            if session.is_modified(candidate, include_collections=False,
                                   passive=True):
                modified.append(candidate)

        session._object_cache['new'].update(session.new)
        session._object_cache['deleted'].update(session.deleted)
        session._object_cache['changed'].update(modified)

    def before_commit(self, session):
        ''' Flush the session, then update the revision rows of every
        object recorded in the object cache.

        Does nothing more when the session has no ``_object_cache`` or no
        ``revision`` attribute, or when ``session.revisioning_disabled``
        is true.
        '''
        session.flush()
        try:
            tracked = session._object_cache
            active_revision = session.revision
        except AttributeError:
            return
        if getattr(session, 'revisioning_disabled', False):
            return

        added = tracked['new']
        altered = tracked['changed']
        removed = tracked['deleted']
        for entity in added | altered | removed:
            # Objects without a revision class are not revisioned.
            if not hasattr(entity, '__revision_class__'):
                continue
            rev_cls = entity.__revision_class__
            rev_table = orm.class_mapper(rev_cls).mapped_table

            # Plain SQL statement, as we do not want this update to end
            # up in the object cache.
            # NOTE(review): this clears current='1' for the object's rows
            # and nothing in this method sets the flag again - confirm
            # that this is intended.
            is_current_row = and_(rev_table.c.id == entity.id,
                                  rev_table.c.current == '1')
            session.execute(
                rev_table.update().where(is_current_row).values(current='0')
            )

            # Revision rows of this object whose expiry is still the
            # 9999-12-31 placeholder.
            open_rows = session.query(rev_cls).filter_by(
                expired_timestamp=datetime.datetime(9999, 12, 31),
                id=entity.id).all()
            for row in open_rows:
                if row.revision_id == active_revision.id:
                    # Row belongs to the revision being committed.
                    changes = {
                        'revision_timestamp': active_revision.timestamp}
                else:
                    # Row belongs to another revision: expire it now.
                    changes = {
                        'expired_timestamp': active_revision.timestamp}
                same_row = and_(rev_table.c.id == row.id,
                                rev_table.c.revision_id == row.revision_id)
                session.execute(
                    rev_table.update().where(same_row).values(**changes)
                )

    def after_commit(self, session):
        ''' Drop the object cache once the transaction is committed. '''
        if not hasattr(session, '_object_cache'):
            return
        del session._object_cache

    def after_rollback(self, session):
        ''' Drop the object cache when the transaction is rolled back. '''
        if not hasattr(session, '_object_cache'):
            return
        del session._object_cache
# SQLAlchemy database engine. Updated by model.init_model()
engine = None


def _new_session_factory():
    ''' Build a sessionmaker with CKAN's session settings and a fresh set
    of session extensions (each call creates its own extension
    instances). '''
    session_extensions = [
        CkanCacheExtension(),
        CkanSessionExtension(),
        extension.PluginSessionExtension(),
        activity.DatasetActivitySessionExtension(),
    ]
    return orm.sessionmaker(
        autoflush=False,
        autocommit=False,
        expire_on_commit=False,
        extension=session_extensions,
    )


# Scoped session built on its own session factory.
Session = orm.scoped_session(_new_session_factory())

# Plain (non-scoped) session factory with the same configuration.
create_local_session = _new_session_factory()

mapper = orm.mapper

# Global metadata. If you have multiple databases with overlapping table
# names, you'll need a metadata for each database
metadata = MetaData()
def engine_is_sqlite(sa_engine=None):
    ''' Return True iff the engine is connected to a sqlite database.

    :param sa_engine: engine to inspect. When omitted (or falsy) the
        module-level ``engine`` is used instead.
    :returns: True when the dialect part of the engine URL's driver name
        is ``sqlite``, otherwise False.
    '''
    drivername = (sa_engine or engine).url.drivername
    # A driver name may carry a DBAPI suffix ("sqlite+pysqlite"), so
    # compare the dialect part only; an exact match on the whole name
    # would miss those engines.
    return drivername.split('+', 1)[0] == 'sqlite'
def engine_is_pg(sa_engine=None):
    ''' Return True iff the engine is connected to a postgresql database.

    Uses ``sa_engine`` when given (and truthy), otherwise the module-level
    ``engine``.

    According to
    http://docs.sqlalchemy.org/en/latest/core/engines.html#postgresql
    all Postgres driver names start with `postgres`, so a prefix check on
    the URL's driver name is enough.
    '''
    target_engine = sa_engine or engine
    driver_name = target_engine.url.drivername
    return driver_name.startswith('postgres')