Skip to content

Commit

Permalink
refactored bartercast stats into community
Browse files Browse the repository at this point in the history
  • Loading branch information
CP committed Feb 17, 2015
1 parent fdfe3b6 commit 5bdfe72
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 21 deletions.
4 changes: 2 additions & 2 deletions Tribler/Main/vwxGUI/DispersyDebugFrame.py
Expand Up @@ -8,7 +8,7 @@
from Tribler.Main.Utility.GuiDBHandler import startWorker, GUI_PRI_DISPERSY
from Tribler.Main.Utility.utility import compute_ratio, eta_value, size_format
from operator import itemgetter
from Tribler.dispersy.statistics import BartercastStatisticTypes
from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics

DATA_NONE = ""

Expand Down Expand Up @@ -135,7 +135,7 @@ def do_gui(delayedResult):
self.__community_panel.UpdateInfo(stats)
self.__rawinfo_panel.UpdateInfo(stats)
self.__runtime_panel.UpdateInfo(stats)
self.__sharedstatistics_panel.UpdateInfo(stats)
self.__sharedstatistics_panel.UpdateInfo(_barter_statistics)
self.Layout()

startWorker(do_gui, do_db, uId=u"DispersyDebugFrame_UpdateInfo", priority=GUI_PRI_DISPERSY)
Expand Down
23 changes: 20 additions & 3 deletions Tribler/community/bartercast4/community.py
Expand Up @@ -9,10 +9,12 @@
from Tribler.dispersy.distribution import DirectDistribution
from Tribler.dispersy.message import Message, DelayMessageByProof
from Tribler.dispersy.resolution import PublicResolution
from Tribler.dispersy.statistics import BartercastStatisticTypes
from .statistics import BartercastStatisticTypes, _barter_statistics
from twisted.internet.task import LoopingCall
from twisted.python import log

BARTERCAST_PERSIST_INTERVAL = 120.0


class BarterCommunity(Community):
@classmethod
Expand All @@ -37,6 +39,16 @@ def __init__(self, dispersy, master, my_member):
self._dispersy = dispersy
# self._logger = logging.getLogger(self.__class__.__name__)
log.msg("joined BC community")
self.init_database()

# add task for persisting bartercast statistics every BARTERCAST_PERSIST_INTERVAL seconds
self._logger.debug("bartercast persist task started")
self.register_task("bartercast persist",
LoopingCall(self.backup_bartercast_statistics)).start(BARTERCAST_PERSIST_INTERVAL, now=False)

def init_database(self):
log.msg("loading BC statistics from database")
_barter_statistics.load_statistics(self._dispersy)

def initiate_meta_messages(self):
return super(BarterCommunity, self).initiate_meta_messages() + [
Expand Down Expand Up @@ -105,7 +117,7 @@ def on_stats_request(self, messages):
def create_stats_response(self, stats_type, candidate):
log.msg("OUT: stats-response")
meta = self.get_meta_message(u"stats-response")
records = self._dispersy._statistics.get_top_n_bartercast_statistics(stats_type, 5)
records = _barter_statistics.get_top_n_bartercast_statistics(stats_type, 5)
log.msg("sending stats for type %d: %s" % (stats_type, records))

message = meta.impl(authentication=(self._my_member,),
Expand All @@ -128,11 +140,16 @@ def on_stats_response(self, messages):
log.msg("stats-response: %s %s %s"
% (message._distribution.global_time, message.payload.stats_type, message.payload.records))
for r in message.payload.records:
self._dispersy._statistics.log_interaction(self._dispersy,
_barter_statistics.log_interaction(self._dispersy,
message.payload.stats_type,
message.authentication.member.mid.encode('hex'),
r[0], int(r[1].encode('hex'), 16))

# bartercast accounting stuff
def backup_bartercast_statistics(self):
self._logger.debug("merging bartercast statistics")
_barter_statistics.persist(self, 1)


class BarterCommunityCrawler(BarterCommunity):

Expand Down
214 changes: 214 additions & 0 deletions Tribler/community/bartercast4/statistics.py
@@ -0,0 +1,214 @@
from Tribler.dispersy.database import Database
import random
from operator import itemgetter
from collections import defaultdict
from os import path
from threading import RLock
import logging


class BarterStatistics(object):
    """In-memory bartercast interaction counters with SQLite persistence.

    Counters live in ``self.bartercast``, a mapping of
    ``BartercastStatisticTypes`` value -> ``{peer_id: counter}``.  Access is
    guarded by an RLock because both the dispersy thread and the GUI update
    and read these structures.
    """

    def __init__(self):
        # Lazily-created StatisticsDatabase; see _init_database().
        self.db = None
        # Per-key call counters driving should_persist()'s every-n-calls logic.
        self._db_counter = dict()
        self._lock = RLock()
        self.bartercast = defaultdict()
        self._logger = logging.getLogger(self.__class__.__name__)

    def dict_inc_bartercast(self, stats_type, peer, value=1):
        """Increase the counter for (stats_type, peer) by value.

        Creates the per-type dict on demand.  The previous implementation
        asserted ``stats_type < len(self.bartercast)``, which compared a type
        id against the *number* of types and therefore rejected the highest
        valid type id; the membership-based behaviour here is what callers
        (channel/tunnel communities) expect.
        """
        with self._lock:
            counters = self.bartercast.setdefault(stats_type, {})
            counters[peer] = counters.get(peer, 0) + value

    def get_top_n_bartercast_statistics(self, key, n):
        """Return the top n - n//2 statistics for type ``key``, plus up to
        n//2 randomly selected entries from the rest of the list.

        The random portion gives less active peers a chance to appear in the
        rankings as well.
        @TODO check if random portion should be larger or smaller

        Returns [] when ``key`` has no statistics, otherwise a list of
        (peer, value) tuples.
        """
        with self._lock:
            # Don't crash the program when bartercast statistics for this
            # type are not available.
            if key not in self.bartercast:
                self._logger.error(u"%s doesn't exist in bartercast statistics" % key)
                return []
            d = self.bartercast[key]
            if d is None:
                return None
            random_n = n // 2  # same result as the old `n / 2` under Python 2 ints
            fixed_n = n - random_n
            sorted_list = sorted(d.items(), key=itemgetter(1), reverse=True)
            top_stats = sorted_list[:fixed_n]
            # Was logged at ERROR level; this is purely diagnostic output.
            self._logger.debug("len d: %d, fixed_n: %d", len(d), fixed_n)
            if len(d) <= fixed_n:
                random_stats = []
            else:
                random_stats = random.sample(sorted_list[fixed_n:],
                                             min(random_n, len(d) - fixed_n))
            return top_stats + random_stats

    def log_interaction(self, dispersy, type, peer1, peer2, value):
        """
        Add statistic for interactions between peer1 and peer2 to the interaction log.
        """
        self._init_database(dispersy)
        self.db.execute(u"INSERT INTO interaction_log (peer1, peer2, type, value, date) values (?, ?, ?, ?, strftime('%s', 'now'))", (unicode(peer1), unicode(peer2), type, value))

    def persist(self, dispersy, key, n=1):
        """
        Persists the statistical data with name 'key' in the statistics database.
        Note: performs the database update for every n-th call. This is to somewhat control the number of
        writes for statistics that are updated often.

        NOTE(review): BarterCommunity.backup_bartercast_statistics passes the
        community object (not dispersy) as the first argument — confirm it
        exposes the same working_directory/database attributes.
        """
        if not self.should_persist(key, n):
            return

        self._init_database(dispersy)
        self._logger.debug("persisting bc data")
        # Hold the lock while iterating so concurrent dict_inc_bartercast()
        # calls cannot mutate the dicts mid-iteration.
        with self._lock:
            for t in self.bartercast:
                for peer in self.bartercast[t]:
                    self.db.execute(u"INSERT OR REPLACE INTO statistic (type, peer, value) values (?, ?, ?)", (t, unicode(peer), self.bartercast[t][peer]))
        self._logger.debug("data persisted")

    def load_statistics(self, dispersy):
        """Load all persisted counters into ``self.bartercast`` and return them.

        Every known statistic type gets an (possibly empty) entry so callers
        can index by type without checking first.
        """
        self._init_database(dispersy)
        data = self.db.execute(u"SELECT type, peer, value FROM statistic")
        statistics = defaultdict()
        for t in BartercastStatisticTypes.reverse_mapping:
            statistics[t] = defaultdict()
        for row in data:
            t, peer, value = row[0], row[1], row[2]
            if t not in statistics:
                statistics[t] = defaultdict()
            statistics[t][peer] = value
        # Was a bare `print statistics` debug leftover.
        self._logger.debug("loaded bartercast statistics: %s", statistics)
        self.bartercast = statistics
        return statistics

    def _init_database(self, dispersy):
        # Open the statistics database on first use only.
        if self.db is None:
            self.db = StatisticsDatabase(dispersy)
            self.db.open()

    def should_persist(self, key, n):
        """
        Return true and reset counter for key iff the data should be persisted (for every n calls).
        Otherwise increases the counter for key.
        """
        self._db_counter[key] = self._db_counter.get(key, 0) + 1
        if n <= self._db_counter[key]:
            self._db_counter[key] = 0
            return True
        return False

# Current schema version; written into the option table as 'database_version'.
LATEST_VERSION = 1

# Legacy schema, kept for reference only:
# -- statistic contained a dump of the pickle object of a statistic. Mainly used to backup bartercast statistics.
# CREATE TABLE statistic(
# id INTEGER, -- primary key
# name TEXT, -- name of the statistic
# object TEXT, -- pickle object representing the statistic
# PRIMARY KEY (id),
# UNIQUE (name));

# Current schema: one aggregated counter row per (type, peer) in `statistic`,
# plus a raw per-event `interaction_log`. The version string is interpolated
# from LATEST_VERSION so the two cannot drift apart.
schema = u"""
CREATE TABLE statistic(
id INTEGER, -- primary key
type INTEGER, -- type of interaction
peer TEXT,
value INTEGER,
PRIMARY KEY (id),
UNIQUE (type, peer));
CREATE TABLE option(key TEXT PRIMARY KEY, value BLOB);
INSERT INTO option(key, value) VALUES('database_version', '""" + str(LATEST_VERSION) + """');
CREATE TABLE interaction_log(
id INTEGER, -- primary key
peer1 TEXT,
peer2 TEXT,
type INTEGER, -- type of interaction
value INTEGER,
date INTEGER,
PRIMARY KEY (id));
"""

# Removes aggregated counters only; the interaction_log is deliberately kept.
cleanup = u"""
DELETE FROM statistic;
"""


class StatisticsDatabase(Database):
    """SQLite database holding persisted bartercast statistics.

    Lives in ``<working_directory>/sqlite/statistics.db`` and hooks its own
    commit into the main dispersy database's commit cycle so both flush
    together.
    """
    if __debug__:
        __doc__ = schema

    def __init__(self, dispersy):
        self._dispersy = dispersy
        db_path = path.join(dispersy.working_directory, u"sqlite", u"statistics.db")
        super(StatisticsDatabase, self).__init__(db_path)

    def open(self):
        # Commit whenever the main dispersy database commits.
        self._dispersy.database.attach_commit_callback(self.commit)
        return super(StatisticsDatabase, self).open()

    def close(self, commit=True):
        self._dispersy.database.detach_commit_callback(self.commit)
        return super(StatisticsDatabase, self).close(commit)

    def cleanup(self):
        # Drop all aggregated statistic rows (see module-level `cleanup`).
        self.executescript(cleanup)

    def check_database(self, database_version):
        assert isinstance(database_version, unicode)
        assert database_version.isdigit()
        assert int(database_version) >= 0
        version = int(database_version)

        if version < 1:
            # Fresh database: install the full schema at the current version.
            self.executescript(schema)
            self.commit()
        elif version < 2:
            # There is no schema version 2 yet, so nothing to migrate.
            pass

        return LATEST_VERSION


def enum(*sequential, **named):
    """Build a simple enumeration type.

    Positional names receive consecutive values starting at 0; keyword
    arguments supply explicit values.  The generated type also carries a
    ``reverse_mapping`` dict translating values back to their names.
    """
    values = dict(zip(sequential, range(len(sequential))))
    values.update(named)
    values['reverse_mapping'] = dict((value, name) for name, value in values.items())
    return type('Enum', (), values)

# Interaction types tracked by bartercast (values are persisted in the
# `statistic.type` column, so they must stay stable).
BartercastStatisticTypes = enum(TORRENTS_RECEIVED=1,
                                TUNNELS_CREATED=2,
                                TUNNELS_BYTES_SENT=3,
                                TUNNELS_RELAY_BYTES_SENT=4,
                                TUNNELS_EXIT_BYTES_SENT=5,
                                TUNNELS_BYTES_RECEIVED=6,
                                TUNNELS_RELAY_BYTES_RECEIVED=7,
                                TUNNELS_EXIT_BYTES_RECEIVED=8)

# Module-level singleton shared by the bartercast, channel and tunnel
# communities and by the debug GUI.
_barter_statistics = BarterStatistics()
11 changes: 3 additions & 8 deletions Tribler/community/channel/community.py
Expand Up @@ -24,15 +24,10 @@
PlaylistTorrentPayload, MissingChannelPayload, MarkTorrentPayload)
from twisted.python.log import msg
from twisted.internet.task import LoopingCall
from Tribler.dispersy.statistics import BartercastStatisticTypes

if __debug__:
from Tribler.dispersy.tool.lencoder import log

from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics

logger = logging.getLogger(__name__)


def warnDispersyThread(func):
def invoke_func(*args, **kwargs):
if not isInIOThread():
Expand Down Expand Up @@ -354,13 +349,13 @@ def _disp_on_torrent(self, messages):
peer_id = self._peer_db.addOrGetPeerID(authentication_member.public_key)

torrentlist.append((self._channel_id, dispersy_id, peer_id, message.payload.infohash, message.payload.timestamp, message.payload.name, message.payload.files, message.payload.trackers))
self._statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex'))
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex'))
self._channelcast_db.on_torrents_from_dispersy(torrentlist)
else:
for message in messages:
self._channelcast_db.newTorrent(message)
self._logger.debug("torrent received: %s on channel: %s" % (message.payload.infohash, self._master_member))
self._statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex'))
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TORRENTS_RECEIVED, message.authentication.member.mid.encode('hex'))

def _disp_undo_torrent(self, descriptors, redo=False):
for _, _, packet in descriptors:
Expand Down
14 changes: 7 additions & 7 deletions Tribler/community/tunnel/tunnel_community.py
Expand Up @@ -37,7 +37,7 @@
from Tribler.dispersy.resolution import PublicResolution
from Tribler.dispersy.util import call_on_reactor_thread
from Tribler.dispersy.requestcache import NumberCache, RandomNumberCache
from Tribler.dispersy.statistics import BartercastStatisticTypes
from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics


class CircuitRequestCache(NumberCache):
Expand Down Expand Up @@ -1063,26 +1063,26 @@ def increase_bytes_sent(self, obj, num_bytes):
if isinstance(obj, Circuit):
obj.bytes_up += num_bytes
self.stats['bytes_up'] += num_bytes
self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_BYTES_SENT, obj.mid, num_bytes)
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_BYTES_SENT, obj.mid, num_bytes)
elif isinstance(obj, RelayRoute):
obj.bytes_up += num_bytes
self.stats['bytes_relay_up'] += num_bytes
self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_SENT, obj.mid, num_bytes)
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_SENT, obj.mid, num_bytes)
elif isinstance(obj, TunnelExitSocket):
obj.bytes_up += num_bytes
self.stats['bytes_exit'] += num_bytes
self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_SENT, obj.mid, num_bytes)
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_SENT, obj.mid, num_bytes)

def increase_bytes_received(self, obj, num_bytes):
if isinstance(obj, Circuit):
obj.bytes_down += num_bytes
self.stats['bytes_down'] += num_bytes
self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_BYTES_RECEIVED, obj.mid, num_bytes)
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_BYTES_RECEIVED, obj.mid, num_bytes)
elif isinstance(obj, RelayRoute):
obj.bytes_down += num_bytes
self.stats['bytes_relay_down'] += num_bytes
self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_RECEIVED, obj.mid, num_bytes)
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_RELAY_BYTES_RECEIVED, obj.mid, num_bytes)
elif isinstance(obj, TunnelExitSocket):
obj.bytes_down += num_bytes
self.stats['bytes_enter'] += num_bytes
self._statistics.increase_tunnel_stats(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_RECEIVED, obj.mid, num_bytes)
_barter_statistics.dict_inc_bartercast(BartercastStatisticTypes.TUNNELS_EXIT_BYTES_RECEIVED, obj.mid, num_bytes)
2 changes: 1 addition & 1 deletion Tribler/twisted/plugins/bartercast_crawler_plugin.py
Expand Up @@ -25,7 +25,7 @@
class BartercastCrawler(Dispersy):

def __init__(self, endpoint, working_directory, silent=False, crypto=NoVerifyCrypto()):
super(BartercastCrawler, self).__init__(endpoint, working_directory, u":memory:", crypto, load_bartercast=True)
super(BartercastCrawler, self).__init__(endpoint, working_directory, u":memory:", crypto)

# location of persistent storage
self._persistent_storage_filename = os.path.join(working_directory, "persistent-storage.data")
Expand Down

0 comments on commit 5bdfe72

Please sign in to comment.