Skip to content

Commit

Permalink
BUG: allow other connectables to be passed as the bind param
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Jevnik authored and llllllllll committed Oct 24, 2017
1 parent 4dafafa commit ba84238
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
15 changes: 7 additions & 8 deletions odo/backends/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def getbind(t, bind):
if bind is None:
return t.bind

if isinstance(bind, sa.engine.base.Engine):
if isinstance(bind, sa.engine.interfaces.Connectable):
return bind
return create_engine(bind)

Expand Down Expand Up @@ -538,9 +538,9 @@ def select_to_base(sel, dshape=None, bind=None, **kwargs):
@append.register(sa.Table, Iterator)
def append_iterator_to_table(t, rows, dshape=None, bind=None, **kwargs):
assert not isinstance(t, type)
engine = getbind(t, bind)
if not t.exists(bind=engine):
t.create(bind=engine)
bind = getbind(t, bind)
if not t.exists(bind=bind):
t.create(bind=bind)
rows = iter(rows)

# We see if the sequence is of tuples or dicts
Expand All @@ -564,9 +564,9 @@ def append_iterator_to_table(t, rows, dshape=None, bind=None, **kwargs):
names = discover(t).measure.names
rows = (dict(zip(names, row)) for row in rows)

with engine.connect() as conn:
with bind.begin():
for chunk in partition_all(1000, rows): # TODO: 1000 is hardcoded
conn.execute(t.insert(), chunk)
bind.execute(t.insert(), chunk)

return t

Expand Down Expand Up @@ -607,8 +607,7 @@ def append_select_statement_to_sql_Table(t, o, bind=None, **kwargs):

query = t.insert().from_select(o.columns.keys(), o)

with bind.connect() as conn:
conn.execute(query)
bind.execute(query)
return t


Expand Down
27 changes: 27 additions & 0 deletions odo/backends/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,3 +884,30 @@ def test_nullable_foreign_key():

actual = odo(T1, pd.DataFrame)
assert actual.equals(expected)


def test_transaction():
with tmpfile('.db') as fn:
rsc = resource('sqlite:///%s::table' % fn, dshape='var * {a: int}')

data = [(1,), (2,), (3,)]

conn_1 = rsc.bind.connect()
conn_2 = rsc.bind.connect()

trans_1 = conn_1.begin()
conn_2.begin()

odo(data, rsc, bind=conn_1)

# inside the transaction the write should be there
assert odo(rsc, list, bind=conn_1) == data

# outside of a transaction or in a different transaction the write is not
# there
assert odo(rsc, list) == odo(rsc, list, bind=conn_2) == []

trans_1.commit()

# now the data should appear outside the transaction
assert odo(rsc, list) == odo(rsc, list, bind=conn_2) == data

0 comments on commit ba84238

Please sign in to comment.