-
Notifications
You must be signed in to change notification settings - Fork 2
/
database.py
127 lines (111 loc) · 3.66 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import logging
from typing import Any
from sqlalchemy import (
Column,
Date,
Engine,
ForeignKey,
MetaData,
Numeric,
String,
Table,
Unicode,
UnicodeText,
create_engine,
)
logger = logging.getLogger(__name__)
metadata = MetaData()
persons = Table(
"HR_PERSON_EMPLOYEE_LIMITED",
metadata,
Column("MIT_ID", String),
Column("KRB_NAME_UPPERCASE", String),
Column("FIRST_NAME", Unicode),
Column("LAST_NAME", Unicode),
Column("MIDDLE_NAME", Unicode),
Column("EMAIL_ADDRESS", String),
Column("DATE_TO_FACULTY", Date),
Column("ORIGINAL_HIRE_DATE", Date),
Column("APPOINTMENT_END_DATE", Date),
Column("PERSONNEL_SUBAREA_CODE", String),
Column("JOB_TITLE", String),
Column("HR_ORG_UNIT_ID", String),
)
dlcs = Table(
"HR_ORG_UNIT",
metadata,
Column(
"HR_ORG_UNIT_ID",
String,
ForeignKey("HR_PERSON_EMPLOYEE_LIMITED.HR_ORG_UNIT_ID"),
),
Column("ORG_HIER_SCHOOL_AREA_NAME", String),
Column("DLC_NAME", String),
Column("HR_ORG_LEVEL5_NAME", String),
)
orcids = Table(
"ORCID_TO_MITID",
metadata,
Column("MIT_ID", String, ForeignKey("HR_PERSON_EMPLOYEE_LIMITED.MIT_ID")),
Column("ORCID", String),
)
aa_articles = Table(
"AA_ARTICLE",
metadata,
Column("AA_MATCH_SCORE", Numeric(3, 1)),
Column("ARTICLE_ID", String),
Column("ARTICLE_TITLE", Unicode),
Column("ARTICLE_YEAR", String),
Column("AUTHORS", UnicodeText),
Column("DOI", String),
Column("ISSN_ELECTRONIC", String),
Column("ISSN_PRINT", String),
Column("IS_CONFERENCE_PROCEEDING", String),
Column("JOURNAL_FIRST_PAGE", String),
Column("JOURNAL_LAST_PAGE", String),
Column("JOURNAL_ISSUE", Unicode),
Column("JOURNAL_NAME", Unicode),
Column("JOURNAL_VOLUME", Unicode),
Column("MIT_ID", String),
Column("PUBLISHER", Unicode),
)
class DatabaseEngine:
"""Database engine.
This provides access to an SQLAlchemy database engine. Only one
of these should be created per application. Calling the object
will return the configured engine, though you should generally
use the :func:`~carbon.db.session` to interact with the databse.
"""
_engine = None
def __call__(self) -> Engine:
if self._engine:
return self._engine
nonconfigured_engine_error_message = (
"No SQLAlchemy engine was found. The engine must be created "
"by running 'engine.configure()' with a valid connection string."
)
raise AttributeError(nonconfigured_engine_error_message)
def configure(self, connection_string: str, **kwargs: Any) -> None: # noqa: ANN401
self._engine = self._engine or create_engine(connection_string, **kwargs)
def run_connection_test(self) -> None:
"""Test connection to the Data Warehouse.
Verify that the provided Data Warehouse credentials can be used
to successfully connect to the Data Warehouse.
"""
logger.info("Testing connection to the Data Warehouse")
try:
connection = self._engine.connect() # type: ignore[union-attr]
except Exception as error:
error_message = f"Failed to connect to the Data Warehouse: {error}"
logger.exception(error_message)
raise
else:
dbapi_connection = connection.connection
version = (
dbapi_connection.version if hasattr(dbapi_connection, "version") else ""
)
logger.info(
"Successfully connected to the Data Warehouse: %s",
version, # type: ignore[union-attr]
)
connection.close()