Skip to content

Commit

Permalink
Merge eba50d3 into 68bccf3
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Bogensberger committed Aug 19, 2014
2 parents 68bccf3 + eba50d3 commit 06b3f0e
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Changes for crate
=================

- use bulk_args in executemany to increase performance

- improved docs formatting of field lists

2014/07/25 0.10.7
Expand Down
3 changes: 3 additions & 0 deletions docs/advanced_usage.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ This can be done by passing a object of the Client class when calling the

>>> from crate import client
>>> class MyConnectionClient:
... active_servers = ["localhost:4200"]
... def __init__(self):
... pass
... def sql(self, stmt=None, parameters=None):
... pass
... def server_infos(self, server):
... return ("localhost:4200", "my server", "0.42.0")
>>> connection = client.connect([crate_host], client=MyConnectionClient())
29 changes: 22 additions & 7 deletions docs/client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,27 @@ database operations::
... (name, date, kind, position) VALUES (?, ?, ?, ?)""",
... ('Einstein Cross', '2007-03-11', 'Quasar', 7))

To bulk insert data you need to make use of multiple rows in the
INSERT statement and provide a list of arguments with the length of
``number of inserted records * number of columns``::
To bulk insert data you can use the ``executemany`` method::

>>> cursor.execute("""INSERT INTO locations
... (name, date, kind, position) VALUES (?, ?, ?, ?), (?, ?, ?, ?)""",
... ('Cloverleaf', '2007-03-11', 'Quasar', 7,
... 'Old Faithful', '2007-03-11', 'Quasar', 7))
>>> cursor.executemany("""INSERT INTO locations
... (name, date, kind, position) VALUES (?, ?, ?, ?)""",
... [('Cloverleaf', '2007-03-11', 'Quasar', 7),
... ('Old Faithful', '2007-03-11', 'Quasar', 7)])


.. note::

If you are using a crate server version older than 0.42.0 the client
will execute a single sql statement for every parameter in the parameter
sequence when you are using executemany. To avoid that overhead you can
use ``execute`` and make use of multiple rows in the INSERT
statement and provide a list of arguments with the length of
``number of inserted records * number of columns``::

>>> cursor.execute("""INSERT INTO locations
... (name, date, kind, position) VALUES (?, ?, ?, ?), (?, ?, ?, ?)""",
... ('Creameries', '2007-03-11', 'Quasar', 7,
... 'Double Quasar', '2007-03-11', 'Quasar', 7))

.. Hidden: refresh locations

Expand Down Expand Up @@ -128,6 +141,8 @@ fetchall()
['Arkintoofle Minor'],
['Bartledan'],
['Cloverleaf'],
['Creameries'],
['Double Quasar'],
['Einstein Cross'],
['Folfanga'],
['Galactic Sector QQ7 Active J Gamma'],
Expand Down
6 changes: 3 additions & 3 deletions docs/https.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ a self signed certificate::

>>> http_client = HttpClient([crate_host])
>>> http_client.server_infos(http_client._get_server())
('https://localhost:65534', u'test')
('https://localhost:65534', u'test', '0.0.0')

When switching on verification and giving a wrong ``ca_cert`` an error is raised::

Expand All @@ -42,10 +42,10 @@ established, to Eves satisfaction.

>>> non_verifying_client = HttpClient([crate_host], ca_cert=invalid_ca_cert, verify_ssl_cert=False)
>>> non_verifying_client.server_infos(crate_host)
('https://localhost:65534', u'test')
('https://localhost:65534', u'test', '0.0.0')

Connecting to a host whose certificate is verified::

>>> verifying_valid_client = HttpClient([crate_host], ca_cert=valid_ca_cert, verify_ssl_cert=True)
>>> verifying_valid_client.server_infos(verifying_valid_client._get_server())
('https://localhost:65534', u'test')
('https://localhost:65534', u'test', '0.0.0')
16 changes: 15 additions & 1 deletion src/crate/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
# software solely pursuant to the terms of the relevant commercial agreement.

from .cursor import Cursor
from .exceptions import ProgrammingError
from .exceptions import ProgrammingError, ConnectionError
from .http import Client
from .blob import BlobContainer
from distutils.version import StrictVersion


class Connection(object):
Expand All @@ -36,6 +37,7 @@ def __init__(self, servers=None, timeout=None, client=None,
verify_ssl_cert=verify_ssl_cert,
ca_cert=ca_cert,
error_trace=error_trace)
self.lowest_server_version = self._lowest_server_version()
self._closed = False

def cursor(self):
Expand Down Expand Up @@ -68,6 +70,18 @@ def get_blob_container(self, container_name):
"""
return BlobContainer(container_name, self)

def _lowest_server_version(self):
    """
    Return the lowest version reported by the client's active servers.

    Every server in ``self.client.active_servers`` is passed to
    ``self.client.server_infos``; the third element of the result is
    parsed as a ``StrictVersion``. A server is skipped when the lookup
    raises ``ConnectionError``, when the result cannot be unpacked into
    three values, or when its version is missing, not a string, or not
    a valid strict version number.

    :return: the smallest parsed ``StrictVersion``, or
             ``StrictVersion('0.0.0')`` if no server yielded one.
    """
    lowest = None
    for server in self.client.active_servers:
        try:
            _, _, version = self.client.server_infos(server)
            if not version:
                # StrictVersion(None) / StrictVersion('') silently builds
                # an object without a parsed ``version`` attribute, which
                # would only fail later on comparison or attribute access.
                continue
            version = StrictVersion(version)
        except (ValueError, TypeError, ConnectionError):
            # best effort: ignore servers without a usable version
            continue
        if lowest is None or version < lowest:
            lowest = version
    return lowest if lowest is not None else StrictVersion('0.0.0')

def __repr__(self):
return '<Connection {0}>'.format(repr(self.client))

Expand Down
2 changes: 2 additions & 0 deletions src/crate/client/connection.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Connection
We create a new connection object::

>>> connection = connect(client=connection_client_mocked)
>>> connection.lowest_server_version.version
(0, 42, 0)

Calling the ``cursor()`` function on the connection will
return a cursor object::
Expand Down
24 changes: 17 additions & 7 deletions src/crate/client/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
# software solely pursuant to the terms of the relevant commercial agreement.

from .exceptions import ProgrammingError
from distutils.version import StrictVersion

BULK_INSERT_MIN_VERSION = StrictVersion("0.42.0")


class Cursor(object):
Expand All @@ -35,7 +38,7 @@ def __init__(self, connection):
self._closed = False
self._result = None

def execute(self, sql, parameters=None):
def execute(self, sql, parameters=None, bulk_parameters=None):
"""
Prepare and execute a database operation (query or command).
"""
Expand All @@ -45,7 +48,7 @@ def execute(self, sql, parameters=None):
if self._closed:
raise ProgrammingError("Cursor closed")

self._result = self.connection.client.sql(sql, parameters)
self._result = self.connection.client.sql(sql, parameters, bulk_parameters)
if "rows" in self._result:
self.rows = iter(self._result["rows"])

Expand All @@ -56,13 +59,20 @@ def executemany(self, sql, seq_of_parameters):
"""
row_counts = []
durations = []

for params in seq_of_parameters:
self.execute(sql, parameters=params)
if self.rowcount > -1:
row_counts.append(self.rowcount)
if self.connection.lowest_server_version >= BULK_INSERT_MIN_VERSION:
self.execute(sql, bulk_parameters=seq_of_parameters)
for result in self._result.get('results', []):
if result.get('rowcount') > -1:
row_counts.append(result.get('rowcount'))
if self.duration > -1:
durations.append(self.duration)
else:
for params in seq_of_parameters:
self.execute(sql, parameters=params)
if self.rowcount > -1:
row_counts.append(self.rowcount)
if self.duration > -1:
durations.append(self.duration)
self._result = {
"rowcount": sum(row_counts) if row_counts else -1,
"duration": sum(durations) if durations else -1,
Expand Down
49 changes: 41 additions & 8 deletions src/crate/client/cursor.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,24 +169,33 @@ The attribute is -1 in case close has been performed on the cursor::
>>> cursor.duration
-1

>>> connection.client.set_next_response({
... "results": [
... {
... "rowcount": 3
... },
... {
... "rowcount": 2
... }
... ],
... "duration":123,
... "cols":[ "name", "position" ],
... })

executemany
===========

``executemany()`` allows to execute a single sql statement against a sequence
of parameters. The resulting ``rowcount`` and ``duration`` are the sum of all
calls made to the server during this method execution.::
``executemany()`` executes a single SQL statement against a sequence of parameters::

>>> cursor = connection.cursor()

>>> cursor.executemany('', (1,2,3))
>>> cursor.rowcount
9
5
>>> cursor.duration
369
123

As executemany cannot keep all the possible resultrows during execution of many statements
and executemany is not intended to be used with statements returning result sets
the result will always be empty::
``executemany`` is not intended to be used with statements returning result sets. The result will always be empty::

>>> cursor.fetchall()
[]
Expand All @@ -196,6 +205,30 @@ For completeness' sake the cursor description is updated nonetheless::
>>> [ desc[0] for desc in cursor.description ]
['name', 'position']

>>> connection.client.set_next_response({
... "rows":[ [ "North West Ripple", 1 ], [ "Arkintoofle Minor", 3 ], [ "Alpha Centauri", 3 ] ],
... "cols":[ "name", "position" ],
... "rowcount":3,
... "duration":123
... })

Usually ``executemany`` sends the parameter sequence as ``bulk_args`` to the
crate sql endpoint; support for ``bulk_args`` was introduced with crate 0.42.0.
If the client is connected to at least one crate server with an older version,
the client executes the statement once for every parameter within the sequence.

The resulting ``rowcount`` and ``duration`` are the sum of all calls made to
the server during this method execution::


>>> connection_client_mocked.set_next_server_infos("local:4200", "my crate", "0.41.9")
>>> connection = connect(client=connection_client_mocked)
>>> cursor = connection.cursor()
>>> cursor.executemany('', (1,2,3))
>>> cursor.rowcount
9
>>> cursor.duration
369

close()
=======
Expand Down
12 changes: 8 additions & 4 deletions src/crate/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ def __init__(self, servers=None, timeout=None, ca_cert=None,
servers = [self._server_url(s) for s in servers]
self._active_servers = servers
self._inactive_servers = []

self._http_timeout = timeout
pool_kw = {}
if ca_cert is None:
Expand Down Expand Up @@ -181,7 +180,7 @@ def _server_url(server):
url = '%s://%s' % (parsed.scheme, parsed.netloc)
return url

def sql(self, stmt, parameters=None):
def sql(self, stmt, parameters=None, bulk_parameters=None):
"""
Execute SQL stmt against the crate server.
"""
Expand All @@ -196,6 +195,8 @@ def sql(self, stmt, parameters=None):
}
if parameters:
data['args'] = parameters
if bulk_parameters:
data['bulk_args'] = bulk_parameters
logger.debug(
'Sending request to %s with payload: %s', self.path, data)
content = self._json_request('POST', self.path, data=data)
Expand All @@ -213,7 +214,7 @@ def server_infos(self, server):
"Invalid server response of content-type '%s'" %
response.headers.get("content-type", "unknown"))
node_name = content.get("name")
return server, node_name
return server, node_name, content.get('version', {}).get('number', '0.0.0')

def _blob_path(self, table, digest):
return '_blobs/{table}/{digest}'.format(table=table, digest=digest)
Expand Down Expand Up @@ -267,7 +268,6 @@ def blob_exists(self, table, digest):
return False
self._raise_for_status(response)


def _add_server(self, server):
with self._lock:
if server not in self.server_pool:
Expand Down Expand Up @@ -339,6 +339,10 @@ def _raise_for_status(self, response):
data = json.loads(six.text_type(response.data, 'utf-8'))
error = data.get('error', {})
error_trace = data.get('error_trace', None)
if "results" in data:
errors = [res["error_message"] for res in data["results"] if res.get("error_message")]
if errors:
raise ProgrammingError("\n".join(errors))
if isinstance(error, dict):
raise ProgrammingError(error.get('message', ''), error_trace=error_trace)
raise ProgrammingError(error, error_trace=error_trace)
Expand Down
22 changes: 22 additions & 0 deletions src/crate/client/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from .http import Client
from crate.client import connect
from unittest import TestCase


class ConnectionTest(TestCase):
    """Checks how a connection derives ``lowest_server_version``."""

    def test_lowest_server_version(self):
        """The smallest parsable version wins; invalid ones are ignored."""
        client = Client(servers="localhost:4200 localhost:4201 localhost:4202")
        infos = [(None, None, version_string)
                 for version_string in ('0.42.3', '0.41.8', 'not a version')]
        # every lookup consumes one prepared entry, starting from the end
        client.server_infos = lambda server: infos.pop()

        connection = connect(client=client)

        lowest = connection.lowest_server_version
        self.assertEqual((0, 41, 8), lowest.version)

    def test_invalid_server_version(self):
        """Without any parsable version the connection reports 0.0.0."""
        client = Client(servers="localhost:4200")
        client.server_infos = lambda server: (None, None, "No version")

        connection = connect(client=client)

        lowest = connection.lowest_server_version
        self.assertEqual((0, 0, 0), lowest.version)
10 changes: 9 additions & 1 deletion src/crate/client/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@ def test_execute_with_args(self):
c = conn.cursor()
statement = 'select * from locations where position = ?'
c.execute(statement, 1)
client.sql.assert_called_once_with(statement, 1)
client.sql.assert_called_once_with(statement, 1, None)

def test_execute_with_bulk_args(self):
    """``bulk_parameters`` reach ``client.sql`` as the third argument."""
    statement = 'select * from locations where position = ?'
    mocked_client = MagicMock(spec=Client)
    cursor = connect(client=mocked_client).cursor()
    cursor.execute(statement, bulk_parameters=[[1]])
    # no plain parameters were given, so the second argument is None
    mocked_client.sql.assert_called_once_with(statement, None, [[1]])

0 comments on commit 06b3f0e

Please sign in to comment.