Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

So long rawserver #1399

Merged
merged 2 commits into from
May 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 20 additions & 84 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Written by Arno Bakker
# Updated by George Milescu
# Updated by Niels Zeilemaker
# see LICENSE.txt for license information
import binascii
import errno
Expand All @@ -14,8 +14,6 @@
from Tribler.Core.Modules.search_manager import SearchManager
from Tribler.Core.CacheDB.sqlitecachedb import forceDBThread
from Tribler.Core.DownloadConfig import DownloadStartupConfig
from Tribler.Core.RawServer.RawServer import RawServer
from Tribler.Core.ServerPortHandler import MultiHandler
from Tribler.Core.TorrentDef import TorrentDef, TorrentDefNoMetainfo
from Tribler.Core.Utilities.configparser import CallbackConfigParser
from Tribler.Core.Video.VideoPlayer import VideoPlayer
Expand All @@ -25,7 +23,7 @@
from Tribler.Core.torrentstore import TorrentStore
from Tribler.Main.globals import DefaultDownloadStartupConfig
from Tribler.dispersy.util import blockingCallFromThread, blocking_call_on_reactor_thread
from Tribler.dispersy.endpoint import RawserverEndpoint
from Tribler.Core.APIImplementation.TwistedRawServer import TwistedRawServer


try:
Expand Down Expand Up @@ -74,8 +72,6 @@ def __init__(self):

# modules
self.rawserver = None
self.multihandler = None

self.torrent_store = None
self.rtorrent_handler = None
self.tftp_handler = None
Expand Down Expand Up @@ -105,14 +101,7 @@ def register(self, session, sesslock, autoload_discovery=True):
self.session = session
self.sesslock = sesslock

self.rawserver = RawServer(self.sessdoneflag,
self.session.get_timeout_check_interval(),
self.session.get_timeout(),
ipv6_enable=self.session.get_ipv6(),
fatal_func=self.rawserver_fatalerrorfunc,
nonfatal_func=self.rawserver_nonfatalerrorfunc)

self.multihandler = MultiHandler(self.rawserver, self.sessdoneflag)
self.rawserver = TwistedRawServer()

if self.session.get_torrent_store():
self.torrent_store = TorrentStore(self.session.get_torrent_store_dir())
Expand Down Expand Up @@ -161,9 +150,10 @@ def register(self, session, sesslock, autoload_discovery=True):
self.tftp_handler = None
if self.session.get_dispersy():
from Tribler.dispersy.dispersy import Dispersy
from Tribler.dispersy.endpoint import StandaloneEndpoint

# set communication endpoint
endpoint = RawserverEndpoint(self.rawserver, self.session.get_dispersy_port(), ip=self.session.get_ip())
endpoint = StandaloneEndpoint(self.session.get_dispersy_port(), ip=self.session.get_ip())

working_directory = unicode(self.session.get_state_dir())
self.dispersy = Dispersy(endpoint, working_directory)
Expand Down Expand Up @@ -252,6 +242,8 @@ def load_communities():
if self.rtorrent_handler:
self.rtorrent_handler.initialize()

self.start_upnp()

self.initComplete = True

def add(self, tdef, dscfg, pstate=None, initialdlstatus=None, setupDelay=0, hidden=False):
Expand Down Expand Up @@ -404,47 +396,12 @@ def update_trackers_db(infohash, new_trackers):
# Update collected torrents
self.rtorrent_handler._save_torrent(new_def)

def rawserver_fatalerrorfunc(self, e):
""" Called by network thread """
self._logger.debug("tlm: RawServer fatal error func called : %s", e)
print_exc()

def rawserver_nonfatalerrorfunc(self, e):
""" Called by network thread """
self._logger.debug("tlm: RawServer non fatal error func called: %s", e)
print_exc()
# Could log this somewhere, or phase it out

def _run(self):
""" Called only once by network thread """

try:
try:
self.start_upnp()
self.multihandler.listen_forever()
except:
print_exc()
finally:
self.stop_upnp()
self.rawserver.shutdown()

#
# State retrieval
#
def set_download_states_callback(self, usercallback, getpeerlist, when=0.0):
""" Called by any thread """
self.sesslock.acquire()
try:
# Even if the list of Downloads changes in the mean time this is
# no problem. For removals, dllist will still hold a pointer to the
# Download, and additions are no problem (just won't be included
# in list of states returned via callback.
#
dllist = self.downloads.values()
finally:
self.sesslock.release()

for d in dllist:
for d in self.downloads.values():
# Arno, 2012-05-23: At Niels' request to get total transferred
# stats. Causes MOREINFO message to be sent from swift proc
# for every initiated dl.
Expand All @@ -457,19 +414,8 @@ def set_download_states_callback(self, usercallback, getpeerlist, when=0.0):

def network_set_download_states_callback(self, usercallback):
""" Called by network thread """
self.sesslock.acquire()
try:
# Even if the list of Downloads changes in the mean time this is
# no problem. For removals, dllist will still hold a pointer to the
# Download, and additions are no problem (just won't be included
# in list of states returned via callback.
#
dllist = self.downloads.values()
finally:
self.sesslock.release()

dslist = []
for d in dllist:
for d in self.downloads.values():
try:
ds = d.network_get_state(None, False, sessioncalling=True)
dslist.append(ds)
Expand Down Expand Up @@ -517,14 +463,12 @@ def load_checkpoint(self, initialdlstatus=None, initialdlstatus_dict={}):
def load_download_pstate_noexc(self, infohash):
""" Called by any thread, assume sesslock already held """
try:
dir = self.session.get_downloads_pstate_dir()
basename = binascii.hexlify(infohash) + '.state'
filename = os.path.join(dir, basename)
return self.load_download_pstate(filename)
except Exception as e:
# TODO: remove saved checkpoint?
# self.rawserver_nonfatalerrorfunc(e)
return None

except Exception:
self._logger.exception("Exception while loading pstate: %s", infohash)

def resume_download(self, filename, initialdlstatus=None, initialdlstatus_dict={}, setupDelay=0):
tdef = dscfg = pstate = None
Expand Down Expand Up @@ -583,7 +527,7 @@ def resume_download(self, filename, initialdlstatus=None, initialdlstatus_dict={
self._logger.info("tlm: not resuming checkpoint because download has already been added")

except Exception as e:
self.rawserver_nonfatalerrorfunc(e)
self._logger.exception("tlm: load check_point: exception while adding download %s", tdef)
else:
self._logger.info("tlm: removing checkpoint %s destdir is %s", filename, dscfg.get_dest_dir())
os.remove(filename)
Expand Down Expand Up @@ -622,8 +566,9 @@ def network_checkpoint_callback(self, dllist, stop, checkpoint, gracetime):
self._logger.debug("tlm: network checkpointing: %s %s", d.get_def().get_name(), pstate)

self.save_download_pstate(infohash, pstate)

except Exception as e:
self.rawserver_nonfatalerrorfunc(e)
self._logger.exception("Exception while checkpointing: %s", d.get_def().get_name())

if stop:
# Some grace time for early shutdown tasks
Expand Down Expand Up @@ -708,6 +653,8 @@ def early_shutdown(self):
mainlineDHT.deinit(self.mainline_dht)
self.mainline_dht = None

self.stop_upnp()

def network_shutdown(self):
try:
self._logger.info("tlm: network_shutdown")
Expand All @@ -729,6 +676,9 @@ def network_shutdown(self):
if self.ltmgr:
self.ltmgr.shutdown()

if self.rawserver:
self.rawserver.cancel_all_pending_tasks()

def save_download_pstate(self, infohash, pstate):
""" Called by network thread """
basename = binascii.hexlify(infohash) + '.state'
Expand All @@ -743,20 +693,6 @@ def load_download_pstate(self, filename):
pstate.read_file(filename)
return pstate

def run(self):
if prctlimported:
prctl.set_name("Tribler" + currentThread().getName())

if PROFILE:
fname = "profile-%s" % self.getName()
import cProfile
cProfile.runctx("self._run()", globals(), locals(), filename=fname)
import pstats
self._logger.info("profile: data for %s", self.getName())
pstats.Stats(fname, stream=sys.stderr).sort_stats("cumulative").print_stats(20)
else:
self._run()

def start_upnp(self):
if self.ltmgr:
self.set_activity(NTFY_ACT_UPNP)
Expand Down
17 changes: 17 additions & 0 deletions Tribler/Core/APIImplementation/TwistedRawServer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from Tribler.dispersy.taskmanager import TaskManager
from twisted.internet import reactor
from threading import RLock

class TwistedRawServer(TaskManager):

def __init__(self):
super(TwistedRawServer, self).__init__()
self._auto_counter = 0
self._lock = RLock()

def add_task(self, wrapper, delay):
with self._lock:
self._auto_counter += 1

task_name = "twisted_rawserver %d" % self._auto_counter
reactor.callFromThread(lambda: self.register_task(task_name, reactor.callLater(delay, wrapper)))
6 changes: 0 additions & 6 deletions Tribler/Core/CacheDB/SqliteCacheDBHandler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Written by Jie Yang
# Modified by George Milescu
# see LICENSE.txt for license information
# Note for Developers: Please write a unittest in Tribler/Test/test_sqlitecachedbhandler.py
# for any function you add to database.
Expand Down Expand Up @@ -29,11 +28,6 @@
from Tribler.Core.Utilities.tracker_utils import get_uniformed_tracker_url


try:
WindowsError
except NameError:
WindowsError = Exception

SHOW_ERROR = False

VOTECAST_FLUSH_DB_INTERVAL = 15
Expand Down
19 changes: 9 additions & 10 deletions Tribler/Core/DecentralizedTracking/mainlineDHT.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@
DEBUG = False

DHT_IMPORTED = False
if sys.version.split()[0] >= '2.5':
try:
import Tribler.Core.DecentralizedTracking.pymdht.core.pymdht as pymdht
import Tribler.Core.DecentralizedTracking.pymdht.core.node as node
import Tribler.Core.DecentralizedTracking.pymdht.plugins.routing_nice_rtt as routing_mod
import Tribler.Core.DecentralizedTracking.pymdht.plugins.lookup_a4 as lookup_mod
import Tribler.Core.DecentralizedTracking.pymdht.core.exp_plugin_template as experimental_m_mod
DHT_IMPORTED = True
except ImportError:
logger.exception(u"Could not import pymdht")
try:
import Tribler.Core.DecentralizedTracking.pymdht.core.pymdht as pymdht
import Tribler.Core.DecentralizedTracking.pymdht.core.node as node
import Tribler.Core.DecentralizedTracking.pymdht.plugins.routing_nice_rtt as routing_mod
import Tribler.Core.DecentralizedTracking.pymdht.plugins.lookup_a4 as lookup_mod
import Tribler.Core.DecentralizedTracking.pymdht.core.exp_plugin_template as experimental_m_mod
DHT_IMPORTED = True
except ImportError:
logger.exception(u"Could not import pymdht")


def init(addr, conf_path):
Expand Down
1 change: 0 additions & 1 deletion Tribler/Core/DownloadConfig.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Written by Arno Bakker
# Updated by George Milescu
# Updated by Egbert Bouman, now using ConfigParser
# see LICENSE.txt for license information

Expand Down
1 change: 0 additions & 1 deletion Tribler/Core/DownloadState.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Written by Arno Bakker
# Updated by George Milescu
# see LICENSE.txt for license information
""" Contains a snapshot of the state of the Download at a specific point in time. """
import logging
Expand Down
34 changes: 0 additions & 34 deletions Tribler/Core/MessageID.py

This file was deleted.

Loading