Skip to content

Commit

Permalink
fix(sqlite): defer db connection until needed (#3127)
Browse files Browse the repository at this point in the history
* fix(sqlite): defer db connection until needed

- to .execute() the same table (or a copy) from multiple threads

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix(api): separate backend creation from configuration

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test(lazyconnect): explicit None to connect

* chore(lazyconnect): add type information; reconnect()

* test(expr): expr from multiple DBs should fail #64

- but expressions from different connections to the same DB should be compatible

* fix(expr): multiple conns to same db should be compatible

- add Backend.datasource to identify database apart from connection object

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix(sqlite): move instance vars into __init__

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* chore(lazyconnect): fix type imports

* fix(lazyconnect): set internal state to None when copy/reconstituting

* fix(lazyconnect): cleaner default datasource string

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor(lazyconnect): set instance vars in constructor

* refactor(lazyconnect): update per PR review comments

* refactor(lazyconnect): rename backend.datasource to .db_identity

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
saulpw and pre-commit-ci[bot] committed Dec 18, 2021
1 parent 4af456e commit 5467afa
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 58 deletions.
51 changes: 49 additions & 2 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import warnings
from typing import Any, Callable

from cached_property import cached_property

import ibis
import ibis.expr.operations as ops
import ibis.expr.schema as sch
Expand Down Expand Up @@ -136,17 +138,62 @@ class BaseBackend(abc.ABC):
database_class = Database
table_class: type[ops.DatabaseTable] = ops.DatabaseTable

def __init__(self, *args, **kwargs):
self._con_args: tuple[Any] = args
self._con_kwargs: dict[str, Any] = kwargs

def __getstate__(self):
return dict(
database_class=self.database_class,
table_class=self.table_class,
_con_args=self._con_args,
_con_kwargs=self._con_kwargs,
)

def __hash__(self):
return hash(self.db_identity)

def __eq__(self, other):
return self.db_identity == other.db_identity

@property
@abc.abstractmethod
def name(self) -> str:
"""
Name of the backend, for example 'sqlite'.
"""

@cached_property
def db_identity(self) -> str:
"""
Identity of the database. Multiple connections to the same
database will have the same db_identity. Default implementation
assumes connection parameters uniquely specify the database.
"""
parts = [self.table_class.__name__]
parts.extend(self._con_args)
parts.extend(f'{k}={v}' for k, v in self._con_kwargs.items())
return '_'.join(map(str, parts))

def connect(self, *args, **kwargs) -> BaseBackend:
"""
Return new client object with saved args/kwargs, having called
.reconnect() on it.
"""
new_backend = self.__class__(*args, **kwargs)
new_backend.reconnect()
return new_backend

def reconnect(self) -> None:
"""
Reconnect to the target database already configured with connect().
"""
self.do_connect(*self._con_args, **self._con_kwargs)

@abc.abstractmethod
def connect(connection_string, **options):
def do_connect(self, *args, **kwargs) -> None:
"""
Connect to the underlying database and return a client object.
Connect to database specified by args and kwargs.
"""

def database(self, name: str = None) -> Database:
Expand Down
8 changes: 3 additions & 5 deletions ibis/backends/base/file/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class BaseFileBackend(BaseBackend):

database_class = FileDatabase

def connect(self, path):
def do_connect(self, path):
"""Create a Client for use with Ibis
Parameters
Expand All @@ -67,10 +67,8 @@ def connect(self, path):
-------
Backend
"""
new_backend = self.__class__()
new_backend.path = new_backend.root = Path(path)
new_backend.dictionary = {}
return new_backend
self.path = self.root = Path(path)
self.dictionary = {}

@property
def version(self) -> str:
Expand Down
12 changes: 5 additions & 7 deletions ibis/backends/base/sql/alchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,11 @@ def _build_alchemy_url(
database=database,
)

def connect(self, con: sqlalchemy.engine.Engine):
new_backend = self.__class__()
new_backend.con = con
new_backend.meta = sqlalchemy.MetaData(bind=con)
new_backend._inspector = sqlalchemy.inspect(con)
new_backend._schemas: Dict[str, sch.Schema] = {}
return new_backend
def do_connect(self, con: sqlalchemy.engine.Engine) -> None:
self.con = con
self._inspector = sqlalchemy.inspect(self.con)
self.meta = sqlalchemy.MetaData(bind=self.con)
self._schemas: Dict[str, sch.Schema] = {}

@property
def version(self):
Expand Down
5 changes: 5 additions & 0 deletions ibis/backends/base/sql/alchemy/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ def __init__(self, table, source, schema=None):
schema = sch.infer(table, schema=schema)
super().__init__(table.name, schema, source)
self.sqla_table = table

def __getstate__(self):
d = super().__getstate__()
d['sqla_table'] = self.sqla_table
return d
6 changes: 2 additions & 4 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Backend(BaseSQLBackend):
table_expr_class = ClickhouseTable
compiler = ClickhouseCompiler

def connect(
def do_connect(
self,
host='localhost',
port=9000,
Expand Down Expand Up @@ -80,8 +80,7 @@ def connect(
-------
ClickhouseClient
"""
new_backend = self.__class__()
new_backend.con = _DriverClient(
self.con = _DriverClient(
host=host,
port=port,
database=database,
Expand All @@ -90,7 +89,6 @@ def connect(
client_name=client_name,
compression=compression,
)
return new_backend

def register_options(self):
ibis.config.register_option(
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class Backend(BasePandasBackend):
database_class = DaskDatabase
table_class = DaskTable

def connect(self, dictionary):
def do_connect(self, dictionary):
# register dispatchers
from . import udf # noqa: F401

return super().connect(dictionary)
super().do_connect(dictionary)

@property
def version(self):
Expand Down
7 changes: 2 additions & 5 deletions ibis/backends/impala/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ class Backend(BaseSQLBackend):
def hdfs_connect(self, *args, **kwargs):
return hdfs_connect(*args, **kwargs)

def connect(
self,
def do_connect(
new_backend,
host='localhost',
port=21050,
database='default',
Expand Down Expand Up @@ -240,7 +240,6 @@ def connect(
"""
import hdfs

new_backend = self.__class__()
new_backend._kudu = None
new_backend._temp_objects = set()

Expand All @@ -267,8 +266,6 @@ def connect(

new_backend._ensure_temp_db_exists()

return new_backend

@property
def version(self):
cursor = self.raw_sql('select version()')
Expand Down
8 changes: 3 additions & 5 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Backend(BaseAlchemyBackend):
name = 'mysql'
compiler = MySQLCompiler

def connect(
def do_connect(
self,
host='localhost',
user=None,
Expand Down Expand Up @@ -96,10 +96,8 @@ def connect(
driver=f'mysql+{driver}',
)

new_backend = super().connect(sqlalchemy.create_engine(alchemy_url))
new_backend.database_name = alchemy_url.database

return new_backend
self.database_name = alchemy_url.database
super().do_connect(sqlalchemy.create_engine(alchemy_url))

@contextlib.contextmanager
def begin(self):
Expand Down
6 changes: 2 additions & 4 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class BasePandasBackend(BaseBackend):
Base class for backends based on pandas.
"""

def connect(self, dictionary):
def do_connect(self, dictionary):
"""Construct a client from a dictionary of DataFrames.
Parameters
Expand All @@ -30,9 +30,7 @@ def connect(self, dictionary):
from . import execution # noqa F401
from . import udf # noqa F401

new_backend = self.__class__()
new_backend.dictionary = dictionary
return new_backend
self.dictionary = dictionary

def from_dataframe(self, df, name='df', client=None):
"""
Expand Down
7 changes: 3 additions & 4 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Backend(BaseAlchemyBackend):
name = 'postgres'
compiler = PostgreSQLCompiler

def connect(
def do_connect(
self,
host='localhost',
user=None,
Expand Down Expand Up @@ -93,9 +93,8 @@ def connect(
database=database,
driver=f'postgresql+{driver}',
)
new_backend = super().connect(sqlalchemy.create_engine(alchemy_url))
new_backend.database_name = alchemy_url.database
return new_backend
self.database_name = alchemy_url.database
super().do_connect(sqlalchemy.create_engine(alchemy_url))

def list_databases(self, like=None):
# http://dba.stackexchange.com/a/1304/58517
Expand Down
13 changes: 5 additions & 8 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,23 @@ class Backend(BaseSQLBackend):
table_class = PySparkDatabaseTable
table_expr_class = PySparkTable

def connect(self, session):
def do_connect(self, session):
"""
Create a pyspark `Backend` for use with Ibis.
Pipes `**kwargs` into Backend, which pipes them into SparkContext.
See documentation for SparkContext:
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext
"""
new_backend = self.__class__()
new_backend._context = session.sparkContext
new_backend._session = session
new_backend._catalog = session.catalog
self._context = session.sparkContext
self._session = session
self._catalog = session.catalog

# Spark internally stores timestamps as UTC values, and timestamp data
# that is brought in without a specified time zone is converted as
# local time to UTC with microsecond resolution.
# https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#timestamp-with-time-zone-semantics
new_backend._session.conf.set('spark.sql.session.timeZone', 'UTC')

return new_backend
self._session.conf.set('spark.sql.session.timeZone', 'UTC')

@property
def version(self):
Expand Down
48 changes: 42 additions & 6 deletions ibis/backends/sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,44 @@ class Backend(BaseAlchemyBackend):
database_class = Database
compiler = SQLiteCompiler

def connect(self, path=None, create=False):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._con: sqlalchemy.engine.Engine = None
self._meta: sqlalchemy.MetaData = None

def __getstate__(self) -> dict:
r = super().__getstate__()
r.update(
dict(
compiler=self.compiler,
database_name=self.database_name,
_con=None, # clear connection on copy()
_meta=None,
)
)
return r

@property
def con(self) -> sqlalchemy.engine.Engine:
if self._con is None:
self.reconnect()
return self._con

@con.setter
def con(self, v: Optional[sqlalchemy.engine.Engine]):
self._con = v

@property
def meta(self) -> sqlalchemy.MetaData:
if self._meta is None:
self.reconnect()
return self._meta

@meta.setter
def meta(self, v: sqlalchemy.MetaData):
self._meta = v

def do_connect(self, path=None, create=False):
"""
Create an Ibis client connected to a SQLite database.
Expand All @@ -47,15 +83,15 @@ def connect(self, path=None, create=False):
create : boolean, default False
If file does not exist, create it
"""
new_backend = super().connect(sqlalchemy.create_engine("sqlite://"))
new_backend.database_name = "base"
self.database_name = "base"

super().do_connect(sqlalchemy.create_engine("sqlite://"))
if path is not None:
new_backend.attach(new_backend.database_name, path, create=create)
self.attach(self.database_name, path, create=create)

udf.register_all(new_backend.con)
udf.register_all(self.con)

return new_backend
self._meta = sqlalchemy.MetaData(bind=self.con)

def list_tables(self, like=None, database=None):
if database is None:
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/sqlite/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_list_tables(con):


def test_attach_file(dbpath):
client = Backend().connect()
client = Backend().connect(None)

client.attach('foo', dbpath)
client.attach('bar', dbpath)
Expand Down
11 changes: 7 additions & 4 deletions ibis/expr/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import warnings
import webbrowser
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Dict, Optional

import numpy as np

Expand Down Expand Up @@ -205,7 +205,9 @@ def _find_backends(self):
"""
from ibis.backends.base import BaseBackend

seen_backends = set()
seen_backends: Dict[
str, BaseBackend
] = {} # key is backend.db_identity

stack = [self.op()]
seen = set()
Expand All @@ -218,11 +220,12 @@ def _find_backends(self):

for arg in node.flat_args():
if isinstance(arg, BaseBackend):
seen_backends.add(arg)
if arg.db_identity not in seen_backends:
seen_backends[arg.db_identity] = arg
elif isinstance(arg, ir.Expr):
stack.append(arg.op())

return list(seen_backends)
return list(seen_backends.values())

def _find_backend(self):
backends = self._find_backends()
Expand Down
Loading

0 comments on commit 5467afa

Please sign in to comment.