Skip to content

Commit

Permalink
Merge pull request #1230 from corpaul/bartercast_refactor
Browse files Browse the repository at this point in the history
Bartercast statistics community
  • Loading branch information
whirm committed Feb 18, 2015
2 parents c8d5eba + 0d88d28 commit f78d7d0
Show file tree
Hide file tree
Showing 13 changed files with 746 additions and 20 deletions.
2 changes: 2 additions & 0 deletions Tribler/Main/tribler_main.py
Expand Up @@ -500,6 +500,7 @@ def define_communities(*args):
from Tribler.community.metadata.community import MetadataCommunity
from Tribler.community.tunnel.tunnel_community import TunnelSettings
from Tribler.community.tunnel.hidden_community import HiddenTunnelCommunity
from Tribler.community.bartercast4.community import BarterCommunity

# make sure this is only called once
session.remove_observer(define_communities)
Expand All @@ -515,6 +516,7 @@ def define_communities(*args):
# must be called on the Dispersy thread
dispersy.define_auto_load(SearchCommunity, session.dispersy_member, load=True, kargs=default_kwargs)
dispersy.define_auto_load(AllChannelCommunity, session.dispersy_member, load=True, kargs=default_kwargs)
dispersy.define_auto_load(BarterCommunity, session.dispersy_member, load=True)

# load metadata community
# dispersy.define_auto_load(MetadataCommunity, session.dispersy_member, load=True, kargs=default_kwargs)
Expand Down
93 changes: 92 additions & 1 deletion Tribler/Main/vwxGUI/DispersyDebugFrame.py
Expand Up @@ -7,6 +7,8 @@
from Tribler.Main.vwxGUI.GuiUtility import GUIUtility
from Tribler.Main.Utility.GuiDBHandler import startWorker, GUI_PRI_DISPERSY
from Tribler.Main.Utility.utility import compute_ratio, eta_value, size_format
from operator import itemgetter
from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics

DATA_NONE = ""

Expand Down Expand Up @@ -77,11 +79,13 @@ def __init__(self, parent, id, dispersy):
self.__community_panel = CommunityPanel(self.__notebook, -1)
self.__rawinfo_panel = RawInfoPanel(self.__notebook, -1)
self.__runtime_panel = RuntimeProfilingPanel(self.__notebook, -1)
self.__sharedstatistics_panel = SharedStatisticsPanel(self.__notebook, -1)

self.__notebook.AddPage(self.__summary_panel, "Summary")
self.__notebook.AddPage(self.__community_panel, "Community")
self.__notebook.AddPage(self.__rawinfo_panel, "Raw Info")
self.__notebook.AddPage(self.__runtime_panel, "Runtime Profiling")
self.__notebook.AddPage(self.__sharedstatistics_panel, "Network Health")

hsizer = wx.BoxSizer(wx.HORIZONTAL)
self.__incstuff_checkbox = wx.CheckBox(self, -1, "include stuff")
Expand Down Expand Up @@ -131,6 +135,7 @@ def do_gui(delayedResult):
self.__community_panel.UpdateInfo(stats)
self.__rawinfo_panel.UpdateInfo(stats)
self.__runtime_panel.UpdateInfo(stats)
self.__sharedstatistics_panel.UpdateInfo(_barter_statistics)
self.Layout()

startWorker(do_gui, do_db, uId=u"DispersyDebugFrame_UpdateInfo", priority=GUI_PRI_DISPERSY)
Expand Down Expand Up @@ -296,7 +301,7 @@ def OnListCtrlSelected(self, event):
self.__detail_panel.UpdateInfo(community_data)

def UpdateInfo(self, stats):
community_list = sorted(stats.communities, key=lambda community:
community_list = sorted(stats.communities, key=lambda community:
(not community.dispersy_enable_candidate_walker,
community.classification, community.cid))
self.__community_data_list = []
Expand Down Expand Up @@ -669,3 +674,89 @@ def UpdateInfo(self, stats):

if prev_selection_idx is not None:
self.__list1.Select(prev_selection_idx)

# --------------------------------------------------
# Shared Statistics Panel
# --------------------------------------------------
class SharedStatisticsPanel(wx.Panel):

def __init__(self, parent, id):
super(SharedStatisticsPanel, self).__init__(parent, id)
self.SetBackgroundColour(LIST_GREY)

self.__info = None
self.__selected_statistic = None

self.__statistic_list = AutoWidthListCtrl(self, -1, style=wx.LC_REPORT | wx.LC_ALIGN_LEFT |
wx.LC_SINGLE_SEL | wx.BORDER_SUNKEN)
self.__statistic_list.InsertColumn(0, "Statistic", width=200)
self.__statistic_list.InsertColumn(1, "Total count")
self.__statistic_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.OnStatisticSelected)

self.__detail_list = AutoWidthListCtrl(self, -1, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.__detail_list.InsertColumn(0, "Pubkey", width=200)
self.__detail_list.InsertColumn(1, "Count")

hsizer = wx.BoxSizer(wx.HORIZONTAL)
hsizer.Add(self.__statistic_list, 1, wx.EXPAND | wx.RIGHT, 2)
hsizer.Add(self.__detail_list, 2, wx.EXPAND)
self.SetSizer(hsizer)

def OnStatisticSelected(self, event):
stat = self.__info[event.GetIndex()][0]
if self.__selected_statistic == stat:
return

self.__selected_statistic = stat
self.__detail_list.UpdateData(self.__info[event.GetIndex()][1])

def UpdateInfo(self, stats):
if not getattr(stats, "bartercast", None):
return

self.__STATISTICS = stats.bartercast.keys()
raw_info = {}

if stats is None:
self.__statistic_list.DeleteAllItems()
self.__detail_list.DeleteAllItems()
return

idx = 0
# initialize info list so we can replace elements
if not self.__info or len(self.__info) < len(self.__STATISTICS):
self.__info = [None] * len(self.__STATISTICS)

for stat in self.__STATISTICS:

self.__info[idx] = (stat, [])
raw_info[stat] = stats.bartercast[stat]

data_list = raw_info[stat]
# data_list.sort(key=lambda kv: kv[1], reverse=True)
data_list = sorted(data_list.items(), key=itemgetter(1), reverse=True)

total_count = 0

# for key, value in data_list.items():
for item in data_list:
key = item[0]
value = item[1]
# @TODO: maintain this total in Statistics?
total_count += value

# only draw updated values if we are inspecting the statistic
# if self.__selected_statistic is stat:
peer_str = "%s" % key
count_str = "%s" % value
self.__info[idx][1].append((peer_str, count_str))

total_count_str = "%s" % total_count

# update GUI
if idx < self.__statistic_list.GetItemCount():
self.__statistic_list.SetStringItem(idx, 0, BartercastStatisticTypes.reverse_mapping[stat])
self.__statistic_list.SetStringItem(idx, 1, total_count_str)
else:
self.__statistic_list.Append([BartercastStatisticTypes.reverse_mapping[stat]])
idx += 1
5 changes: 3 additions & 2 deletions Tribler/community/allchannel/community.py
Expand Up @@ -372,8 +372,9 @@ def disp_create_votecast(self, cid, vote, timestamp, store=True, update=True, fo
else:
communityclass = PreviewChannelCommunity

community = self._get_channel_community(cid)
community = self.dispersy.reclassify_community(community, communityclass)
community_old = self._get_channel_community(cid)
community = self.dispersy.reclassify_community(community_old, communityclass)
community._candidates = community_old._candidates

# check if we need to cancel a previous vote
latest_dispersy_id = self._votecast_db.get_latest_vote_dispersy_id(community._channel_id, None)
Expand Down
Empty file.
164 changes: 164 additions & 0 deletions Tribler/community/bartercast4/community.py
@@ -0,0 +1,164 @@
# Written by Cor-Paul Bezemer
from conversion import StatisticsConversion
from payload import StatisticsRequestPayload, StatisticsResponsePayload

from Tribler.dispersy.authentication import MemberAuthentication
from Tribler.dispersy.community import Community
from Tribler.dispersy.conversion import DefaultConversion
from Tribler.dispersy.destination import CandidateDestination
from Tribler.dispersy.distribution import DirectDistribution
from Tribler.dispersy.message import Message, DelayMessageByProof
from Tribler.dispersy.resolution import PublicResolution
from .statistics import BartercastStatisticTypes, _barter_statistics
from twisted.internet.task import LoopingCall
from twisted.python import log

BARTERCAST_PERSIST_INTERVAL = 120.0


class BarterCommunity(Community):
@classmethod
def get_master_members(cls, dispersy):
# generated: Thu Oct 30 12:59:19 2014
# curve: NID_sect571r1
# len: 571 bits ~ 144 bytes signature
# pub: 170 3081a7301006072a8648ce3d020106052b81040027038192000405ef988346197abe009065e6f9f517263063495554e4d278074feb1be3e81586b44f90b8a11f170f0a059d8f26c259118e6afc775f3d1e7c46462c9de0ec2bb94e480390622056b002c1f121acc52c18a0857ce59e79cf73642a4787fcdc5398d332000fbd44b16f14b005c0910d81cb85392fd036f32a242044c8263e0c6b9dc10b68f9c30540cfbd8a6bb5ccec786e
# pub-sha1 59accbc05521d8b894e8e6ef8d686411384cdec9
#-----BEGIN PUBLIC KEY-----
# MIGnMBAGByqGSM49AgEGBSuBBAAnA4GSAAQF75iDRhl6vgCQZeb59RcmMGNJVVTk
# 0ngHT+sb4+gVhrRPkLihHxcPCgWdjybCWRGOavx3Xz0efEZGLJ3g7Cu5TkgDkGIg
# VrACwfEhrMUsGKCFfOWeec9zZCpHh/zcU5jTMgAPvUSxbxSwBcCRDYHLhTkv0Dbz
# KiQgRMgmPgxrncELaPnDBUDPvYprtczseG4=
#-----END PUBLIC KEY-----
master_key = "3081a7301006072a8648ce3d020106052b81040027038192000405ef988346197abe009065e6f9f517263063495554e4d278074feb1be3e81586b44f90b8a11f170f0a059d8f26c259118e6afc775f3d1e7c46462c9de0ec2bb94e480390622056b002c1f121acc52c18a0857ce59e79cf73642a4787fcdc5398d332000fbd44b16f14b005c0910d81cb85392fd036f32a242044c8263e0c6b9dc10b68f9c30540cfbd8a6bb5ccec786e".decode("HEX")
master = dispersy.get_member(public_key=master_key)
return [master]

def __init__(self, dispersy, master, my_member):
super(BarterCommunity, self).__init__(dispersy, master, my_member)
self._dispersy = dispersy
log.msg("joined BC community")
self.init_database()

# add task for persisting bartercast statistics every BARTERCAST_PERSIST_INTERVAL seconds
self._logger.debug("bartercast persist task started")
self.register_task("bartercast persist",
LoopingCall(self.backup_bartercast_statistics)).start(BARTERCAST_PERSIST_INTERVAL, now=False)

def init_database(self):
log.msg("loading BC statistics from database")
_barter_statistics.load_statistics(self._dispersy)

def initiate_meta_messages(self):
return super(BarterCommunity, self).initiate_meta_messages() + [
Message(self, u"stats-request",
MemberAuthentication(),
PublicResolution(),
DirectDistribution(),
CandidateDestination(),
StatisticsRequestPayload(),
self.check_stats_request,
self.on_stats_request),
Message(self, u"stats-response",
MemberAuthentication(),
PublicResolution(),
DirectDistribution(),
CandidateDestination(),
StatisticsResponsePayload(),
self.check_stats_response,
self.on_stats_response)
]

def initialize(self, integrate_with_tribler=False, auto_join_channel=False):
super(BarterCommunity, self).initialize()

def initiate_conversions(self):
return [DefaultConversion(self), StatisticsConversion(self)]

@property
def dispersy_sync_response_limit(self):
return 1

@property
def dispersy_sync_skip_enable(self):
return False

@property
def dispersy_sync_cache_enable(self):
return False

def create_stats_request(self, candidate, stats_type):
log.msg("Creating stats-request for type %d to member: %s" % (stats_type, candidate._association.mid.encode("hex")))
meta = self.get_meta_message(u"stats-request")
message = meta.impl(authentication=(self._my_member,),
distribution=(self.claim_global_time(),),
destination=(candidate,),
payload=(stats_type,))
self._dispersy._forward([message])

def check_stats_request(self, messages):
for message in messages:
allowed, _ = self._timeline.check(message)
if allowed:
yield message
else:
yield DelayMessageByProof(message)

def on_stats_request(self, messages):
log.msg("IN: stats-request")
for message in messages:
log.msg("stats-request: %s %s" % (message._distribution.global_time, message.payload.stats_type))
# send back stats-response
self.create_stats_response(message.payload.stats_type, message.candidate)

def create_stats_response(self, stats_type, candidate):
log.msg("OUT: stats-response")
meta = self.get_meta_message(u"stats-response")
records = _barter_statistics.get_top_n_bartercast_statistics(stats_type, 5)
log.msg("sending stats for type %d: %s" % (stats_type, records))

message = meta.impl(authentication=(self._my_member,),
distribution=(self.claim_global_time(),),
destination=(candidate,),
payload=(stats_type, records))
self._dispersy._forward([message])

def check_stats_response(self, messages):
for message in messages:
allowed, _ = self._timeline.check(message)
if allowed:
yield message
else:
yield DelayMessageByProof(message)

def on_stats_response(self, messages):
log.msg("IN: stats-response")
for message in messages:
log.msg("stats-response: %s %s %s"
% (message._distribution.global_time, message.payload.stats_type, message.payload.records))
for r in message.payload.records:
_barter_statistics.log_interaction(self._dispersy,
message.payload.stats_type,
message.authentication.member.mid.encode('hex'),
r[0], int(r[1].encode('hex'), 16))

# bartercast accounting stuff
def backup_bartercast_statistics(self):
self._logger.debug("merging bartercast statistics")
_barter_statistics.persist(self, 1)


class BarterCommunityCrawler(BarterCommunity):

def __init__(self, *args, **kargs):
super(BarterCommunityCrawler, self).__init__(*args, **kargs)

def on_introduction_response(self, messages):
super(BarterCommunity, self).on_introduction_response(messages)
for message in messages:
log.msg("in on_introduction_response: Requesting stats from %s" % message.candidate)
for t in BartercastStatisticTypes.reverse_mapping:
self.create_stats_request(message.candidate, t)

def start_walking(self):
self.register_task("take step", LoopingCall(self.take_step)).start(1.0, now=True)
51 changes: 51 additions & 0 deletions Tribler/community/bartercast4/conversion.py
@@ -0,0 +1,51 @@
from struct import pack, unpack_from
from Tribler.dispersy.conversion import BinaryConversion


class StatisticsConversion(BinaryConversion):

MTU_SIZE = 1500

def __init__(self, community):
super(StatisticsConversion, self).__init__(community, "\x02")
self.define_meta_message(chr(1), community.get_meta_message(u"stats-request"),
self._encode_statistics_request, self._decode_statistics_request)
self.define_meta_message(chr(2), community.get_meta_message(u"stats-response"),
self._encode_statistics_response, self._decode_statistics_response)

def _encode_statistics_request(self, message):
stats_type = message.payload.stats_type
return pack("!i", stats_type),

def _decode_statistics_request(self, placeholder, offset, data):
stats_type, = unpack_from("!i", data, offset)
offset += 4
return offset, placeholder.meta.payload.implement(stats_type)

# TODO fix for dictionaries larger than MTU (split message)
def _encode_statistics_response(self, message):
stats_type = message.payload.stats_type
records = message.payload.records
packed = pack("!i", stats_type)
for r in records:
peer_id = r[0].encode('utf8')
value = r[1]
packed = packed + pack("!H%dsi" % len(peer_id), len(peer_id), peer_id, value)
return packed,

def _decode_statistics_response(self, placeholder, offset, data):
stats_type, = unpack_from("!i", data, offset)
offset += 4
records = []
while offset < len(data):
len_key, = unpack_from("!H", data, offset)
if len_key < 1:
break
offset += 2
key, = unpack_from("!%ds" % len_key, data, offset)
offset += len_key
value = data[offset: offset + 4]
offset += 4
r = [key, value]
records.append(r)
return offset, placeholder.meta.payload.implement(stats_type, records)
23 changes: 23 additions & 0 deletions Tribler/community/bartercast4/payload.py
@@ -0,0 +1,23 @@
from Tribler.dispersy.payload import Payload


class StatisticsRequestPayload(Payload):
'''
Request statistics for key 'key' from peer.
'''

class Implementation(Payload.Implementation):

def __init__(self, meta, stats_type):
super(StatisticsRequestPayload.Implementation, self).__init__(meta)
self.stats_type = stats_type


class StatisticsResponsePayload(Payload):

class Implementation(Payload.Implementation):

def __init__(self, meta, stats_type, records):
super(StatisticsResponsePayload.Implementation, self).__init__(meta)
self.stats_type = stats_type
self.records = records

0 comments on commit f78d7d0

Please sign in to comment.