-
Notifications
You must be signed in to change notification settings - Fork 445
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2164 from devos50/search_endpoint
READY: Implemented endpoint to search for torrents and channels in Tribler
- Loading branch information
Showing
10 changed files
with
445 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import json | ||
from twisted.web import server, resource | ||
from Tribler.Core.Modules.restapi.util import convert_db_channel_to_json, convert_db_torrent_to_json | ||
from Tribler.Core.simpledefs import NTFY_CHANNELCAST, SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS, SIGNAL_TORRENT | ||
|
||
|
||
MAX_EVENTS_BUFFER_SIZE = 100 | ||
|
||
|
||
class EventsEndpoint(resource.Resource): | ||
""" | ||
Important events in Tribler are returned over the events endpoint. This connection is held open. Each event is | ||
pushed over this endpoint in the form of a JSON dictionary. Each JSON dictionary contains a type field that | ||
indicates the type of the event. | ||
Currently, the following events are implemented: | ||
- events_start: An indication that the event socket is opened and that the server is ready to push events. | ||
- search_result_channel: This event dictionary contains a search result with a channel that has been found. | ||
- search_result_torrent: This event dictionary contains a search result with a torrent that has been found. | ||
""" | ||
|
||
def __init__(self, session): | ||
resource.Resource.__init__(self) | ||
self.session = session | ||
self.channel_db_handler = self.session.open_dbhandler(NTFY_CHANNELCAST) | ||
self.events_request = None | ||
self.buffer = [] | ||
|
||
self.session.add_observer(self.on_search_results_channels, SIGNAL_CHANNEL, [SIGNAL_ON_SEARCH_RESULTS]) | ||
self.session.add_observer(self.on_search_results_torrents, SIGNAL_TORRENT, [SIGNAL_ON_SEARCH_RESULTS]) | ||
|
||
def write_data(self, message): | ||
""" | ||
Write data over the event socket. If the event socket is not open, add the message to the buffer instead. | ||
""" | ||
if not self.events_request: | ||
if len(self.buffer) >= MAX_EVENTS_BUFFER_SIZE: | ||
self.buffer.pop(0) | ||
self.buffer.append(message) | ||
else: | ||
self.events_request.write(message) | ||
|
||
def on_search_results_channels(self, subject, changetype, objectID, results): | ||
""" | ||
Returns the channel search results over the events endpoint. | ||
""" | ||
for channel in results['result_list']: | ||
self.write_data(json.dumps({"type": "search_result_channel", | ||
"result": convert_db_channel_to_json(channel)}) + '\n') | ||
|
||
def on_search_results_torrents(self, subject, changetype, objectID, results): | ||
""" | ||
Returns the torrent search results over the events endpoint. | ||
""" | ||
for torrent in results['result_list']: | ||
self.write_data(json.dumps({"type": "search_result_torrent", | ||
"result": convert_db_torrent_to_json(torrent)}) + '\n') | ||
|
||
def render_GET(self, request): | ||
self.events_request = request | ||
|
||
request.write(json.dumps({"type": "events_start"})) | ||
|
||
while not len(self.buffer) == 0: | ||
request.write(self.buffer.pop(0)) | ||
|
||
return server.NOT_DONE_YET |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import json | ||
import logging | ||
|
||
from twisted.web import http, resource | ||
from Tribler.Core.Utilities.search_utils import split_into_keywords | ||
from Tribler.Core.exceptions import OperationNotEnabledByConfigurationException | ||
from Tribler.Core.simpledefs import NTFY_CHANNELCAST, NTFY_TORRENTS, SIGNAL_TORRENT, SIGNAL_ON_SEARCH_RESULTS, \ | ||
SIGNAL_CHANNEL | ||
|
||
|
||
class SearchEndpoint(resource.Resource): | ||
""" | ||
This endpoint is responsible for searching in channels and torrents present in the local Tribler database. | ||
A GET request to this endpoint will create a search. Results are returned over the events endpoint, one by one. | ||
First, the results available in the local database will be pushed. After that, incoming Dispersy results are pushed. | ||
The query to this endpoint is passed using the url, i.e. /search?q=pioneer | ||
Example response over the events endpoint: | ||
{ | ||
"type": "search_result_channel", | ||
"query": "test", | ||
"result": { | ||
"id": 3, | ||
"dispersy_cid": "da69aaad39ccf468aba2ab9177d5f8d8160135e6", | ||
"name": "My fancy channel", | ||
"description": "A description of this fancy channel", | ||
"subscribed": True, | ||
"votes": 23, | ||
"torrents": 3, | ||
"spam": 5, | ||
"modified": 14598395, | ||
} | ||
} | ||
""" | ||
|
||
def __init__(self, session): | ||
resource.Resource.__init__(self) | ||
self.session = session | ||
self.channel_db_handler = self.session.open_dbhandler(NTFY_CHANNELCAST) | ||
self.torrent_db_handler = self.session.open_dbhandler(NTFY_TORRENTS) | ||
self._logger = logging.getLogger(self.__class__.__name__) | ||
|
||
def render_GET(self, request): | ||
""" | ||
This method first fires a search query in the SearchCommunity/AllChannelCommunity to search for torrents and | ||
channels. Next, the results in the local database are queried and returned over the events endpoint. | ||
""" | ||
request.setHeader('Content-Type', 'text/json') | ||
if 'q' not in request.args: | ||
request.setResponseCode(http.BAD_REQUEST) | ||
return json.dumps({"error": "query parameter missing"}) | ||
|
||
# We first search the local database for torrents and channels | ||
keywords = split_into_keywords(unicode(request.args['q'][0])) | ||
results_local_channels = self.channel_db_handler.searchChannels(keywords) | ||
results_dict = {"keywords": keywords, "result_list": results_local_channels} | ||
self.session.notifier.notify(SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS, None, results_dict) | ||
|
||
torrent_db_columns = ['T.torrent_id', 'infohash', 'T.name', 'length', 'category', 'num_seeders', 'num_leechers'] | ||
results_local_torrents = self.torrent_db_handler.searchNames(keywords, keys=torrent_db_columns, doSort=False) | ||
results_dict = {"keywords": keywords, "result_list": results_local_torrents} | ||
self.session.notifier.notify(SIGNAL_TORRENT, SIGNAL_ON_SEARCH_RESULTS, None, results_dict) | ||
|
||
# Create remote searches | ||
try: | ||
self.session.search_remote_torrents(keywords) | ||
self.session.search_remote_channels(keywords) | ||
except OperationNotEnabledByConfigurationException as exc: | ||
self._logger.error(exc) | ||
|
||
return json.dumps({"queried": True}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
""" | ||
This file contains some utility methods that are used by the API. | ||
""" | ||
|
||
|
||
def convert_db_channel_to_json(channel): | ||
""" | ||
This method converts a channel in the database to a JSON dictionary. | ||
""" | ||
return {"id": channel[0], "dispersy_cid": channel[1].encode('hex'), "name": channel[2], "description": channel[3], | ||
"votes": channel[5], "torrents": channel[4], "spam": channel[6], "modified": channel[8], | ||
"subscribed": (channel[7] == 2)} | ||
|
||
|
||
def convert_db_torrent_to_json(torrent): | ||
""" | ||
This method converts a torrent in the database to a JSON dictionary. | ||
""" | ||
return {"id": torrent[0], "infohash": torrent[1].encode('hex'), "name": torrent[2], "length": torrent[3], | ||
"category": torrent[4], "num_seeders": torrent[5] or 0, "num_leechers": torrent[6] or 0} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import json | ||
from twisted.internet import reactor | ||
from twisted.internet.defer import Deferred | ||
from twisted.internet.protocol import Protocol | ||
from twisted.web.client import Agent | ||
from twisted.web.http_headers import Headers | ||
from Tribler.Core.Modules.restapi import events_endpoint | ||
from Tribler.Core.Utilities.twisted_thread import deferred | ||
from Tribler.Core.simpledefs import SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS, SIGNAL_TORRENT | ||
from Tribler.Core.version import version_id | ||
from Tribler.Test.Core.Modules.RestApi.base_api_test import AbstractApiTest | ||
|
||
|
||
class EventDataProtocol(Protocol): | ||
""" | ||
This class is responsible for reading the data received over the event socket. | ||
""" | ||
def __init__(self, messages_to_wait_for, finished): | ||
self.json_buffer = [] | ||
self.messages_to_wait_for = messages_to_wait_for + 1 # The first event message is always events_start | ||
self.finished = finished | ||
|
||
def dataReceived(self, data): | ||
self.json_buffer.append(json.loads(data)) | ||
self.messages_to_wait_for -= 1 | ||
if self.messages_to_wait_for == 0: | ||
self.finished.callback(self.json_buffer[1:]) | ||
|
||
|
||
class TestEventsEndpoint(AbstractApiTest): | ||
|
||
def __init__(self, *args, **kwargs): | ||
super(TestEventsEndpoint, self).__init__(*args, **kwargs) | ||
self.events_deferred = Deferred() | ||
|
||
def on_event_socket_opened(self, response): | ||
response.deliverBody(EventDataProtocol(self.messages_to_wait_for, self.events_deferred)) | ||
|
||
def open_events_socket(self): | ||
agent = Agent(reactor) | ||
return agent.request('GET', 'http://localhost:%s/events' % self.session.get_http_api_port(), | ||
Headers({'User-Agent': ['Tribler ' + version_id]}), None)\ | ||
.addCallback(self.on_event_socket_opened) | ||
|
||
@deferred(timeout=10) | ||
def test_events_buffer(self): | ||
""" | ||
Testing whether we still receive messages that are in the buffer before the event connection is opened | ||
""" | ||
def verify_delayed_message(results): | ||
self.assertEqual(results[0][u'type'], u'search_result_channel') | ||
self.assertTrue(results[0][u'result']) | ||
|
||
events_endpoint.MAX_EVENTS_BUFFER_SIZE = 1 | ||
|
||
results_dict = {"keywords": ["test"], "result_list": [('a',) * 9]} | ||
self.session.notifier.use_pool = False | ||
self.session.notifier.notify(SIGNAL_TORRENT, SIGNAL_ON_SEARCH_RESULTS, None, results_dict) | ||
self.session.notifier.notify(SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS, None, results_dict) | ||
self.messages_to_wait_for = 1 | ||
self.open_events_socket() | ||
return self.events_deferred.addCallback(verify_delayed_message) | ||
|
||
@deferred(timeout=10) | ||
def test_search_results(self): | ||
""" | ||
Testing whether the event endpoint returns search results when we have search results available | ||
""" | ||
def verify_search_results(results): | ||
self.assertEqual(results[0][u'type'], u'search_result_channel') | ||
self.assertEqual(results[1][u'type'], u'search_result_torrent') | ||
|
||
self.assertTrue(results[0][u'result']) | ||
self.assertTrue(results[1][u'result']) | ||
|
||
def create_search_results(_): | ||
results_dict = {"keywords": ["test"], "result_list": [('a',) * 9]} | ||
self.session.notifier.use_pool = False | ||
self.session.notifier.notify(SIGNAL_CHANNEL, SIGNAL_ON_SEARCH_RESULTS, None, results_dict) | ||
self.session.notifier.notify(SIGNAL_TORRENT, SIGNAL_ON_SEARCH_RESULTS, None, results_dict) | ||
|
||
self.messages_to_wait_for = 2 | ||
self.open_events_socket().addCallback(create_search_results) | ||
return self.events_deferred.addCallback(verify_search_results) |
Oops, something went wrong.