diff --git a/Tribler/Main/vwxGUI/DispersyDebugFrame.py b/Tribler/Main/vwxGUI/DispersyDebugFrame.py index 04a582bc6ff..6e193c94ce3 100644 --- a/Tribler/Main/vwxGUI/DispersyDebugFrame.py +++ b/Tribler/Main/vwxGUI/DispersyDebugFrame.py @@ -8,7 +8,7 @@ from Tribler.Main.Utility.GuiDBHandler import startWorker, GUI_PRI_DISPERSY from Tribler.Main.Utility.utility import compute_ratio, eta_value, size_format from operator import itemgetter -from Tribler.dispersy.statistics import BartercastStatisticTypes +from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics DATA_NONE = "" @@ -135,7 +135,7 @@ def do_gui(delayedResult): self.__community_panel.UpdateInfo(stats) self.__rawinfo_panel.UpdateInfo(stats) self.__runtime_panel.UpdateInfo(stats) - self.__sharedstatistics_panel.UpdateInfo(stats) + self.__sharedstatistics_panel.UpdateInfo(_barter_statistics) self.Layout() startWorker(do_gui, do_db, uId=u"DispersyDebugFrame_UpdateInfo", priority=GUI_PRI_DISPERSY) diff --git a/Tribler/community/bartercast4/community.py b/Tribler/community/bartercast4/community.py index a1567c3dc81..96c8ec921d7 100644 --- a/Tribler/community/bartercast4/community.py +++ b/Tribler/community/bartercast4/community.py @@ -9,10 +9,12 @@ from Tribler.dispersy.distribution import DirectDistribution from Tribler.dispersy.message import Message, DelayMessageByProof from Tribler.dispersy.resolution import PublicResolution -from Tribler.dispersy.statistics import BartercastStatisticTypes +from .statistics import BartercastStatisticTypes, _barter_statistics from twisted.internet.task import LoopingCall from twisted.python import log +BARTERCAST_PERSIST_INTERVAL = 120.0 + class BarterCommunity(Community): @classmethod @@ -37,6 +39,16 @@ def __init__(self, dispersy, master, my_member): self._dispersy = dispersy # self._logger = logging.getLogger(self.__class__.__name__) log.msg("joined BC community") + self.init_database() + + # add task for persisting bartercast statistics every BARTERCAST_PERSIST_INTERVAL seconds + self._logger.debug("bartercast persist task started") + self.register_task("bartercast persist", + LoopingCall(self.backup_bartercast_statistics)).start(BARTERCAST_PERSIST_INTERVAL, now=False) + + def init_database(self): + log.msg("loading BC statistics from database") + _barter_statistics.load_statistics(self._dispersy) def initiate_meta_messages(self): return super(BarterCommunity, self).initiate_meta_messages() + [ @@ -105,7 +117,7 @@ def on_stats_request(self, messages): def create_stats_response(self, stats_type, candidate): log.msg("OUT: stats-response") meta = self.get_meta_message(u"stats-response") - records = self._dispersy._statistics.get_top_n_bartercast_statistics(stats_type, 5) + records = _barter_statistics.get_top_n_bartercast_statistics(stats_type, 5) log.msg("sending stats for type %d: %s" % (stats_type, records)) message = meta.impl(authentication=(self._my_member,), @@ -128,11 +140,16 @@ def on_stats_response(self, messages): log.msg("stats-response: %s %s %s" % (message._distribution.global_time, message.payload.stats_type, message.payload.records)) for r in message.payload.records: - self._dispersy._statistics.log_interaction(self._dispersy, + _barter_statistics.log_interaction(self._dispersy, message.payload.stats_type, message.authentication.member.mid.encode('hex'), r[0], int(r[1].encode('hex'), 16)) + # bartercast accounting stuff + def backup_bartercast_statistics(self): + self._logger.debug("merging bartercast statistics") + _barter_statistics.persist(self, 1) + class BarterCommunityCrawler(BarterCommunity): diff --git a/Tribler/community/bartercast4/statistics.py b/Tribler/community/bartercast4/statistics.py new file mode 100644 index 00000000000..83fc3e06b34 --- /dev/null +++ b/Tribler/community/bartercast4/statistics.py @@ -0,0 +1,214 @@ +from Tribler.dispersy.database import Database +import random +from operator import itemgetter +from collections import defaultdict +from os import path +from threading import RLock +import logging + + +class BarterStatistics(object): + def __init__(self): + self.db = None + self._db_counter = dict() + self._lock = RLock() + self.bartercast = defaultdict() + self._logger = logging.getLogger(self.__class__.__name__) + + def dict_inc_bartercast(self, stats_type, peer, value=1): + if not hasattr(self, "bartercast"): + self._logger.error(u"bartercast doesn't exist in statistics") + with self._lock: + assert stats_type < len(self.bartercast), u"%s doesn't exist in bartercast statistics" % stats_type + if peer not in self.bartercast[stats_type]: + self.bartercast[stats_type][peer] = value + else: + self.bartercast[stats_type][peer] += value + + def get_top_n_bartercast_statistics(self, key, n): + """ + Returns top n-n/2 barter cast statistics, +n/2 randomly selected statistics from the rest of the list. + The randomly selected statistics are added to make sure that less active peers get a chance to appear in the rankings + as well. + @TODO check if random portion should be larger or smaller + """ + with self._lock: + # shouldn't happen but dont crash the program when bartercast statistics are not available + if not hasattr(self, "bartercast"): + self._logger.error(u"bartercast doesn't exist in statistics") + return [] + if not key in getattr(self, "bartercast").keys(): + self._logger.error(u"%s doesn't exist in bartercast statistics" % key) + return [] + d = getattr(self, "bartercast")[key] + if d is not None: + random_n = n / 2 + fixed_n = n - random_n + sorted_list = sorted(d.items(), key=itemgetter(1), reverse=True) + top_stats = sorted_list[0:fixed_n] + self._logger.error("len d: %d, fixed_n: %d" % (len(d), fixed_n)) + if len(d) <= fixed_n: + random_stats = [] + else: + random_stats = random.sample(sorted_list[fixed_n:len(d)], min(random_n, len(d) - fixed_n)) + return top_stats + random_stats + return None + + def log_interaction(self, dispersy, type, peer1, peer2, value): + """ + Add statistic for interactions between peer1 and peer2 to the interaction log. + """ + self._init_database(dispersy) + self.db.execute(u"INSERT INTO interaction_log (peer1, peer2, type, value, date) values (?, ?, ?, ?, strftime('%s', 'now'))", (unicode(peer1), unicode(peer2), type, value)) + + def persist(self, dispersy, key, n=1): + """ + Persists the statistical data with name 'key' in the statistics database. + Note: performs the database update for every n-th call. This is to somewhat control the number of + writes for statistics that are updated often. + """ + if not self.should_persist(key, n): + return + + self._init_database(dispersy) +# pickle_object = cPickle.dumps(data) + self._logger.debug("persisting bc data") +# self.db.execute(u"INSERT OR REPLACE INTO statistic (name, object) values (?, ?)", (unicode(key), unicode(pickle_object))) + for t in self.bartercast: + for peer in self.bartercast[t]: + self.db.execute(u"INSERT OR REPLACE INTO statistic (type, peer, value) values (?, ?, ?)", (t, unicode(peer), self.bartercast[t][peer])) + self._logger.debug("data persisted") + + def load_statistics(self, dispersy): + self._init_database(dispersy) + data = self.db.execute(u"SELECT type, peer, value FROM statistic") + statistics = defaultdict() + for t in BartercastStatisticTypes.reverse_mapping: + statistics[t] = defaultdict() + for row in data: + t = row[0] + peer = row[1] + value = row[2] + if not t in statistics: + statistics[t] = defaultdict() + statistics[t][peer] = value + print statistics + self.bartercast = statistics + return statistics + +# data = self.db.execute(u"SELECT object FROM statistic WHERE name = ? LIMIT 1", [unicode(key)]) +# for row in data: +# return cPickle.loads(str(row[0])) +# return defaultdict() + + def _init_database(self, dispersy): + if self.db is None: + self.db = StatisticsDatabase(dispersy) + self.db.open() + + def should_persist(self, key, n): + """ + Return true and reset counter for key iff the data should be persisted (for every n calls). + Otherwise increases the counter for key. + """ + if key not in self._db_counter: + self._db_counter[key] = 1 + else: + self._db_counter[key] = self._db_counter[key] + 1 + if n <= self._db_counter[key]: + self._db_counter[key] = 0 + return True + return False + +LATEST_VERSION = 1 + +# old +# -- statistic contains a dump of the pickle object of a statistic. Mainly used to backup bartercast statistics. +# CREATE TABLE statistic( +# id INTEGER, -- primary key +# name TEXT, -- name of the statistic +# object TEXT, -- pickle object representing the statistic +# PRIMARY KEY (id), +# UNIQUE (name)); + +schema = u""" +CREATE TABLE statistic( + id INTEGER, -- primary key + type INTEGER, -- type of interaction + peer TEXT, + value INTEGER, + PRIMARY KEY (id), + UNIQUE (type, peer)); + +CREATE TABLE option(key TEXT PRIMARY KEY, value BLOB); +INSERT INTO option(key, value) VALUES('database_version', '""" + str(LATEST_VERSION) + """'); + +CREATE TABLE interaction_log( + id INTEGER, -- primary key + peer1 TEXT, + peer2 TEXT, + type INTEGER, -- type of interaction + value INTEGER, + date INTEGER, + PRIMARY KEY (id)); +""" + +cleanup = u""" +DELETE FROM statistic; +""" + + +class StatisticsDatabase(Database): + if __debug__: + __doc__ = schema + + def __init__(self, dispersy): + self._dispersy = dispersy + super(StatisticsDatabase, self).__init__(path.join(dispersy.working_directory, u"sqlite", u"statistics.db")) + + def open(self): + self._dispersy.database.attach_commit_callback(self.commit) + return super(StatisticsDatabase, self).open() + + def close(self, commit=True): + self._dispersy.database.detach_commit_callback(self.commit) + return super(StatisticsDatabase, self).close(commit) + + def cleanup(self): + self.executescript(cleanup) + + def check_database(self, database_version): + assert isinstance(database_version, unicode) + assert database_version.isdigit() + assert int(database_version) >= 0 + database_version = int(database_version) + + # setup new database with current database_version + if database_version < 1: + self.executescript(schema) + self.commit() + + else: + # upgrade to version 2 + if database_version < 2: + # there is no version 2 yet... + # if __debug__: dprint("upgrade database ", database_version, " -> ", 2) + # self.executescript(u"""UPDATE option SET value = '2' WHERE key = 'database_version';""") + # self.commit() + # if __debug__: dprint("upgrade database ", database_version, " -> ", 2, " (done)") + pass + + return LATEST_VERSION + + +def enum(*sequential, **named): + enums = dict(zip(sequential, range(len(sequential))), **named) + reverse = dict((value, key) for key, value in enums.iteritems()) + enums['reverse_mapping'] = reverse + return type('Enum', (), enums) + +BartercastStatisticTypes = enum(TORRENTS_RECEIVED=1, TUNNELS_CREATED=2, \ + TUNNELS_BYTES_SENT=3, TUNNELS_RELAY_BYTES_SENT=4, TUNNELS_EXIT_BYTES_SENT=5, \ + TUNNELS_BYTES_RECEIVED=6, TUNNELS_RELAY_BYTES_RECEIVED=7, TUNNELS_EXIT_BYTES_RECEIVED=8) + +_barter_statistics = BarterStatistics() diff --git a/Tribler/community/channel/community.py b/Tribler/community/channel/community.py index 9d45faac168..62ffafc2f52 100644 --- a/Tribler/community/channel/community.py +++ b/Tribler/community/channel/community.py @@ -24,15 +24,10 @@ PlaylistTorrentPayload, MissingChannelPayload, MarkTorrentPayload) from twisted.python.log import msg from twisted.internet.task import LoopingCall -from Tribler.dispersy.statistics import BartercastStatisticTypes - -if __debug__: - from Tribler.dispersy.tool.lencoder import log - +from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics logger = logging.getLogger(__name__) - def warnDispersyThread(func): def invoke_func(*args, **kwargs): if not isInIOThread(): @@ -354,13 +349,13 @@ def _disp_on_torrent(self, messages): peer_id = self._peer_db.addOrGetPeerID(authentication_member.public_key) torrentlist.append((self._channel_id, dispersy_id, peer_id, message.payload.infohash, message.payload.timestamp, message.payload.name, message.payload.files, message.payload.trackers)) - self._statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex')) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex')) self._channelcast_db.on_torrents_from_dispersy(torrentlist) else: for message in messages: self._channelcast_db.newTorrent(message) self._logger.debug("torrent received: %s on channel: %s" % (message.payload.infohash, self._master_member)) - self._statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex')) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex')) def _disp_undo_torrent(self, descriptors, redo=False): for _, _, packet in descriptors: diff --git a/Tribler/community/tunnel/tunnel_community.py b/Tribler/community/tunnel/tunnel_community.py index 255a4f87de1..ecfbe5a7958 100644 --- a/Tribler/community/tunnel/tunnel_community.py +++ b/Tribler/community/tunnel/tunnel_community.py @@ -37,7 +37,7 @@ from Tribler.dispersy.resolution import PublicResolution from Tribler.dispersy.util import call_on_reactor_thread from Tribler.dispersy.requestcache import NumberCache, RandomNumberCache -from Tribler.dispersy.statistics import BartercastStatisticTypes +from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics class CircuitRequestCache(NumberCache): @@ -1063,26 +1063,26 @@ def increase_bytes_sent(self, obj, num_bytes): if isinstance(obj, Circuit): obj.bytes_up += num_bytes self.stats['bytes_up'] += num_bytes - self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_BYTES_SENT, obj.mid, num_bytes) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_BYTES_SENT, obj.mid, num_bytes) elif isinstance(obj, RelayRoute): obj.bytes_up += num_bytes self.stats['bytes_relay_up'] += num_bytes - self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_SENT, obj.mid, num_bytes) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_SENT, obj.mid, num_bytes) elif isinstance(obj, TunnelExitSocket): obj.bytes_up += num_bytes self.stats['bytes_exit'] += num_bytes - self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_SENT, obj.mid, num_bytes) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_SENT, obj.mid, num_bytes) def increase_bytes_received(self, obj, num_bytes): if isinstance(obj, Circuit): obj.bytes_down += num_bytes self.stats['bytes_down'] += num_bytes - self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_BYTES_RECEIVED, obj.mid, num_bytes) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_BYTES_RECEIVED, obj.mid, num_bytes) elif isinstance(obj, RelayRoute): obj.bytes_down += num_bytes self.stats['bytes_relay_down'] += num_bytes - self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_RECEIVED, obj.mid, num_bytes) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_RECEIVED, obj.mid, num_bytes) elif isinstance(obj, TunnelExitSocket): obj.bytes_down += num_bytes self.stats['bytes_enter'] += num_bytes - self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_RECEIVED, obj.mid, num_bytes) + _barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_RECEIVED, obj.mid, num_bytes) diff --git a/Tribler/twisted/plugins/bartercast_crawler_plugin.py b/Tribler/twisted/plugins/bartercast_crawler_plugin.py index 92642876b6a..120b1d150b1 100644 --- a/Tribler/twisted/plugins/bartercast_crawler_plugin.py +++ b/Tribler/twisted/plugins/bartercast_crawler_plugin.py @@ -25,7 +25,7 @@ class BartercastCrawler(Dispersy): def __init__(self, endpoint, working_directory, silent=False, crypto=NoVerifyCrypto()): - super(BartercastCrawler, self).__init__(endpoint, working_directory, u":memory:", crypto, load_bartercast=True) + super(BartercastCrawler, self).__init__(endpoint, working_directory, u":memory:", crypto) # location of persistent storage self._persistent_storage_filename = os.path.join(working_directory, "persistent-storage.data")