fix(mssql): don't yield from inside a cursor
Continuing the effort against this heisenbug.

The previous fix didn't do the trick.

We noticed two failure modes:
1. the LHS is wrong (`expr.execute()`)
2. the RHS is wrong (the wrong number of rows written to a file)

Both of these point, at their core, to some issue with the cursor, since
the mssql backend relies on a SQLAlchemy cursor to feed both
`execute` and `to_pyarrow_batches` (which itself feeds `to_parquet` and
`to_csv`).

We found this:
http://www.pymssql.org/en/stable/pymssql_examples.html#important-note-about-cursors

which lets us know that we _cannot_ have multiple cursors on the same
connection.
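Roughly the pitfall, as a standalone sketch (connection parameters and table names here are made up; the linked docs have the authoritative example):

```python
import pymssql

# Hypothetical connection parameters.
conn = pymssql.connect(
    server="localhost", user="sa", password="...", database="test"
)

c1 = conn.cursor()
c1.execute("SELECT * FROM big_table")

c2 = conn.cursor()
c2.execute("SELECT 1")

# c1's pending rows are gone: per the linked docs, cursors on a single
# pymssql connection are not isolated from one another
c1.fetchall()
```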

Attempting to close the cursor after the `yield` loop doesn't work.
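For context, the failing shape looks roughly like this (an illustrative sketch, not the exact previous code; names are made up and `con` stands for any DBAPI connection such as pymssql's):

```python
def broken_batches(con, sql, chunk_size=1_000_000):
    cursor = con.cursor()
    try:
        cursor.execute(sql)
        while batch := cursor.fetchmany(chunk_size):
            # the generator suspends here with the cursor still open, so
            # consumer code can run other queries on the same connection
            # and clobber the pending result set
            yield batch
    finally:
        # closing after the loop is too late: the damage happens while
        # the generator is suspended above
        cursor.close()
```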

So instead, for now, this terrible workaround: we fetch the entire
result set, then re-batch it with `toolz`, as sketched below.
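`toolz.partition_all` does the re-batching; a tiny standalone demo:

```python
import toolz

rows = [(i,) for i in range(10)]
list(toolz.partition_all(4, rows))
# [((0,), (1,), (2,), (3,)), ((4,), (5,), (6,), (7,)), ((8,), (9,))]
```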

We should noodle on this some more, but this is one possible "fix".
gforsyth authored and cpcloud committed Jun 23, 2023
1 parent 27d4a8d commit 4af0731
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion ibis/backends/mssql/__init__.py
```diff
@@ -2,16 +2,18 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Literal
+from typing import TYPE_CHECKING, Any, Iterable, Literal, Mapping
 
 import sqlalchemy as sa
+import toolz
 
 from ibis.backends.base.sql.alchemy import BaseAlchemyBackend
 from ibis.backends.mssql.compiler import MsSqlCompiler
 from ibis.backends.mssql.datatypes import _type_from_result_set_info
 
 if TYPE_CHECKING:
     import ibis.expr.schema as sch
+    import ibis.expr.types as ir
 
 
 class Backend(BaseAlchemyBackend):
@@ -79,3 +81,22 @@ def _table_from_schema(
         return super()._table_from_schema(
             temp * "#" + name, schema=schema, database=database, temp=False
         )
+
+    def _cursor_batches(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1_000_000,
+    ) -> Iterable[list]:
+        self._run_pre_execute_hooks(expr)
+        query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
+        sql = query_ast.compile()
+
+        with self._safe_raw_sql(sql) as cursor:
+            # this is expensive for large result sets
+            #
+            # see https://github.com/ibis-project/ibis/pull/6513
+            batch = cursor.fetchall()
+
+        yield from toolz.partition_all(chunk_size, batch)
```
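The key property of the new `_cursor_batches` is that the `yield from` sits outside the `with` block: the cursor is fully drained and closed before the first batch is yielded, so nothing a consumer runs on the connection can invalidate it. The tradeoff, flagged in the comment, is that the entire result set is materialized in memory first. A rough consumer-side sketch (this is an internal method; the connection parameters and table name are hypothetical):

```python
import ibis

# Hypothetical connection parameters and table name.
con = ibis.mssql.connect(
    host="localhost", user="sa", password="...", database="test"
)
t = con.table("events")

# `to_pyarrow_batches` (and thus `to_parquet`/`to_csv`) is fed by these
# batches; each one is a sequence of at most `chunk_size` rows
for batch in con._cursor_batches(t, chunk_size=10_000):
    print(len(batch))
```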
