Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Teach client to use connections.yaml for better networking features #7

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
171 changes: 116 additions & 55 deletions src/allmydata/client.py
@@ -1,4 +1,7 @@
import os, stat, time, weakref, yaml
import os, stat, time, weakref, yaml, importlib
from twisted.python.filepath import FilePath
from foolscap.furl import decode_furl
from foolscap.api import Tub, eventually
from allmydata import node

from zope.interface import implements
Expand Down Expand Up @@ -104,6 +107,25 @@ def stopService(self):
c.stop()
return service.Service.stopService(self)

def load_plugins(transport_dict):
"""
load_plugins( transport_dict ) -> plugins_dict
transform a transport specification dict into.
plugins_dict of type plugin_name -> plugin_handler
"""
plugins = {}
def getattr_qualified(obj, name):
for attr in name.split("."):
obj = getattr(obj, attr)
return obj
for name in transport_dict.keys():
handler_dict = transport_dict[name]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for name, handler_dict in transport_dict.items():

handler_module = importlib.import_module(handler_dict['handler_module'])
handler_func = getattr_qualified(handler_module, handler_dict['handler_name'])
handler_args = handler_dict['parameters']
handler = handler_func(**handler_args)
plugins[name] = handler
return plugins

class Client(node.Node, pollmixin.PollMixin):
implements(IStatsProducer)
Expand All @@ -128,12 +150,14 @@ class Client(node.Node, pollmixin.PollMixin):
"max_segment_size": 128*KiB,
}

def __init__(self, basedir="."):
def __init__(self, basedir=".", testing=False):
node.Node.__init__(self, basedir)
self.testing = testing
self.introducer_clients = []
self.started_timestamp = time.time()
self.logSource="Client"
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
self.init_introducer_clients()
self.load_connections()
self.init_stats_provider()
self.init_secrets()
self.init_node_key()
Expand Down Expand Up @@ -179,42 +203,86 @@ def _sequencer(self):
nonce = _make_secret().strip()
return seqnum, nonce

def init_introducer_clients(self):
self.introducer_furls = []
def load_connections_from_yaml(self, furl):
connections_filepath = FilePath(os.path.join(self.basedir, "private", "connections.yaml"))
if connections_filepath.exists():
exists = True
with connections_filepath.open() as f:
connections = yaml.load(f)
f.close()
else:
exists = False
Copy link

@leif leif Apr 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moore would say to set exists = False before the if :)

edit: but, see subsequent comment at https://github.com/david415/tahoe-lafs/pull/7/files#r60120614 about how this if maybe shouldn't be here anyway.

connections = { 'introducers' : {},
'servers' : {},
'transport_plugins' : {
'tcp' : {
'handler_module' : 'foolscap.connection_plugins',
'handler_name': 'DefaultTCP',
'parameters' : {}
},
},
}
new_connections = connections.copy()
new_connections['introducers'][u'default'] = {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why unicode here?

new_connections['introducers']['default']['furl'] = furl
connections_filepath.setContent(yaml.dump(new_connections))
return connections, exists

def load_connections(self):
"""
Load the connections.yaml file if it exists, otherwise
create a default configuration. Abort startup and report
an error to the user if the tahoe.cfg contains an introducer
FURL which is also found in the connections.yaml.
"""
# read introducer from tahoe.cfg and abort + error an introducer furl is specified
# which is also found in our connections.yaml
Copy link

@leif leif Apr 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is wrong, there should be an error if a furl is specified in tahoe.cfg and that furl is not in connections.yaml (if it exists)

self.introducer_furls = [] # XXX
tahoe_cfg_introducer_furl = self.get_config("client", "introducer.furl", None)
self.warn_flag = False
# Try to load ""BASEDIR/private/introducers" cfg file
cfg = os.path.join(self.basedir, "private", "introducers")
if os.path.exists(cfg):
f = open(cfg, 'r')
for introducer_furl in f.read().split('\n'):
introducer_furl_stripped = introducer_furl.strip()
if introducer_furl_stripped.startswith('#') or not introducer_furl_stripped:
continue
self.introducer_furls.append(introducer_furl_stripped)
f.close()
furl_count = len(self.introducer_furls)

# read furl from tahoe.cfg
ifurl = self.get_config("client", "introducer.furl", None)
if ifurl and ifurl not in self.introducer_furls:
self.introducer_furls.append(ifurl)
f = open(cfg, 'a')
f.write(ifurl)
f.write('\n')
f.close()
if furl_count > 1:

connections, connections_yaml_exists = self.load_connections_from_yaml(tahoe_cfg_introducer_furl)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing the (deprecated) tahoe_cfg_introducer_furl to load_connections_from_yaml seems weird. I think load_connections_from_yaml is trying to do 3 different things: check if the file exists, load it if it does, and otherwise populate the default connections dictionary using the deprecated furl. Perhaps these should be two or three functions? Also, we want to be able to populate the default introducer entry with a furl passed from the commandline in the future, not just from the deprecated tahoe.cfg option.

introducers = connections['introducers']
transports = connections['transport_plugins']
if self.tub is None:
return
plugins = load_plugins(connections['transport_plugins'])
self.tub.removeAllConnectionHintHandlers()
for name, handler in plugins.items():
self.tub.addConnectionHintHandler(name, handler)

found = False
count = 0
if tahoe_cfg_introducer_furl is not None and connections_yaml_exists:
count += 1
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count is only ever 0 or 1? seems like this must be leftover from something else.

for nick in introducers.keys():
if tahoe_cfg_introducer_furl == introducers[nick]['furl']:
found = True
break
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could replace this for loop with

intro_furls = [intro['furl'] for intro in introducers.items()]
if tahoe_cfg_introducer_furl in intro_furls:
    ...

if not found and count > 0:
log.err("Introducer furl %s specified in both tahoe.cfg and connections.yaml; please fix impossible configuration.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this log line is incorrect (and matches the incorrect comment): the actual error condition, which i believe is correctly implemented (but am not certain due to the obtuse logic) is when the introducer furl is in tahoe.cfg but NOT in the yaml (but the yaml already existed).

reactor.stop()
if found and count > 0:
log.err("Introducer furl %s specified in both tahoe.cfg was also found in connections.yaml")
self.warn_flag = True
self.log("introducers config file modified.")

# create a pool of introducer_clients
self.introducer_clients = []
for introducer_furl in self.introducer_furls:
ic = IntroducerClient(self.tub, introducer_furl,
self.nickname,
str(allmydata.__full_version__),
str(self.OLDEST_SUPPORTED_VERSION),
self.get_app_versions())
introducers[u'default'] = { 'furl': tahoe_cfg_introducer_furl,
'subscribe_only': False }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tahoe_cfg_introducer_furl is deprecated (should be None in new configs) so we don't want to use it unconditionally here.

for nickname in introducers.keys():
if introducers[nickname].has_key('transport_plugins'):
plugins = load_plugins(introducers[nickname]['transport_plugins'])
introducer_cache_filepath = FilePath(os.path.join(self.basedir, "private", nickname))
self.introducer_furls.append(introducers[nickname]['furl']) # XXX
ic = IntroducerClient(introducers[nickname]['furl'],
nickname,
str(allmydata.__full_version__),
str(self.OLDEST_SUPPORTED_VERSION),
self.get_app_versions(),
introducer_cache_filepath,
introducers[nickname]['subscribe_only'],
plugins)
self.introducer_clients.append(ic)

# init introducer_clients as usual
for ic in self.introducer_clients:
self.init_introducer_client(ic)
Expand Down Expand Up @@ -383,30 +451,23 @@ def init_client_storage_broker(self):
# (and everybody else who wants to use storage servers)
ps = self.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""])
sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
sb = storage_client.StorageFarmBroker(permute_peers=True, preferred_peers=preferred_peers)
self.storage_broker = sb
self.init_client_static_storage_config()
sb.setServiceParent(self)

# initialize StorageFarmBroker with our static server selection
connections, yaml_exists= self.load_connections_from_yaml(None)
servers = connections['servers']
for server_id in servers.keys():
plugins = load_plugins(servers[server_id]['transport_plugins'])
if self.testing:
self.storage_broker.got_static_announcement(servers[server_id]['key_s'], servers[server_id]['announcement'], plugins)
else:
eventually(self.storage_broker.got_static_announcement, servers[server_id]['key_s'], servers[server_id]['announcement'], plugins)

for ic in self.introducer_clients:
sb.use_introducer(ic)

def init_client_static_storage_config(self):
if os.path.exists(os.path.join(self.basedir, "storage_servers.yaml")):
f = open("storage_servers.yaml")
server_params = yaml.safe_load(f)
f.close()
for serverid, params in server_params.items():
server_type = params.pop("type")
if server_type == "tahoe-foolscap":
ann = { 'nickname': server_params[serverid]['nickname'], 'anonymous-storage-FURL':server_params[serverid]['furl'], 'permutation-seed-base32':server_params[serverid]['seed'], 'service-name':'storage','my-version':'unknown'}
s = storage_client.NativeStorageServer(serverid, ann.copy())
sb._got_announcement(serverid, ann)
#add_server(s.get_serverid(), s)
else:
msg = ("unrecognized server type '%s' in "
"tahoe.cfg [client-server-selection]server.%s.type"
% (server_type, serverid))
raise storage_client.UnknownServerTypeError(msg)

def get_storage_broker(self):
return self.storage_broker

Expand Down
68 changes: 60 additions & 8 deletions src/allmydata/introducer/client.py
@@ -1,14 +1,16 @@

import time
import time, os, yaml
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable, eventually, RemoteInterface
from foolscap.api import Tub
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import IIntroducerClient, \
RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
make_index, get_tubid_string_from_ann, get_tubid_string
from allmydata import storage_client
from allmydata.util import log
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.keyutil import BadSignatureError
Expand Down Expand Up @@ -42,14 +44,22 @@ class StubClient(Referenceable): # for_v1
V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"

class IntroducerClient(service.Service, Referenceable):
class IntroducerClient(service.MultiService, Referenceable):
implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)

def __init__(self, tub, introducer_furl,
def __init__(self, introducer_furl,
nickname, my_version, oldest_supported,
app_versions):
self._tub = tub
app_versions, cache_filepath, subscribe_only, plugins):
service.MultiService.__init__(self)

self._tub = Tub()
#self._tub.setOption("expose-remote-exception-types", False) # XXX
for name, handler in plugins.items():
self._tub.addConnectionHintHandler(name, handler)
self.introducer_furl = introducer_furl
self.cache_filepath = cache_filepath
self.subscribe_only = subscribe_only
self.plugins = plugins

assert type(nickname) is unicode
self._nickname = nickname
Expand Down Expand Up @@ -103,16 +113,49 @@ def _debug_retired(self, res):
return res

def startService(self):
service.Service.startService(self)
service.MultiService.startService(self)
self._introducer_error = None
self._tub.setServiceParent(self)
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
def connect_failed(failure):
self.log("Initial Introducer connection failed: perhaps it's down",
level=log.WEIRD, failure=failure, umid="c5MqUQ")
self.load_announcements()
d = self._tub.getReference(self.introducer_furl)
d.addErrback(connect_failed)

def load_announcements(self):
if self.cache_filepath.exists():
with self.cache_filepath.open() as f:
servers = yaml.load(f)
f.close()
if not isinstance(servers, list):
msg = "Invalid cached storage server announcements. No list encountered."
self.log(msg,
level=log.WEIRD)
raise storage_client.UnknownServerTypeError(msg)
for server_params in servers:
if not isinstance(server_params, dict):
msg = "Invalid cached storage server announcement encountered. No key/values found in %s" % server_params
self.log(msg,
level=log.WEIRD)
raise storage_client.UnknownServerTypeError(msg)
eventually(self._got_announcement_cb, server_params['key_s'], server_params['ann'], self.plugins)

def _save_announcement(self, ann):
if self.cache_filepath.exists():
with self.cache_filepath.open() as f:
announcements = yaml.load(f)
f.close()
else:
announcements = []
if ann in announcements:
return
announcements.append(ann)
ann_yaml = yaml.dump(announcements)
self.cache_filepath.setContent(ann_yaml)

def _got_introducer(self, publisher):
self.log("connected to introducer, getting versions")
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
Expand Down Expand Up @@ -150,13 +193,14 @@ def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)

def subscribe_to(self, service_name, cb, *args, **kwargs):
self._got_announcement_cb = cb
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
for index,(ann,key_s,when) in self._inbound_announcements.items():
servicename = index[0]
if servicename == service_name:
eventually(cb, key_s, ann, *args, **kwargs)
eventually(cb, key_s, ann, self.plugins, *args, **kwargs)

def _maybe_subscribe(self):
if not self._publisher:
Expand Down Expand Up @@ -215,6 +259,9 @@ def create_announcement_dict(self, service_name, ann):
return ann_d

def publish(self, service_name, ann, current_seqnum, current_nonce, signing_key=None):
if self.subscribe_only:
# no operation for subscribe-only mode
return
# we increment the seqnum every time we publish something new
ann_d = self.create_announcement_dict(service_name, ann)
self._outbound_announcements[service_name] = ann_d
Expand Down Expand Up @@ -347,7 +394,12 @@ def _process_announcement(self, ann, key_s):

for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, key_s, ann, *args, **kwargs)
eventually(cb, key_s, ann, self.plugins, *args, **kwargs)

server_params = {}
server_params['ann'] = ann
server_params['key_s'] = key_s
self._save_announcement(server_params)

def connected_to_introducer(self):
return bool(self._publisher)
Expand Down