Skip to content

Commit

Permalink
Initial N1QL commit
Browse files Browse the repository at this point in the history
This provides the base implementation from libcouchbase. Don't merge
this yet as this depends on yet-unmerged features of libcouchbase
itself.

Change-Id: I33fc05900c62ea8e19fd6b27222b27d32e757000
Reviewed-on: http://review.couchbase.org/46105
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Reviewed-by: Mark Nunberg <mark.nunberg@couchbase.com>
Tested-by: Mark Nunberg <mark.nunberg@couchbase.com>
  • Loading branch information
mnunberg authored and Mark Nunberg committed Feb 20, 2015
1 parent 01a843f commit da6ad0f
Show file tree
Hide file tree
Showing 8 changed files with 554 additions and 0 deletions.
16 changes: 16 additions & 0 deletions couchbase/bucket.py
Expand Up @@ -28,6 +28,7 @@
import couchbase.exceptions as exceptions
from couchbase.views.params import make_dvpath, make_options_string
from couchbase.views.iterator import View
from couchbase.n1ql import N1QLQuery, N1QLRequest
from couchbase._pyport import basestring


Expand Down Expand Up @@ -1178,6 +1179,21 @@ def query(self, design, view, use_devmode=False, itercls=View, **kwargs):
design = self._mk_devmode(design, use_devmode)
return itercls(self, design, view, **kwargs)

def n1ql_query(self, query, **kwargs):
"""
Execute a N1QL query.
:param query: The query to execute. This may either be a
:class:`.N1QLQuery` object, or a string (which will be
implicitly converted to one).
:param kwargs: Arguments for :class:`.N1QLRequest`.
:return: An iterator which yields rows. The returned
"""
if not isinstance(query, N1QLQuery):
query = N1QLQuery(query)

return N1QLRequest(query, self, **kwargs)

def __repr__(self):
return ('<{modname}.{cls} bucket={bucket}, nodes={nodes} at 0x{oid:x}>'
).format(modname=__name__, cls=self.__class__.__name__,
Expand Down
300 changes: 300 additions & 0 deletions couchbase/n1ql.py
@@ -1 +1,301 @@
#
# Copyright 2015, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json

from couchbase._pyport import long, basestring
from couchbase._libcouchbase import _N1QLParams
from couchbase.views.iterator import AlreadyQueriedError
from couchbase.exceptions import CouchbaseError


class N1QLError(CouchbaseError):
@property
def n1ql_errcode(self):
return self.objextra['code']


class N1QLRow(object):
def __init__(self, jsobj):
"""
Default class wrapping a row returned by an N1QL query.
The fields in the row may be obtained by using the index
syntax (``['name']`` or ``[4]``) to retrieve a named or
indexed result field.
:param jsobj: The raw JSON from the row
"""
self._jsobj = jsobj

def __getitem__(self, item):
if isinstance(item, (int, long)):
try:
return self._jsobj['${0}'.format(item)]
except IndexError:
raise IndexError('Row has no positional index {0}'.format(item))
else:
return self._jsobj[item]

@property
def raw(self):
"""
Retrieve the underlying JSON object
"""
return self._jsobj

def named_field(self, name):
"""
Retrieve a field by its name
:param name: The field name
:return: The field value
:raise: :exc:`KeyError` if there is no such field
"""
return self._jsobj[name]

def pos_field(self, pos):
"""
Retrieve a field by its index.
The field must have been retrieved as an indexed parameter
:param pos: The numeric index
:return: The field value
:raise: :exc:`IndexError` if there is no such field
"""
return self._jsobj['${0}'.format(pos)]

def pos_values(self):
"""
Get all positional field values as a list
:return: All positional field values.
:raise: :exc:`ValueError` if there are no positional results.
"""
ll = []
i = 1
while True:
try:
ll.append(self[i])
i += 1
except (IndexError, KeyError):
break
if ll:
return ll
else:
raise ValueError('No positional arguments')

def unwrap(self):
"""
Unwraps the result from any surrounding placeholders.
This method is useful if the query yields a single top level
field (either positional or named).
This will fail if there is more than one top level object in the row
:return: A new :class:`N1QLRow` which encapsulates the top level
field.
"""
if len(self._jsobj) > 1:
raise ValueError('Cannot unwrap!')
return self.__class__(self._jsobj[self._jsobj.keys()[0]])


class N1QLQuery(object):
def __init__(self, query, prepared=False):
"""
Create an N1QL Query object. This may be passed as the
`params` argument to :class:`N1QLRequest`.
:param query: The query body. This may either be a query string,
or a dictionary representing a prepared query.
:param prepared: Set to true if the query object should be treated
as a prepared statement
"""
self._cparams = _N1QLParams()
if prepared:
if not isinstance(query, dict):
raise ValueError('Prepared query must be a dictionary')
self._cparams.setquery(json.dumps(query), 2)
else:
self._cparams.setquery(query)
self._stmt = query

self.posargs = []
self.namedargs = {}

def set_args(self, **kv):
"""
Set a named parameter in the query. The named field must
exist in the query itself.
:param kv: Key-Value pairs representing values within the
query. These values should be stripped of their leading
`$` identifier.
"""
for k in kv:
self.set_rawargs({'$'+k: kv[k]}, encoded=False)
return self

def set_rawargs(self, kv, encoded=False):
"""
:param kv: A dictionary containing the raw key-values for
the query arguments.
:param encoded: Whether the values are already JSON encoded.
"""
if encoded:
self.namedargs.update(**kv)
return self
for k in kv:
self.namedargs[k] = json.dumps(kv[k])

return self

def set_option(self, name, value):
self._cparams.setopt(name, value)

def add_args(self, *args):
for arg in args:
self.posargs.append(json.dumps(arg))

@property
def statement(self):
return self._stmt

def clear(self):
self._cparams.clear()

def _presubmit(self):
for arg in self.posargs:
self._cparams.add_pos_param(arg)
for k in self.namedargs:
self._cparams.set_named_param(k, self.namedargs[k])

def __repr__(self):
return ('<{cls} stmt={stmt} at {oid}>'.format(
cls=self.__class__.__name__,
stmt=repr(self._stmt),
oid=id(self)))


class N1QLInsertQuery(N1QLQuery):
def __init__(self, keyspace, kv, encode=True):
"""
Create an INSERT query
:param string keyspace: The keyspace
:param kv: key=value entries to insert
:param boolean encode: Whether to encode the values
"""

kvp = []
for k in kv:
v = kv[k]
cur = 'VALUES("{0}",{1})'
cur = cur.format(k, json.dumps(v) if encode else v)
kvp.append(cur)

ss = 'INSERT INTO {0} {1}'.format(
keyspace, ','.join(kvp)
)
print ss
super(N1QLInsertQuery, self).__init__(ss)


class N1QLRequest(object):
def __init__(self, params, parent, row_factory=N1QLRow, _host=''):
"""
Object representing the execution of the request on the
server.
:param params: An :class:`N1QLQuery` object.
:param parent: The parent :class:`.Bucket` object
:param row_factory: Callable which accepts the JSON encoded
rows and converts them to Python objects. The default is
:class:`N1QLRow`. You may use ``lambda x: x`` to just
return the raw JSON
:param _host: `host:port` specifier, useful for standalone
instances of Developer Preview N1QL versions.
To actually receive results of the query, iterate over this
object.
"""
if isinstance(params, basestring):
params = N1QLQuery(params)

self._params = params
self._parent = parent
self.row_factory = row_factory
self.errors = []
self._mres = None
self._do_iter = True
self._host = _host
self.__raw = False

def _start(self):
if self._mres:
return

self._params._presubmit()
self._mres = self._parent._n1ql_query(
self._params._cparams, _host=self._host)
self.__raw = self._mres[None]

@property
def raw(self):
return self.__raw

def _handle_meta(self, value):
if not isinstance(value, dict):
return
if 'errors' in value:
for err in value['errors']:
raise N1QLError.pyexc('N1QL Execution failed', err)

def _process_payload(self, rows):
if rows:
return [self.row_factory(row) for row in rows]

elif self.raw.done:
self._handle_meta(self.raw.value)
self._do_iter = False
return []

def execute(self):
"""
Execute the statement and raise an exception on failure.
This method is useful for statements which modify data or
indexes, where the application does not need to extract any
data, but merely determine success or failure.
"""
for r in self:
pass

def get_single_result(self):
"""
Execute the statement and return its single result.
This should only be used on statements which are intended to
return only a single result.
:return: The single result, as encapsulated by the
`row_factory`
"""
for r in self:
return r

def __iter__(self):
if not self._do_iter:
raise AlreadyQueriedError()

self._start()
while self._do_iter:
raw_rows = self.raw.fetch(self._mres)
for row in self._process_payload(raw_rows):
yield row
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -88,6 +88,7 @@
'connevents',
'pipeline',
'views',
'n1ql',
]

if platform.python_implementation() != 'PyPy':
Expand Down
1 change: 1 addition & 0 deletions src/connection.c
Expand Up @@ -455,6 +455,7 @@ static PyMethodDef Bucket_TABLE_methods[] = {

OPFUNC(_http_request, "Internal routine for HTTP requests"),
OPFUNC(_view_request, "Internal routine for view requests"),
OPFUNC(_n1ql_query, "Internal routine for N1QL queries"),

OPFUNC(observe, "Get replication/persistence status for keys"),
OPFUNC(observe_multi, "multi-key variant of observe"),
Expand Down
1 change: 1 addition & 0 deletions src/ext.c
Expand Up @@ -282,6 +282,7 @@ init_libcouchbase(void)
X(IOEvent, pycbc_IOEventType_init) \
X(TimerEvent, pycbc_TimerEventType_init) \
X(AsyncResult, pycbc_AsyncResultType_init) \
X(_N1QLParams, pycbc_N1QLParamsType_init) \
X(_IOPSWrapper, pycbc_IOPSWrapperType_init)

#define X(name, inf) PyObject *cls_##name;
Expand Down

0 comments on commit da6ad0f

Please sign in to comment.