/
mapping.py
117 lines (92 loc) · 4.15 KB
/
mapping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
This module no longer contains the mapping of data model classes to the
relational database.
The module will be revised during migration from SQLAlchemy Migrate to Alembic.
"""
import logging
from threading import local
from typing import Optional, Type
from sqlalchemy import (
and_,
select,
)
from sqlalchemy.orm import class_mapper, object_session, relation
from galaxy import model
from galaxy.model import mapper_registry
from galaxy.model.base import SharedModelMapping
from galaxy.model.migrate.triggers.update_audit_table import install as install_timestamp_triggers
from galaxy.model.orm.engine_factory import build_engine
from galaxy.model.orm.now import now
from galaxy.model.security import GalaxyRBACAgent
from galaxy.model.view.utils import install_views
log = logging.getLogger(__name__)
metadata = mapper_registry.metadata
class_mapper(model.HistoryDatasetCollectionAssociation).add_property(
"creating_job_associations", relation(model.JobToOutputDatasetCollectionAssociation, viewonly=True))
# Helper methods.
def db_next_hid(self, n=1):
"""
db_next_hid( self )
Override __next_hid to generate from the database in a concurrency safe way.
Loads the next history ID from the DB and returns it.
It also saves the future next_id into the DB.
:rtype: int
:returns: the next history id
"""
session = object_session(self)
table = self.table
if "postgres" not in session.bind.dialect.name:
next_hid = select([table.c.hid_counter], table.c.id == model.cached_id(self)).with_for_update().scalar()
table.update(table.c.id == self.id).execute(hid_counter=(next_hid + n))
else:
stmt = table.update().where(table.c.id == model.cached_id(self)).values(hid_counter=(table.c.hid_counter + n)).returning(table.c.hid_counter)
next_hid = session.execute(stmt).scalar() - n
session.expire(self, ['hid_counter'])
return next_hid
model.History._next_hid = db_next_hid # type: ignore
def _workflow_invocation_update(self):
session = object_session(self)
table = self.table
now_val = now()
stmt = table.update().values(update_time=now_val).where(and_(table.c.id == self.id, table.c.update_time < now_val))
session.execute(stmt)
model.WorkflowInvocation.update = _workflow_invocation_update # type: ignore
class GalaxyModelMapping(SharedModelMapping):
security_agent: GalaxyRBACAgent
thread_local_log: Optional[local]
create_tables: bool
User: Type
GalaxySession: Type
def init(file_path, url, engine_options=None, create_tables=False, map_install_models=False,
database_query_profiling_proxy=False, object_store=None, trace_logger=None, use_pbkdf2=True,
slow_query_log_threshold=0, thread_local_log: Optional[local] = None, log_query_counts=False) -> GalaxyModelMapping:
"""Connect mappings to the database"""
if engine_options is None:
engine_options = {}
# Connect dataset to the file path
model.Dataset.file_path = file_path
# Connect dataset to object store
model.Dataset.object_store = object_store
# Use PBKDF2 password hashing?
model.User.use_pbkdf2 = use_pbkdf2
# Load the appropriate db module
engine = build_engine(url, engine_options, database_query_profiling_proxy, trace_logger, slow_query_log_threshold, thread_local_log=thread_local_log, log_query_counts=log_query_counts)
# Connect the metadata to the database.
metadata.bind = engine
model_modules = [model]
if map_install_models:
import galaxy.model.tool_shed_install.mapping # noqa: F401
from galaxy.model import tool_shed_install
galaxy.model.tool_shed_install.mapping.init(url=url, engine_options=engine_options, create_tables=create_tables)
model_modules.append(tool_shed_install)
result = GalaxyModelMapping(model_modules, engine=engine)
# Create tables if needed
if create_tables:
metadata.create_all()
install_timestamp_triggers(engine)
install_views(engine)
result.create_tables = create_tables
# load local galaxy security policy
result.security_agent = GalaxyRBACAgent(result)
result.thread_local_log = thread_local_log
return result