Skip to content

Commit 3f1a2af

Browse files
committed
PYCBC-1550: Add support for scoped search indexes
Motivation ========== Add support for search index mgmt and search operations from a scope. Modification ============ * Update C++ core * Update search index mgmt to use C++ core mgmt API * Add access to SearchIndexManager from scope instance * Add access to search() functionality (new path for search API) from scope instance Change-Id: Ifc572dddff3847da8c67d0bb5c1a73976ec373c0 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/204727 Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 7b762ee commit 3f1a2af

File tree

16 files changed

+1171
-72
lines changed

16 files changed

+1171
-72
lines changed

acouchbase/management/search.py

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@
4141

4242

4343
class SearchIndexManager(SearchIndexManagerLogic):
44-
def __init__(self, connection, loop):
44+
def __init__(self,
45+
connection,
46+
loop
47+
):
4548
super().__init__(connection)
4649
self._loop = loop
4750

@@ -167,3 +170,141 @@ def get_all_index_stats(self,
167170
**kwargs # type: Dict[str, Any]
168171
) -> Awaitable[Dict[str, Any]]:
169172
super().get_all_index_stats(*options, **kwargs)
173+
174+
175+
class ScopeSearchIndexManager(SearchIndexManagerLogic):
176+
"""
177+
**VOLATILE** This API is subject to change at any time.
178+
"""
179+
180+
def __init__(self,
181+
connection,
182+
loop,
183+
bucket_name, # type: str
184+
scope_name # type: str
185+
):
186+
super().__init__(connection, bucket_name=bucket_name, scope_name=scope_name)
187+
self._loop = loop
188+
189+
@property
190+
def loop(self):
191+
"""
192+
**INTERNAL**
193+
"""
194+
return self._loop
195+
196+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
197+
def upsert_index(self,
198+
index, # type: SearchIndex
199+
*options, # type: UpsertSearchIndexOptions
200+
**kwargs # type: Dict[str, Any]
201+
) -> Awaitable[None]:
202+
203+
super().upsert_index(index, *options, **kwargs)
204+
205+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
206+
def drop_index(self,
207+
index_name, # type: str
208+
*options, # type: DropSearchIndexOptions
209+
**kwargs # type: Dict[str, Any]
210+
) -> Awaitable[None]:
211+
212+
super().drop_index(index_name, *options, **kwargs)
213+
214+
@AsyncMgmtWrapper.inject_callbacks(SearchIndex, ManagementType.SearchIndexMgmt,
215+
SearchIndexManagerLogic._ERROR_MAPPING)
216+
def get_index(self,
217+
index_name, # type: str
218+
*options, # type: GetSearchIndexOptions
219+
**kwargs # type: Dict[str, Any]
220+
) -> Awaitable[SearchIndex]:
221+
222+
super().get_index(index_name, *options, **kwargs)
223+
224+
@AsyncMgmtWrapper.inject_callbacks(SearchIndex, ManagementType.SearchIndexMgmt,
225+
SearchIndexManagerLogic._ERROR_MAPPING)
226+
def get_all_indexes(self,
227+
*options, # type: GetAllSearchIndexesOptions
228+
**kwargs # type: Dict[str, Any]
229+
) -> Awaitable[Iterable[SearchIndex]]:
230+
super().get_all_indexes(*options, **kwargs)
231+
232+
@AsyncMgmtWrapper.inject_callbacks(int, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
233+
def get_indexed_documents_count(self,
234+
index_name, # type: str
235+
*options, # type: GetSearchIndexedDocumentsCountOptions
236+
**kwargs # type: Dict[str, Any]
237+
) -> Awaitable[int]:
238+
super().get_indexed_documents_count(index_name, *options, **kwargs)
239+
240+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
241+
def pause_ingest(self,
242+
index_name, # type: str
243+
*options, # type: PauseIngestSearchIndexOptions
244+
**kwargs # type: Dict[str, Any]
245+
) -> Awaitable[None]:
246+
super().pause_ingest(index_name, *options, **kwargs)
247+
248+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
249+
def resume_ingest(self,
250+
index_name, # type: str
251+
*options, # type: ResumeIngestSearchIndexOptions
252+
**kwargs # type: Dict[str, Any]
253+
) -> Awaitable[None]:
254+
super().resume_ingest(index_name, *options, **kwargs)
255+
256+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
257+
def allow_querying(self,
258+
index_name, # type: str
259+
*options, # type: AllowQueryingSearchIndexOptions
260+
**kwargs # type: Dict[str, Any]
261+
) -> Awaitable[None]:
262+
super().allow_querying(index_name, *options, **kwargs)
263+
264+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
265+
def disallow_querying(self,
266+
index_name, # type: str
267+
*options, # type: DisallowQueryingSearchIndexOptions
268+
**kwargs # type: Dict[str, Any]
269+
) -> Awaitable[None]:
270+
super().disallow_querying(index_name, *options, **kwargs)
271+
272+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
273+
def freeze_plan(self,
274+
index_name, # type: str
275+
*options, # type: FreezePlanSearchIndexOptions
276+
**kwargs # type: Dict[str, Any]
277+
) -> Awaitable[None]:
278+
super().freeze_plan(index_name, *options, **kwargs)
279+
280+
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
281+
def unfreeze_plan(self,
282+
index_name, # type: str
283+
*options, # type: UnfreezePlanSearchIndexOptions
284+
**kwargs # type: Dict[str, Any]
285+
) -> Awaitable[None]:
286+
super().unfreeze_plan(index_name, *options, **kwargs)
287+
288+
@AsyncMgmtWrapper.inject_callbacks(dict, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
289+
def analyze_document(self,
290+
index_name, # type: str
291+
document, # type: Any
292+
*options, # type: AnalyzeDocumentSearchIndexOptions
293+
**kwargs # type: Dict[str, Any]
294+
) -> Awaitable[Dict[str, Any]]:
295+
super().analyze_document(index_name, document, *options, **kwargs)
296+
297+
@AsyncMgmtWrapper.inject_callbacks(dict, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
298+
def get_index_stats(self,
299+
index_name, # type: str
300+
*options, # type: GetSearchIndexStatsOptions
301+
**kwargs # type: Dict[str, Any]
302+
) -> Awaitable[Dict[str, Any]]:
303+
super().get_index_stats(index_name, *options, **kwargs)
304+
305+
@AsyncMgmtWrapper.inject_callbacks(dict, ManagementType.SearchIndexMgmt, SearchIndexManagerLogic._ERROR_MAPPING)
306+
def get_all_index_stats(self,
307+
*options, # type: GetAllSearchIndexStatsOptions
308+
**kwargs # type: Dict[str, Any]
309+
) -> Awaitable[Dict[str, Any]]:
310+
super().get_all_index_stats(*options, **kwargs)

acouchbase/scope.py

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
from typing import (TYPE_CHECKING,
1717
Any,
1818
Awaitable,
19+
Dict,
1920
Optional)
2021

2122
from acouchbase.analytics import AnalyticsQuery, AsyncAnalyticsRequest
2223
from acouchbase.collection import Collection
24+
from acouchbase.management.search import ScopeSearchIndexManager
2325
from acouchbase.n1ql import AsyncN1QLRequest, N1QLQuery
2426
from acouchbase.search import AsyncFullTextSearchRequest, SearchQueryBuilder
2527
from couchbase.options import (AnalyticsOptions,
@@ -28,10 +30,11 @@
2830
from couchbase.result import (AnalyticsResult,
2931
QueryResult,
3032
SearchResult)
33+
from couchbase.serializer import Serializer
3134
from couchbase.transcoder import Transcoder
3235

3336
if TYPE_CHECKING:
34-
from acouchbase.search import SearchQuery
37+
from couchbase.search import SearchQuery, SearchRequest
3538

3639

3740
class AsyncScope:
@@ -65,6 +68,17 @@ def loop(self):
6568
"""
6669
return self._bucket.loop
6770

71+
@property
72+
def default_serializer(self) -> Optional[Serializer]:
73+
return self._bucket.default_serializer
74+
75+
@property
76+
def streaming_timeouts(self):
77+
"""
78+
**INTERNAL**
79+
"""
80+
return self._bucket.streaming_timeouts
81+
6882
@property
6983
def default_transcoder(self) -> Optional[Transcoder]:
7084
return self._bucket.default_transcoder
@@ -107,12 +121,11 @@ def _set_connection(self):
107121
"""
108122
self._connection = self._bucket.connection
109123

110-
def query(
111-
self,
112-
statement, # type: str
113-
*options, # type: QueryOptions
114-
**kwargs # type: Any
115-
) -> QueryResult:
124+
def query(self,
125+
statement, # type: str
126+
*options, # type: QueryOptions
127+
**kwargs # type: Dict[str, Any]
128+
) -> QueryResult:
116129
"""Executes a N1QL query against the scope.
117130
118131
.. note::
@@ -203,12 +216,11 @@ def query(
203216
query.params,
204217
**request_args))
205218

206-
def analytics_query(
207-
self,
208-
statement, # type: str
209-
*options, # type: AnalyticsOptions
210-
**kwargs
211-
) -> AnalyticsResult:
219+
def analytics_query(self,
220+
statement, # type: str
221+
*options, # type: AnalyticsOptions
222+
**kwargs # type: Dict[str, Any]
223+
) -> AnalyticsResult:
212224
"""Executes an analaytics query against the scope.
213225
214226
.. note::
@@ -299,13 +311,12 @@ def analytics_query(
299311
query.params,
300312
**request_args))
301313

302-
def search_query(
303-
self,
304-
index, # type: str
305-
query, # type: SearchQuery
306-
*options, # type: SearchOptions
307-
**kwargs
308-
) -> SearchResult:
314+
def search_query(self,
315+
index, # type: str
316+
query, # type: SearchQuery
317+
*options, # type: SearchOptions
318+
**kwargs # type: Dict[str, Any]
319+
) -> SearchResult:
309320
"""Executes an search query against the scope.
310321
311322
.. note::
@@ -409,14 +420,47 @@ def search_query(
409420
if not ('scope_name' in opt or 'scope_name' in kwargs):
410421
kwargs['scope_name'] = f'{self.name}'
411422

412-
query = SearchQueryBuilder.create_search_query_object(
413-
index, query, *options, **kwargs
414-
)
423+
query = SearchQueryBuilder.create_search_query_object(index, query, *options, **kwargs)
415424
return SearchResult(AsyncFullTextSearchRequest.generate_search_request(self.connection,
416425
self.loop,
417426
query.as_encodable(),
418427
**request_args))
419428

429+
def search(self,
430+
index, # type: str
431+
request, # type: SearchRequest
432+
*options, # type: SearchOptions
433+
**kwargs, # type: Dict[str, Any]
434+
) -> SearchResult:
435+
"""
436+
**VOLATILE** This API is subject to change at any time.
437+
"""
438+
# See couchbase.cluster.search() for note on streaming timeout
439+
streaming_timeout = self.streaming_timeouts.get('search_timeout', None)
440+
query = SearchQueryBuilder.create_search_query_from_request(index, request, *options, **kwargs)
441+
req = AsyncFullTextSearchRequest.generate_search_request(self.connection,
442+
self.loop,
443+
query.as_encodable(),
444+
default_serializer=self.default_serializer,
445+
streaming_timeout=streaming_timeout,
446+
bucket_name=self.bucket_name,
447+
scope_name=self.name)
448+
return SearchResult(req)
449+
450+
def search_indexes(self) -> ScopeSearchIndexManager:
451+
"""
452+
**VOLATILE** This API is subject to change at any time.
453+
454+
Get a :class:`~acouchbase.management.search.ScopeSearchIndexManager` which can be used to manage the search
455+
indexes of this scope.
456+
457+
Returns:
458+
:class:`~acouchbase.management.search.ScopeSearchIndexManager`: A :class:`~acouchbase.management.search.ScopeSearchIndexManager` instance.
459+
460+
""" # noqa: E501
461+
# TODO: AlreadyShutdownException?
462+
return ScopeSearchIndexManager(self.connection, self.loop, self.bucket_name, self.name)
463+
420464
@staticmethod
421465
def default_name():
422466
return "_default"

acouchbase/tests/_test_utils.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from __future__ import annotations
17+
1618
import asyncio
1719
from typing import (Any,
1820
Callable,
@@ -112,6 +114,7 @@ def __init__(self, cluster, bucket, collection, cluster_config, **kwargs):
112114
self._collection_spec = None
113115
self._scope = None
114116
self._named_collection = None
117+
self._use_scope_search_mgmt = False
115118

116119
@property
117120
def collection(self) -> Collection:
@@ -182,6 +185,10 @@ def rate_limit_params(self) -> Optional[RateLimitData]:
182185
"""Returns the rate limit testing data"""
183186
return self._rate_limit_params if hasattr(self, '_rate_limit_params') else None
184187

188+
@property
189+
def use_scope_search_mgmt(self) -> bool:
190+
return self._use_scope_search_mgmt
191+
185192
@classmethod # noqa: C901
186193
async def get_environment(cls, # noqa: C901
187194
test_suite,
@@ -239,6 +246,33 @@ async def get_environment(cls, # noqa: C901
239246

240247
return cb_env
241248

249+
def disable_scope_search_mgmt(self) -> TestEnvironment:
250+
self._use_scope_search_mgmt = False
251+
return self
252+
253+
def disable_search_mgmt(self) -> TestEnvironment:
254+
self.check_if_feature_supported('search_index_mgmt')
255+
if not hasattr(self, '_sixm'):
256+
del self._sixm
257+
258+
return self
259+
260+
def enable_scope_search_mgmt(self) -> TestEnvironment:
261+
self._use_scope_search_mgmt = True
262+
return self
263+
264+
def enable_search_mgmt(self) -> TestEnvironment:
265+
self.check_if_feature_supported('search_index_mgmt')
266+
if self.use_scope_search_mgmt:
267+
if not hasattr(self.scope, 'search_indexes'):
268+
pytest.skip('Search index management not available on scope.')
269+
self._sixm = self.scope.search_indexes()
270+
else:
271+
if not hasattr(self.cluster, 'search_indexes'):
272+
pytest.skip('Search index management not available on cluster.')
273+
self._sixm = self.cluster.search_indexes()
274+
return self
275+
242276
async def get_new_key_value(self, reset=True):
243277
if reset is True:
244278
try:

0 commit comments

Comments
 (0)