sqlalchemyfdw.py
530 lines (440 loc) · 17.5 KB
"""
Purpose
-------
This fdw can be used to access data stored in a remote RDBMS.
Through the use of sqlalchemy, many different rdbms engines are supported.
.. api_compat::
:read:
:write:
:transaction:
:import_schema:
Dependencies
------------
You will need the `sqlalchemy`_ library, as well as a suitable dbapi driver for
the remote database.
You can find a list of supported RDBMSs, their associated dbapi drivers, and
connection strings in the `sqlalchemy dialects documentation`_.
.. _sqlalchemy dialects documentation: http://docs.sqlalchemy.org/en/latest/dialects/
.. _sqlalchemy: http://www.sqlalchemy.org/
Connection options
~~~~~~~~~~~~~~~~~~
Connection options can be passed either with a db-url, or with a combination of
individual connection parameters.
If both a ``db_url`` and individual parameters are used, the individual
parameters override the values found in the ``db_url``.
In both cases, at least the ``drivername`` should be passed, either as the url scheme in
the ``db_url`` or using the ``drivername`` parameter.
``db_url``
An sqlalchemy connection string.
Examples:
- mysql: ``mysql://<user>:<password>@<host>/<dbname>``
- mssql: ``mssql://<user>:<password>@<dsname>``
See the `sqlalchemy dialects documentation`_ for details.
``username``
The remote username.
``password``
The remote password.
``host``
The remote host.
``database``
The remote database.
``port``
The remote port.
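As an illustration, the individual parameters above can be used in place of a
``db_url``. A minimal sketch, assuming a MySQL backend (the server name,
credentials, and host below are hypothetical):

.. code-block:: sql

    CREATE SERVER alchemy_params_srv foreign data wrapper multicorn options (
        wrapper 'multicorn.sqlalchemyfdw.SqlAlchemyFdw',
        drivername 'mysql',
        username 'myuser',
        password 'mypassword',
        host 'myhost',
        database 'mydb',
        port '3306'
    );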
Other options
---------------
``tablename`` (required)
The table name in the remote RDBMS.
``primary_key``
Identifies a column which is a primary key in the remote RDBMS.
This option is required for INSERT, UPDATE and DELETE operations.
``schema``
The schema in which this table resides on the remote side.
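Once a ``primary_key`` option is declared, the foreign table accepts writes.
A minimal sketch, assuming an ``alchemy_srv`` server as in the usage example
(the table and column names here are hypothetical):

.. code-block:: sql

    create foreign table mysql_writable (
        id integer,
        label varchar
    ) server alchemy_srv options (
        tablename 'writable',
        primary_key 'id',
        db_url 'mysql://myuser:mypassword@myhost/mydb'
    );
    INSERT INTO mysql_writable VALUES (1, 'first');
    UPDATE mysql_writable SET label = 'updated' WHERE id = 1;
    DELETE FROM mysql_writable WHERE id = 1;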
When defining the table, the local column names will be used to retrieve the
remote column data.
Moreover, the local column types will be used to interpret the results from
the remote table. Since sqlalchemy is aware of the differences between
database implementations, it converts each value from the remote database to
Python using the converter inferred from the column type, and then converts
it back to a form suitable for PostgreSQL.
What does it do to reduce the amount of fetched data?
------------------------------------------------------
- ``quals`` are pushed to the remote database whenever possible. This includes
  simple operators:

  - equality and inequality (=, <>, >, <, <=, >=)
  - like, ilike and their negations
  - IN clauses with scalars, = ANY (array)
  - NOT IN clauses, != ALL (array)

- the set of needed columns is pushed to the remote side, and only those columns
  will be fetched.
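To verify that a qual was pushed down, EXPLAIN the query and inspect the
``Multicorn:`` line of the plan, where the pushed qual appears in the remote
WHERE clause. An illustrative sketch, reusing the ``testalchemy`` table from
the sort push-down example below (exact plan output will vary):

.. code-block:: sql

    -- A pushed-down qual shows up in the remote statement's WHERE clause
    EXPLAIN SELECT avarchar FROM testalchemy WHERE id >= 10;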
Sort push-down support
----------------------
Since the rules about NULL ordering differ between database vendors, and many
of them do not support the NULLS FIRST / NULLS LAST clause, this FDW tries
not to generate any NULLS FIRST / LAST clause if the requested order matches
what the remote system would do by default.
Additionally, if it is found that a query can't be executed while keeping the
same NULL ordering (because the remote system doesn't support the NULL ordering
clause), the sort will not be pushed down.
To check the SQL query that will be sent to the remote system, use EXPLAIN:
.. code-block:: sql
postgres=# explain select * from testalchemy order by id DESC NULLS FIRST;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------
Foreign Scan on testalchemy (cost=20.00..50000000000.00 rows=100000000 width=500)
Multicorn: SELECT basetable.atimestamp, basetable.anumeric, basetable.adate, basetable.avarchar, basetable.id
FROM basetable ORDER BY basetable.id DESC
    (3 rows)
    Time: 167.856 ms
postgres=# explain select * from testalchemy order by id DESC NULLS LAST;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------
Foreign Scan on testalchemy (cost=20.00..50000000000.00 rows=100000000 width=500)
Multicorn: SELECT basetable.atimestamp, basetable.anumeric, basetable.adate, basetable.avarchar, basetable.id
FROM basetable ORDER BY basetable.id DESC NULLS LAST
    (3 rows)
Usage example
-------------
For a connection to a remote mysql database (you'll need a mysql dbapi driver,
such as pymysql):
.. code-block:: sql
CREATE SERVER alchemy_srv foreign data wrapper multicorn options (
wrapper 'multicorn.sqlalchemyfdw.SqlAlchemyFdw'
);
create foreign table mysql_table (
column1 integer,
column2 varchar
) server alchemy_srv options (
tablename 'table',
db_url 'mysql://myuser:mypassword@myhost/mydb'
);
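Since the wrapper supports schema import, foreign tables can also be created
by reflecting the remote schema instead of declaring every column by hand.
Note that ``import_schema`` reads the connection options from the *server*
options, so the ``db_url`` must be declared there. A hedged sketch (server,
schema, and table names are illustrative):

.. code-block:: sql

    CREATE SERVER alchemy_import_srv foreign data wrapper multicorn options (
        wrapper 'multicorn.sqlalchemyfdw.SqlAlchemyFdw',
        db_url 'mysql://myuser:mypassword@myhost/mydb'
    );
    IMPORT FOREIGN SCHEMA mydb
    FROM SERVER alchemy_import_srv INTO public;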
"""
from . import ForeignDataWrapper, TableDefinition, ColumnDefinition
from .utils import log_to_postgres, ERROR, WARNING, DEBUG
from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url, URL
from sqlalchemy.sql import select, operators as sqlops, and_
from sqlalchemy.sql.expression import nullsfirst, nullslast
# Handle the sqlalchemy 0.8 / 0.9 changes
try:
from sqlalchemy.sql import sqltypes
except ImportError:
from sqlalchemy import types as sqltypes
# Handle the sqlalchemy ARRAY import change
try:
from sqlalchemy.dialects.postgresql.array import ARRAY
except ImportError:
from sqlalchemy.dialects.postgresql.base import ARRAY
from sqlalchemy.schema import Table, Column, MetaData
from sqlalchemy.dialects.mssql import base as mssql_dialect
from sqlalchemy.dialects.oracle import base as oracle_dialect
from sqlalchemy.dialects.postgresql.base import (
ischema_names, PGDialect, NUMERIC, SMALLINT, VARCHAR, TIMESTAMP, BYTEA,
BOOLEAN, TEXT)
import re
import operator
def compose(*funs):
if len(funs) == 0:
raise ValueError("At least one function is necessary for compose")
if len(funs) == 1:
return funs[0]
else:
result_fun = compose(*funs[1:])
return lambda *args, **kwargs: funs[0](result_fun(*args, **kwargs))
def not_(function):
return compose(operator.inv, function)
def _parse_url_from_options(fdw_options):
if fdw_options.get('db_url'):
url = make_url(fdw_options.get('db_url'))
else:
if 'drivername' not in fdw_options:
log_to_postgres('Either a db_url, or drivername and other '
'connection infos are needed', ERROR)
url = URL(fdw_options['drivername'])
for param in ('username', 'password', 'host',
'database', 'port'):
if param in fdw_options:
setattr(url, param, fdw_options[param])
return url
OPERATORS = {
'=': operator.eq,
'<': operator.lt,
'>': operator.gt,
'<=': operator.le,
'>=': operator.ge,
'<>': operator.ne,
'~~': sqlops.like_op,
'~~*': sqlops.ilike_op,
'!~~*': not_(sqlops.ilike_op),
'!~~': not_(sqlops.like_op),
('=', True): sqlops.in_op,
('<>', False): not_(sqlops.in_op)
}
def basic_converter(new_type):
def converter(c):
old_args = c.type.__dict__
c.type = new_type()
c.type.__dict__.update(old_args)
return converter
def length_stripper(new_type):
first = basic_converter(new_type)
def converter(c):
first(c)
c.type.__dict__['length'] = None
return converter
CONVERSION_MAP = {
oracle_dialect.NUMBER: basic_converter(NUMERIC),
mssql_dialect.TINYINT: basic_converter(SMALLINT),
mssql_dialect.NVARCHAR: basic_converter(VARCHAR),
mssql_dialect.DATETIME: basic_converter(TIMESTAMP),
mssql_dialect.VARBINARY: basic_converter(BYTEA),
mssql_dialect.IMAGE: basic_converter(BYTEA),
mssql_dialect.BIT: basic_converter(BOOLEAN),
mssql_dialect.TEXT: length_stripper(TEXT)
}
SORT_SUPPORT = {
'mssql': {'default': 'lower', 'support': False},
'postgresql': {'default': 'higher', 'support': True},
'mysql': {'default': 'lower', 'support': False},
'oracle': {'default': 'higher', 'support': True},
'sqlite': {'default': 'lower', 'support': False}
}
class SqlAlchemyFdw(ForeignDataWrapper):
"""An SqlAlchemy foreign data wrapper.
The sqlalchemy foreign data wrapper performs simple selects on a remote
database using the sqlalchemy framework.
Accepted options:
db_url -- the sqlalchemy connection string.
schema -- (optional) schema name to qualify table name with
tablename -- the table name in the remote database.
"""
def __init__(self, fdw_options, fdw_columns):
super(SqlAlchemyFdw, self).__init__(fdw_options, fdw_columns)
if 'tablename' not in fdw_options:
log_to_postgres('The tablename parameter is required', ERROR)
self.metadata = MetaData()
url = _parse_url_from_options(fdw_options)
self.engine = create_engine(url)
schema = fdw_options['schema'] if 'schema' in fdw_options else None
tablename = fdw_options['tablename']
sqlacols = []
for col in fdw_columns.values():
col_type = self._get_column_type(col.type_name)
sqlacols.append(Column(col.column_name, col_type))
self.table = Table(tablename, self.metadata, schema=schema,
*sqlacols)
self.transaction = None
self._connection = None
self._row_id_column = fdw_options.get('primary_key', None)
def _need_explicit_null_ordering(self, key):
support = SORT_SUPPORT[self.engine.dialect.name]
default = support['default']
no = None
if key.is_reversed:
no = nullsfirst if default == 'higher' else nullslast
else:
no = nullslast if default == 'higher' else nullsfirst
if key.nulls_first:
if no != nullsfirst:
return nullsfirst
return None
else:
if no != nullslast:
return nullslast
return None
def can_sort(self, sortkeys):
if SORT_SUPPORT.get(self.engine.dialect.name) is None:
# We have no idea about defaults
return []
can_order_null = SORT_SUPPORT[self.engine.dialect.name]['support']
if (any((self._need_explicit_null_ordering(x) is not None
for x in sortkeys)) and not can_order_null):
return []
return sortkeys
def explain(self, quals, columns, sortkeys=None, verbose=False):
sortkeys = sortkeys or []
statement = self._build_statement(quals, columns, sortkeys)
return [str(statement)]
def _build_statement(self, quals, columns, sortkeys):
statement = select([self.table])
clauses = []
for qual in quals:
operator = OPERATORS.get(qual.operator, None)
if operator:
clauses.append(operator(self.table.c[qual.field_name],
qual.value))
else:
log_to_postgres('Qual not pushed to foreign db: %s' % qual,
WARNING)
if clauses:
statement = statement.where(and_(*clauses))
if columns:
columns = [self.table.c[col] for col in columns]
else:
columns = self.table.c
statement = statement.with_only_columns(columns)
orders = []
for sortkey in sortkeys:
column = self.table.c[sortkey.attname]
if sortkey.is_reversed:
column = column.desc()
if sortkey.collate:
column = column.collate('"%s"' % sortkey.collate)
null_ordering = self._need_explicit_null_ordering(sortkey)
if null_ordering:
column = null_ordering(column)
statement = statement.order_by(column)
return statement
def execute(self, quals, columns, sortkeys=None):
"""
The quals are turned into an and'ed where clause.
"""
sortkeys = sortkeys or []
statement = self._build_statement(quals, columns, sortkeys)
log_to_postgres(str(statement), DEBUG)
rs = (self.connection
.execution_options(stream_results=True)
.execute(statement))
# Workaround pymssql "trash old results on new query"
# behaviour (See issue #100)
if self.engine.driver == 'pymssql' and self.transaction is not None:
rs = list(rs)
for item in rs:
yield dict(item)
@property
def connection(self):
if self._connection is None:
self._connection = self.engine.connect()
return self._connection
def begin(self, serializable):
self.transaction = self.connection.begin()
def pre_commit(self):
if self.transaction is not None:
self.transaction.commit()
self.transaction = None
def commit(self):
# Pre-commit hook does this on 9.3
if self.transaction is not None:
self.transaction.commit()
self.transaction = None
def rollback(self):
if self.transaction is not None:
self.transaction.rollback()
self.transaction = None
@property
def rowid_column(self):
if self._row_id_column is None:
log_to_postgres(
'You need to declare a primary key option in order '
'to use the write features')
return self._row_id_column
def insert(self, values):
self.connection.execute(self.table.insert(values=values))
def update(self, rowid, newvalues):
self.connection.execute(
self.table.update()
.where(self.table.c[self._row_id_column] == rowid)
.values(newvalues))
def delete(self, rowid):
self.connection.execute(
self.table.delete()
.where(self.table.c[self._row_id_column] == rowid))
def _get_column_type(self, format_type):
"""Blatant ripoff from PG_Dialect.get_column_info"""
# strip (*) from character varying(5), timestamp(5)
# with time zone, geometry(POLYGON), etc.
attype = re.sub(r'\(.*\)', '', format_type)
# strip '[]' from integer[], etc.
attype = re.sub(r'\[\]', '', attype)
is_array = format_type.endswith('[]')
        charlen = re.search(r'\(([\d,]+)\)', format_type)
        if charlen:
            charlen = charlen.group(1)
        args = re.search(r'\((.*)\)', format_type)
        if args and args.group(1):
            args = tuple(re.split(r'\s*,\s*', args.group(1)))
        else:
            args = ()
kwargs = {}
if attype == 'numeric':
if charlen:
prec, scale = charlen.split(',')
args = (int(prec), int(scale))
else:
args = ()
elif attype == 'double precision':
args = (53, )
elif attype == 'integer':
args = ()
elif attype in ('timestamp with time zone',
'time with time zone'):
kwargs['timezone'] = True
if charlen:
kwargs['precision'] = int(charlen)
args = ()
elif attype in ('timestamp without time zone',
'time without time zone', 'time'):
kwargs['timezone'] = False
if charlen:
kwargs['precision'] = int(charlen)
args = ()
elif attype == 'bit varying':
kwargs['varying'] = True
if charlen:
args = (int(charlen),)
else:
args = ()
elif attype in ('interval', 'interval year to month',
'interval day to second'):
if charlen:
kwargs['precision'] = int(charlen)
args = ()
elif charlen:
args = (int(charlen),)
coltype = ischema_names.get(attype, None)
if coltype:
coltype = coltype(*args, **kwargs)
if is_array:
coltype = ARRAY(coltype)
else:
coltype = sqltypes.NULLTYPE
return coltype
@classmethod
    def import_schema(cls, schema, srv_options, options,
                      restriction_type, restricts):
"""
Reflects the remote schema.
"""
metadata = MetaData()
url = _parse_url_from_options(srv_options)
engine = create_engine(url)
dialect = PGDialect()
if restriction_type == 'limit':
only = restricts
elif restriction_type == 'except':
only = lambda t, _: t not in restricts
else:
only = None
metadata.reflect(bind=engine,
schema=schema,
views=True,
only=only)
to_import = []
for _, table in sorted(metadata.tables.items()):
ftable = TableDefinition(table.name)
ftable.options['schema'] = schema
ftable.options['tablename'] = table.name
for c in table.c:
            # Force collation to None to prevent incompatibilities
setattr(c.type, "collation", None)
# If the type is specialized, call the generic
# superclass method
if type(c.type) in CONVERSION_MAP:
converter = CONVERSION_MAP[type(c.type)]
converter(c)
if c.primary_key:
ftable.options['primary_key'] = c.name
ftable.columns.append(ColumnDefinition(
c.name,
type_name=c.type.compile(dialect)))
to_import.append(ftable)
return to_import