This repository has been archived by the owner on Feb 13, 2020. It is now read-only.
/
dbapiclient.py
446 lines (347 loc) · 15.1 KB
/
dbapiclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
##
# Copyright (c) 2010-2016 Apple Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
"""
General utility client code for interfacing with DB-API 2.0 modules.
"""
from twext.enterprise.util import mapOracleOutputType
from twext.python.filepath import CachingFilePath
from txdav.common.icommondatastore import InternalDataStoreError
import pg8000 as postgres
import six
try:
import os
# In order to encode and decode values going to and from the database,
# cx_Oracle depends on Oracle's NLS support, which in turn relies upon
# libclntsh's reading of environment variables. It doesn't matter what the
# database language is; the database may contain iCalendar data in many
# languages, but we MUST set NLS_LANG to a value that includes an encoding
# (character set?) that includes all of Unicode, so that the connection can
# encode and decode any valid unicode data. This is not to encode and
# decode bytes, but rather, to faithfully relay Python unicode strings to
# the database. The default connection encoding is US-ASCII, which is
# definitely no good. NLS_LANG needs to be set before the first call to
# connect(), not actually before the module gets imported, but this is as
# good a place as any. I am explicitly setting this rather than inheriting
# it, because it's not a configuration value in the sense that multiple
# values may possibly be correct; _only_ UTF-8 is ever correct to work with
# our software, and other values will fail CalDAVTester. (The state is,
# however, process-global; after the first call to connect(), all
# subsequent connections inherit this encoding even if the environment
# variable changes.) -glyph
os.environ['NLS_LANG'] = '.AL32UTF8'
import cx_Oracle
except ImportError:
cx_Oracle = None
class DiagnosticCursorWrapper(object):
    """
    Thin proxy over a DB-API 2.0 cursor which records, on the connection
    wrapper that created it, which statement is currently being executed.

    @ivar realCursor: the underlying DB-API cursor that every call is
        forwarded to.
    @ivar connectionWrapper: the object whose C{state} attribute is updated by
        L{execute}.
    """

    def __init__(self, realCursor, connectionWrapper):
        self.connectionWrapper = connectionWrapper
        self.realCursor = realCursor

    @property
    def rowcount(self):
        """
        The C{rowcount} attribute of the wrapped cursor.
        """
        return self.realCursor.rowcount

    @property
    def description(self):
        """
        The C{description} attribute of the wrapped cursor.
        """
        return self.realCursor.description

    def execute(self, sql, args=()):
        """
        Record the statement on the connection wrapper, then run it on the
        wrapped cursor.  Nothing is returned.

        @param sql: the statement to execute.
        @param args: the bind parameters for the statement.
        """
        # The state is updated first so that it still names this statement
        # if the call below blocks or raises.
        self.connectionWrapper.state = 'executing %r' % (sql,)
        # (A log.debug call reporting the statement would belong here.)
        self.realCursor.execute(sql, args)

    def close(self):
        """
        Close the wrapped cursor.
        """
        self.realCursor.close()

    def fetchall(self):
        """
        Return whatever the wrapped cursor's C{fetchall} returns.
        """
        # (A log.debug call reporting the results would belong here.)
        return self.realCursor.fetchall()
class OracleCursorWrapper(DiagnosticCursorWrapper):
    """
    Wrapper for cx_Oracle DB-API cursors which implements fetchall() to read
    all CLOB objects into strings, and which adapts string bind parameters to
    what cx_Oracle expects for NCLOB columns.
    """

    def fetchall(self):
        """
        Iterate the wrapped cursor and pass every column of every row through
        L{mapOracleOutputType}.

        @return: a C{list} of rows, each row itself being a C{list}.
        """
        return [
            [mapOracleOutputType(column) for column in row]
            for row in self.realCursor
        ]

    def var(self, *args):
        """
        Create a cx_Oracle variable bound to this cursor.  (Forwarded in
        addition to the standard methods so that implementors of
        L{IDerivedParameter} do not need to be specifically aware of this
        layer.)
        """
        return self.realCursor.var(*args)

    def execute(self, sql, args=()):
        """
        Execute C{sql} after converting the bind parameters:  byte strings are
        decoded as UTF-8, and text longer than 1024 characters is bound
        through an explicit NCLOB variable.

        @param sql: the statement to execute.
        @param args: an iterable of bind parameters.
        @return: the result of L{DiagnosticCursorWrapper.execute}.
        """
        realArgs = []
        for arg in args:
            # six.binary_type / six.text_type are `str` / `unicode` on
            # Python 2, so this is unchanged there, but it no longer relies on
            # the Python-2-only `unicode` builtin.
            if isinstance(arg, six.binary_type):
                # We use NCLOB everywhere, so cx_Oracle requires a text-type
                # input.  But we mostly pass around utf-8 encoded bytes at the
                # application layer as they consume less memory, so do the
                # conversion here.
                arg = arg.decode('utf-8')
            if isinstance(arg, six.text_type) and len(arg) > 1024:
                # This *may* cause a type mismatch, but none of the non-CLOB
                # strings that we're passing would allow a value this large
                # anyway.  Smaller strings will be automatically converted by
                # the bindings; larger ones will generate an error unless
                # they are bound through an explicit NCLOB variable.
                v = self.var(cx_Oracle.NCLOB, len(arg) + 1)
                v.setvalue(0, arg)
            else:
                v = arg
            realArgs.append(v)
        return super(OracleCursorWrapper, self).execute(sql, realArgs)

    def callproc(self, name, args=()):
        """
        Forward to the wrapped cursor's C{callproc}.
        """
        return self.realCursor.callproc(name, args)

    def callfunc(self, name, returnType, args=()):
        """
        Forward to the wrapped cursor's C{callfunc}.
        """
        return self.realCursor.callfunc(name, returnType, args)
class DiagnosticConnectionWrapper(object):
    """
    Proxy over a DB-API 2.0 connection which keeps a human-readable C{state}
    string describing the last thing that happened on the connection.

    @cvar wrapper: the class used to wrap cursors created by L{cursor}.
    @ivar realConnection: the underlying DB-API connection.
    @ivar label: the label this wrapper was created with.
    @ivar state: text describing the current status of the connection.
    """

    wrapper = DiagnosticCursorWrapper

    def __init__(self, realConnection, label):
        self.state = 'idle (start)'
        self.label = label
        self.realConnection = realConnection

    def cursor(self):
        """
        Open a cursor on the real connection and return it wrapped in
        C{self.wrapper}.
        """
        rawCursor = self.realConnection.cursor()
        return self.wrapper(rawCursor, self)

    def close(self):
        """
        Close the real connection, then mark this wrapper as closed.
        """
        self.realConnection.close()
        self.state = 'closed'

    def commit(self):
        """
        Commit on the real connection, then mark this wrapper as idle.
        """
        self.realConnection.commit()
        self.state = 'idle (after commit)'

    def rollback(self):
        """
        Roll back on the real connection, then mark this wrapper as idle.
        """
        self.realConnection.rollback()
        self.state = 'idle (after rollback)'
class DBAPIParameters(object):
    """
    Object that holds the parameters needed to configure a DBAPIConnector.
    Since this varies based on the actual DB module in use, this class
    abstracts the parameters into separate properties that are then used to
    create the actual parameters for each module.

    @ivar endpoint: the endpoint string exactly as given.
    @ivar unixsocket: path taken from a C{"unix:"} endpoint, else C{None}.
    @ivar host: host name taken from a C{"tcp:"} endpoint, else C{None}.
    @ivar port: the port as a C{str} when the endpoint carries one, else
        C{None}.
    """

    def __init__(self, endpoint=None, user=None, password=None, database=None, ssl=False, **kwargs):
        """
        @param endpoint: endpoint string describing the connection, either
            C{"unix:<path>[:<port>]"} or C{"tcp:<host>[:<port>]"}.  C{None}
            (or an empty string) leaves L{unixsocket}, L{host} and L{port} all
            set to C{None}.
        @type endpoint: L{str}

        @param user: user name to connect as
        @type user: L{str}

        @param password: password to use
        @type password: L{str}

        @param database: database name to connect to
        @type database: L{str}

        @param ssl: stored unchanged as the C{ssl} attribute.

        @param kwargs: accepted and ignored, so that callers can pass their
            whole option dictionary.

        @raise ValueError: if C{endpoint} has an unrecognised scheme, or
            contains more than one C{":"} after the scheme.
        """
        self.endpoint = endpoint

        # Define all three up front so that every instance has them, whatever
        # form the endpoint takes.
        self.unixsocket = None
        self.host = None
        self.port = None

        if endpoint:
            if endpoint.startswith("unix:"):
                self.unixsocket = endpoint[len("unix:"):]
                if ":" in self.unixsocket:
                    self.unixsocket, self.port = self.unixsocket.split(":")
            elif endpoint.startswith("tcp:"):
                self.host = endpoint[len("tcp:"):]
                if ":" in self.host:
                    self.host, self.port = self.host.split(":")
            else:
                raise ValueError(
                    "Unsupported database endpoint (expected a 'unix:' or "
                    "'tcp:' prefix): {!r}".format(endpoint)
                )

        self.user = user
        self.password = password
        self.database = database
        self.ssl = ssl
class DBAPIConnector(object):
    """
    A simple wrapper for DB-API connectors.

    @ivar dbModule: the DB-API module to use.
    @ivar preflight: callable invoked as C{preflight(wrapped, **connectKw)} on
        every new connection before it is returned from L{connect}.
    @ivar connectArgs: positional arguments passed to C{dbModule.connect}.
    @ivar connectKw: keyword arguments passed to C{dbModule.connect} (and to
        the preflight).
    @cvar wrapper: the class used to wrap new connections.
    """

    wrapper = DiagnosticConnectionWrapper

    def __init__(self, dbModule, preflight, *connectArgs, **connectKw):
        self.dbModule = dbModule
        self.connectArgs = connectArgs
        self.connectKw = connectKw
        self.preflight = preflight

    def connect(self, label="<unlabeled>"):
        """
        Open a new connection, wrap it, and run the preflight on it.

        @param label: label stored on the connection wrapper.
        @return: the wrapped connection (an instance of C{self.wrapper}).
        """
        # NOTE(review): connectKw is handed both to the module's connect()
        # and to the preflight, so any extra key stored there (such as
        # "txnTimeoutSeconds") must be accepted by connect() -- confirm.
        connection = self.dbModule.connect(*self.connectArgs, **self.connectKw)
        w = self.wrapper(connection, label)
        self.preflight(w, **self.connectKw)
        return w

    @staticmethod
    def connectorFor(dbtype, **kwargs):
        """
        Build a connector for the named database type.

        @param dbtype: C{"postgres"} or C{"oracle"}.
        @param kwargs: properties understood by L{DBAPIParameters}, plus an
            optional C{txnTimeoutSeconds}.
        @raise InternalDataStoreError: if C{dbtype} is not recognised, or if
            Oracle is requested but cx_Oracle could not be imported.
        """
        if dbtype == "postgres":
            return DBAPIConnector._connectorFor_module(postgres, **kwargs)
        elif dbtype == "oracle":
            if cx_Oracle is None:
                # The guarded import at the top of the module failed; report
                # that rather than failing on None.__name__ further down.
                raise InternalDataStoreError(
                    "Oracle database requested but the cx_Oracle module "
                    "could not be imported"
                )
            return DBAPIConnector._connectorFor_module(cx_Oracle, **kwargs)
        else:
            raise InternalDataStoreError(
                "Unknown database type: {}".format(dbtype)
            )

    @staticmethod
    def _connectorFor_module(dbmodule, **kwargs):
        """
        Dispatch to the C{_connectorFor_<module name>} static method matching
        C{dbmodule.__name__}.

        @raise InternalDataStoreError: if no such method exists.
        """
        moduleName = getattr(dbmodule, "__name__", None)
        m = getattr(DBAPIConnector, "_connectorFor_{}".format(moduleName), None)
        if m is None:
            raise InternalDataStoreError(
                "Unknown DBAPI module: {}".format(dbmodule)
            )
        return m(dbmodule, **kwargs)

    @staticmethod
    def _connectorFor_pgdb(dbmodule, **kwargs):
        """
        Turn properties into pgdb kwargs
        """
        params = DBAPIParameters(**kwargs)
        dsn = "{0.host}:dbname={0.database}:{0.user}:{0.password}::".format(params)

        dbkwargs = {}
        if params.port:
            dbkwargs["host"] = "{}:{}".format(params.host, params.port)
        if "txnTimeoutSeconds" in kwargs:
            dbkwargs["txnTimeoutSeconds"] = kwargs["txnTimeoutSeconds"]
        # Use the module that was dispatched on, not the module-level
        # `postgres` alias (which is pg8000 and does not take this DSN).
        return DBAPIConnector(dbmodule, postgresPreflight, dsn, **dbkwargs)

    @staticmethod
    def _connectorFor_pg8000(dbmodule, **kwargs):
        """
        Turn properties into pg8000 kwargs

        @raise InternalDataStoreError: if a unix socket endpoint was given but
            no socket file exists at the resolved path.
        """
        params = DBAPIParameters(**kwargs)
        dbkwargs = {
            "user": params.user,
            "password": params.password,
            "database": params.database,
        }
        if params.ssl:
            dbkwargs["ssl"] = params.ssl

        if params.unixsocket:
            # We're using a socket file
            socketFP = CachingFilePath(params.unixsocket)

            if socketFP.isdir():
                # We have been given the directory, not the actual socket file
                socketFP = socketFP.child(".s.PGSQL.{}".format(params.port or "5432"))

            dbkwargs["unix_sock"] = socketFP.path

            if not socketFP.isSocket():
                raise InternalDataStoreError(
                    "No such socket file: {}".format(socketFP.path)
                )
        else:
            dbkwargs["host"] = params.host
            if params.port:
                dbkwargs["port"] = int(params.port)

        if "txnTimeoutSeconds" in kwargs:
            dbkwargs["txnTimeoutSeconds"] = kwargs["txnTimeoutSeconds"]
        return DBAPIConnector(dbmodule, pg8000Preflight, **dbkwargs)

    @staticmethod
    def _connectorFor_cx_Oracle(dbmodule, **kwargs):
        """
        Turn properties into DSN string

        @param dbmodule: the module that was dispatched on; unused, because
            L{OracleConnector} supplies cx_Oracle itself.
        """
        dsn = "{0.user}/{0.password}@{0.host}:{0.port}/{0.database}".format(DBAPIParameters(**kwargs))
        return OracleConnector(dsn)
class OracleConnectionWrapper(DiagnosticConnectionWrapper):
    """
    Connection wrapper that sets C{wrapper} to L{OracleCursorWrapper}.
    """

    wrapper = OracleCursorWrapper
class OracleConnector(DBAPIConnector):
    """
    Connector specialised for cx_Oracle, carrying the special-cased behavior
    needed to make it act more like the other DB-API bindings.

    Note: this is currently required for our usage of twext.enterprise.dal to
    work with cx_Oracle, and ought to be factored somewhere higher-level.
    """

    wrapper = OracleConnectionWrapper

    def __init__(self, dsn):
        """
        @param dsn: the connection string, passed to C{cx_Oracle.connect} as
            its only positional argument.
        """
        connectOptions = {"threaded": True}
        super(OracleConnector, self).__init__(
            cx_Oracle, oraclePreflight, dsn, **connectOptions
        )
def oraclePreflight(connection, **kwargs):
    """
    Pre-flight function for Oracle connections: set the timestamp format to be
    something closely resembling our default assumption from Postgres.

    @param connection: the (wrapped) connection to prepare; must provide
        C{cursor()} and C{commit()}.
    @param kwargs: accepted for signature compatibility with the other
        preflight functions; unused.
    """
    c = connection.cursor()
    try:
        c.execute(
            "alter session set NLS_TIMESTAMP_FORMAT = "
            "'YYYY-MM-DD HH24:MI:SS.FF'"
        )
        # NOTE(review): this format uses 'HH' whereas the one above uses
        # 'HH24' -- confirm that the difference is intended.
        c.execute(
            "alter session set NLS_TIMESTAMP_TZ_FORMAT = "
            "'YYYY-MM-DD HH:MI:SS.FF+TZH:TZM'"
        )
        connection.commit()
    finally:
        # Release the cursor even when one of the statements fails.
        c.close()
def postgresPreflight(connection, **kwargs):
    """
    Pre-flight function for PostgreSQL connections: enable standard conforming
    strings, and set a non-infinite statement timeout.

    @param connection: the (wrapped) connection to prepare; must provide
        C{cursor()} and C{commit()}.
    @param kwargs: only C{txnTimeoutSeconds} is read (default 30); it is the
        statement timeout in seconds.  Other keys are ignored.
    """
    # The setting takes milliseconds; coerce to a whole number so that a
    # fractional number of seconds does not produce a value like "1500.0".
    timeoutMillis = int(kwargs.get("txnTimeoutSeconds", 30) * 1000)

    c = connection.cursor()
    try:
        # Turn on standard conforming strings.  This option is _required_ if
        # you want to get correct behavior out of parameter-passing with the
        # pgdb module.  If it is not set then the server is potentially
        # vulnerable to certain types of SQL injection.
        c.execute("set standard_conforming_strings=on")

        # Abort any statement that takes more than 30 seconds (default) to
        # execute.  This is necessary as a temporary workaround since it's
        # hypothetically possible that different database operations could
        # block each other, while executing SQL in the same process (in the
        # same thread, since SQL executes in the main thread now).  It's
        # preferable to see some exceptions while we're in this state than to
        # have the entire worker process hang.
        c.execute("set statement_timeout={}".format(timeoutMillis))

        # pgdb (as per DB-API 2.0) automatically puts the connection into a
        # 'executing a transaction' state when _any_ statement is executed on
        # it (even these not-touching-any-data statements); make sure to
        # commit first so that the application sees a fresh transaction, and
        # the connection can safely be pooled without executing anything on
        # it.
        connection.commit()
    finally:
        # Release the cursor even when one of the statements fails.
        c.close()
def pg8000Preflight(connection, **kwargs):
    """
    Pre-flight function for pg8000/PostgreSQL connections: setup type mappings
    in addition to the normal postgres preflight.

    @param connection: the wrapped connection; its C{realConnection} must be
        a pg8000 connection exposing C{py_types}, C{pg_types} and
        C{default_factory}.
    @param kwargs: passed through to L{postgresPreflight}.
    """
    # Do the base PostgreSQL preflight
    postgresPreflight(connection, **kwargs)

    realConnection = connection.realConnection

    # Patch pg8000 behavior to match what we need wrt text processing

    def my_text_out(v):
        # Text is sent as UTF-8 bytes; anything else is stringified.
        # six.text_type is `unicode` on Python 2, matching the py_types key
        # registered below.
        return v.encode("utf-8") if isinstance(v, six.text_type) else str(v)

    realConnection.py_types[str] = (705, postgres.core.FC_TEXT, my_text_out)
    realConnection.py_types[six.text_type] = (705, postgres.core.FC_TEXT, my_text_out)

    def my_text_recv(data, offset, length):
        # Hand back the raw slice as a str instead of decoding it.
        return str(data[offset: offset + length])

    realConnection.default_factory = lambda: (postgres.core.FC_TEXT, my_text_recv)

    # Type OIDs whose values are received through my_text_recv.
    # NOTE(review): presumably these are PostgreSQL's text-like types --
    # confirm against the pg_type catalog.
    for oid in (19, 25, 705, 1042, 1043, 2275):
        realConnection.pg_types[oid] = (postgres.core.FC_BINARY, my_text_recv)
    realConnection.pg_types[829] = (postgres.core.FC_TEXT, my_text_recv)