diff --git a/src/allmydata/storage/account.py b/src/allmydata/storage/account.py index c0915c5927..ddd942c528 100644 --- a/src/allmydata/storage/account.py +++ b/src/allmydata/storage/account.py @@ -32,14 +32,6 @@ def __init__(self, owner_num, pubkey_vs, server, leasedb): self.connected = False self.connected_since = None self.connection = None - self.can_write = True - self.can_read = True - self.can_save = True - import random - self.account_message = { - "message": "free storage! %d" % random.randint(0,10), - "fancy": "free pony if you knew how to ask", - } self.debug = True def is_static(self): @@ -158,50 +150,12 @@ def remote_advise_corrupt_share(self, share_type, storage_index, shnum, reason): return self.server.client_advise_corrupt_share( share_type, storage_index, shnum, reason) - - # these are the non-RIStorageServer methods, some remote, some local - - def get_account_attribute(self, name): - return self._leasedb.get_account_attribute(self.owner_num, name) - - def set_account_attribute(self, name, value): - self._leasedb.set_account_attribute(self.owner_num, name, value) - def get_account_creation_time(self): return self._leasedb.get_account_creation_time(self.owner_num) - def remote_get_status(self): - return self.get_status() - - def get_status(self): - return { "write": self.can_write, - "read": self.can_read, - "save": self.can_save, - } - - def remote_get_account_message(self): - return self.account_message - - def set_nickname(self, nickname): - if len(nickname) > 1000: - raise ValueError("nickname too long") - self.set_account_attribute("nickname", nickname) - - def get_nickname(self): - n = self.get_account_attribute("nickname") - if n: - return n - return u"" - def get_id(self): return self.pubkey_vs - def remote_get_current_usage(self): - return self.get_current_usage() - - def get_current_usage(self): - return self._leasedb.get_account_usage(self.owner_num) - def get_leases(self, storage_index): return self._leasedb.get_leases(storage_index, self.owner_num) @@ -209,40 +163,24 @@ def connection_from(self, rx): self.connected = True self.connected_since = time.time() self.connection = rx - rhost = rx.getPeer() - from twisted.internet import address - if isinstance(rhost, address.IPv4Address): - rhost_s = "%s:%d" % (rhost.host, rhost.port) - elif "LoopbackAddress" in str(rhost): - rhost_s = "loopback" - else: - rhost_s = str(rhost) - self.set_account_attribute("last_connected_from", rhost_s) + #rhost = rx.getPeer() + #from twisted.internet import address + #if isinstance(rhost, address.IPv4Address): + # rhost_s = "%s:%d" % (rhost.host, rhost.port) + #elif "LoopbackAddress" in str(rhost): + # rhost_s = "loopback" + #else: + # rhost_s = str(rhost) + #self.set_account_attribute("last_connected_from", rhost_s) rx.notifyOnDisconnect(self._disconnected) def _disconnected(self): self.connected = False self.connected_since = None self.connection = None - self.set_account_attribute("last_seen", int(time.time())) + #self.set_account_attribute("last_seen", int(time.time())) self.disconnected_since = None - def _send_status(self): - self.connection.callRemoteOnly("status", self.get_status()) - - def _send_account_message(self): - self.connection.callRemoteOnly("account_message", self.account_message) - - def set_status(self, write, read, save): - self.can_write = write - self.can_read = read - self.can_save = save - self._send_status() - - def set_account_message(self, message): - self.account_message = message - self._send_account_message() - def get_connection_status(self): # starts as: connected=False, connected_since=None, # last_connected_from=None, last_seen=None @@ -251,14 +189,14 @@ def get_connection_status(self): # after disconnect: connected=False, connected_since=None, # last_connected_from=HOST, last_seen=STOP - last_seen = int_or_none(self.get_account_attribute("last_seen")) - last_connected_from = self.get_account_attribute("last_connected_from") + #last_seen = int_or_none(self.get_account_attribute("last_seen")) + #last_connected_from = self.get_account_attribute("last_connected_from") created = int_or_none(self.get_account_creation_time()) return {"connected": self.connected, "connected_since": self.connected_since, - "last_connected_from": last_connected_from, - "last_seen": last_seen, + #"last_connected_from": last_connected_from, + #"last_seen": last_seen, "created": created, } diff --git a/src/allmydata/storage/accountant.py b/src/allmydata/storage/accountant.py index 7883cbb9a8..500cbdccee 100644 --- a/src/allmydata/storage/accountant.py +++ b/src/allmydata/storage/accountant.py @@ -12,12 +12,10 @@ share a prefix with 'account.py' so my tab-autocomplete will work nicely. """ -import simplejson, weakref +import weakref from twisted.application import service -from foolscap.api import Referenceable -from allmydata.util import keyutil, log from allmydata.storage.leasedb import LeaseDB, AccountingCrawler from allmydata.storage.account import Account @@ -29,39 +27,27 @@ def __init__(self, storage_server, dbfile, statefile): self._leasedb = LeaseDB(dbfile) self._active_accounts = weakref.WeakValueDictionary() self._accountant_window = None - self._anonymous_account = Account(0, None, + self._anonymous_account = Account(LeaseDB.ANONYMOUS_ACCOUNTID, None, self.storage_server, self._leasedb) + self._starter_account = Account(LeaseDB.STARTER_LEASE_ACCOUNTID, None, + self.storage_server, self._leasedb) crawler = AccountingCrawler(storage_server, statefile, self._leasedb) self._accounting_crawler = crawler crawler.setServiceParent(self) - def get_accountant_window(self, tub): - if not self._accountant_window: - self._accountant_window = AccountantWindow(self, tub) - return self._accountant_window - def get_leasedb(self): return self._leasedb def set_expiration_policy(self, policy): self._accounting_crawler.set_expiration_policy(policy) - # methods used by AccountantWindow - - def get_account(self, pubkey_vs): - if pubkey_vs not in self._active_accounts: - ownernum = self._leasedb.get_or_allocate_ownernum(pubkey_vs) - a = Account(ownernum, pubkey_vs, self.storage_server, self._leasedb) - self._active_accounts[pubkey_vs] = a - # the client's RemoteReference will keep the Account alive. When - # it disconnects, that reference will lapse, and it will be - # removed from the _active_accounts WeakValueDictionary - return self._active_accounts[pubkey_vs] # note: a is still alive - def get_anonymous_account(self): return self._anonymous_account + def get_starter_account(self): + return self._starter_account + def get_accounting_crawler(self): return self._accounting_crawler @@ -73,51 +59,3 @@ def get_all_accounts(self): else: yield Account(ownerid, pubkey_vs, self.storage_server, self._leasedb) - - -class AccountantWindow(Referenceable): - def __init__(self, accountant, tub): - self.accountant = accountant - self.tub = tub - - def remote_get_account(self, msg, sig, pubkey_vs): - print "GETTING ACCOUNT", msg - vk = keyutil.parse_pubkey(pubkey_vs) - vk.verify(sig, msg) - account = self.accountant.get_account(pubkey_vs) - msg_d = simplejson.loads(msg.decode("utf-8")) - rxFURL = msg_d["please-give-Account-to-rxFURL"].encode("ascii") - account.set_nickname(msg_d["nickname"]) - d = self.tub.getReference(rxFURL) - def _got_rx(rx): - account.connection_from(rx) - d = rx.callRemote("account", account) - d.addCallback(lambda ign: account._send_status()) - d.addCallback(lambda ign: account._send_account_message()) - return d - d.addCallback(_got_rx) - d.addErrback(log.err, umid="nFYfcA") - return d - - -# XXX TODO new idea: move all leases into the DB. Do not store leases in -# shares at all. The crawler will exist solely to discover shares that -# have been manually added to disk (via 'scp' or some out-of-band means), -# and will add 30- or 60- day "migration leases" to them, to keep them -# alive until their original owner does a deep-add-lease and claims them -# properly. Better migration tools ('tahoe storage export'?) will create -# export files that include both the share data and the lease data, and -# then an import tool will both put the share in the right place and -# update the recipient node's lease DB. -# -# I guess the crawler will also be responsible for deleting expired -# shares, since it will be looking at both share files on disk and leases -# in the DB. -# -# So the DB needs a row per share-on-disk, and a separate table with -# leases on each bucket. When it sees a share-on-disk that isn't in the -# first table, it adds the migration-lease. When it sees a share-on-disk -# that is in the first table but has no leases in the second table (i.e. -# expired), it deletes both the share and the first-table row. When it -# sees a row in the first table but no share-on-disk (i.e. manually -# deleted share), it deletes the row (and any leases). diff --git a/src/allmydata/storage/leasedb.py b/src/allmydata/storage/leasedb.py index acc8d42cc0..b75765be60 100644 --- a/src/allmydata/storage/leasedb.py +++ b/src/allmydata/storage/leasedb.py @@ -8,7 +8,7 @@ a share that has expired. """ -import os, time, re, simplejson +import os, time, simplejson from twisted.python.filepath import FilePath @@ -136,6 +136,7 @@ def int_or_none(s): class LeaseDB: RETAINED_HISTORY_ENTRIES = 10 + ANONYMOUS_ACCOUNTID = 0 STARTER_LEASE_ACCOUNTID = 1 STARTER_LEASE_DURATION = 2*MONTH @@ -342,60 +343,6 @@ def get_history(self): rows = self._cursor.fetchall() return dict(rows) - # account management - - def get_account_usage(self, accountid): - self._cursor.execute("SELECT SUM(`used_space`) FROM shares" - " WHERE `storage_index`, `shnum` IN" - " (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`" - " WHERE `account_id`=?)", - (accountid,)) - row = self._cursor.fetchone() - if not row or not row[0]: # XXX why did I need the second clause? - return 0 - return row[0] - - def get_account_attribute(self, accountid, name): - self._cursor.execute("SELECT `value` FROM `account_attributes`" - " WHERE `account_id`=? AND `name`=?", - (accountid, name)) - row = self._cursor.fetchone() - if row: - return row[0] - return None - - def set_account_attribute(self, accountid, name, value): - if self.debug: print "SET_ACCOUNT_ATTRIBUTE", accountid, name, value - self._cursor.execute("SELECT `id` FROM `account_attributes`" - " WHERE `account_id`=? AND `name`=?", - (accountid, name)) - row = self._cursor.fetchone() - if row: - attrid = row[0] - self._cursor.execute("UPDATE `account_attributes`" - " SET `value`=?" - " WHERE `id`=?", - (value, attrid)) - else: - self._cursor.execute("INSERT INTO `account_attributes`" - " VALUES (?,?,?,?)", - (None, accountid, name, value)) - self.commit(always=True) - - def get_or_allocate_ownernum(self, pubkey_vs): - if not re.search(r'^[a-zA-Z0-9+-_]+$', pubkey_vs): - raise BadAccountName("unacceptable characters in pubkey") - self._cursor.execute("SELECT `id` FROM `accounts` WHERE `pubkey_vs`=?", - (pubkey_vs,)) - row = self._cursor.fetchone() - if row: - return row[0] - self._cursor.execute("INSERT INTO `accounts` VALUES (?,?,?)", - (None, pubkey_vs, int(time.time()))) - accountid = self._cursor.lastrowid - self.commit(always=True) - return accountid - def get_account_creation_time(self, owner_num): self._cursor.execute("SELECT `creation_time` from `accounts`" " WHERE `id`=?", diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 89ea5d3443..4390baba2c 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -29,9 +29,9 @@ # 6: implement other sorts of IStorageClient classes: S3, etc -import re, time, simplejson +import re, time from zope.interface import implements -from foolscap.api import eventually, Referenceable +from foolscap.api import eventually from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer from allmydata.util import log, base32 from allmydata.util.assertutil import precondition @@ -62,12 +62,10 @@ class StorageFarmBroker: I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, tub, permute_peers, client_key=None, client_info={}): + def __init__(self, tub, permute_peers): self.tub = tub assert permute_peers # False not implemented yet self.permute_peers = permute_peers - self.client_key = client_key - self.client_info = client_info # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its # own Reconnector, and will give us a RemoteReference when we ask @@ -77,7 +75,7 @@ def __init__(self, tub, permute_peers, client_key=None, client_info={}): # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): - s = NativeStorageServer(serverid, ann.copy(), self.tub) + s = NativeStorageServer(serverid, ann.copy()) s.rref = rref s._is_connected = True self.servers[serverid] = s @@ -94,8 +92,7 @@ def _got_announcement(self, key_s, ann): precondition(isinstance(key_s, str), key_s) precondition(key_s.startswith("v0-"), key_s) assert ann["service-name"] == "storage" - s = NativeStorageServer(key_s, ann, self.tub, self.client_key, - client_info=self.client_info) + s = NativeStorageServer(key_s, ann) serverid = s.get_serverid() old = self.servers.get(serverid) if old: @@ -161,7 +158,7 @@ def get_longname(self): def get_nickname(self): return "?" -class NativeStorageServer(Referenceable): +class NativeStorageServer: """I hold information about a storage server that we want to connect to. If we are connected, I hold the RemoteReference, their host address, and the their version information. I remember information about when we were @@ -189,14 +186,10 @@ class NativeStorageServer(Referenceable): "application-version": "unknown: no get_version()", } - def __init__(self, key_s, ann, tub, client_key=None, min_shares=1, - client_info={}): + def __init__(self, key_s, ann, min_shares=1): self.key_s = key_s self.announcement = ann - self.tub = tub - self.client_key = client_key self.min_shares = min_shares - self.client_info = client_info assert "anonymous-storage-FURL" in ann, ann furl = str(ann["anonymous-storage-FURL"]) @@ -228,12 +221,6 @@ def __init__(self, key_s, ann, tub, client_key=None, min_shares=1, self._reconnector = None self._trigger_cb = None - self.account_status = {"write": True, "read": True, "save": True} - # use "retain", not "save" - self.account_message = {} - self._latest_claimed_usage = None - self._latest_claimed_usage_time = None - # Special methods used by copy.copy() and copy.deepcopy(). When those are # used in allmydata.immutable.filenode to copy CheckResults during # repair, we want it to treat the IServer instances as singletons, and @@ -280,41 +267,9 @@ def get_announcement_time(self): return self.announcement_time def start_connecting(self, tub, trigger_cb): + furl = str(self.announcement["anonymous-storage-FURL"]) self._trigger_cb = trigger_cb - furl = self.announcement.get("accountant-FURL") - if furl and self.client_key: - self.accounting_enabled = True - self._reconnector = tub.connectTo(str(furl), self._got_accountant) - # _got_accountant() pings the other end, which fires our - # remote_account() method, which does - # add_version_to_remote_reference() and then vectors to - # _got_versioned_service() - else: - self.accounting_enabled = False - furl = self.announcement["anonymous-storage-FURL"] - self._reconnector = tub.connectTo(str(furl), self._got_connection) - # _got_connection() does add_version_to_remote_reference() and - # then vector to _got_versioned_service() - - def _got_accountant(self, rref): - log.msg(format="got AccountingWindow on %(name)s, doing upgrade", - name=self.get_name(), - facility="tahoe.storage_broker", umid="bWHpsA") - print "doing upgrade" - # the AccountantWindow we're talking to can upgrade us to a real - # Account. We are the receiver. - me = self.tub.registerReference(self) - nickname = self.client_info.get("nickname", u"") - msg_d = { "please-give-Account-to-rxFURL": me, - "nickname": nickname } - msg = simplejson.dumps(msg_d).encode("utf-8") - print msg - sk,vk_vs = self.client_key - sig = sk.sign(msg) - d = rref.callRemote("get_account", msg, sig, vk_vs) - d.addErrback(log.err, format="storageclient._got_accountant", - name=self.get_name(), umid="DNi3tw") - return d + self._reconnector = tub.connectTo(furl, self._got_connection) def _got_connection(self, rref): lp = log.msg(format="got connection to %(name)s, getting versions", @@ -363,44 +318,5 @@ def try_to_connect(self): # used when the broker wants us to hurry up self._reconnector.reset() - def get_claimed_usage(self): - # return (bytes, when). If we've never been told our usage, both will - # be None. Asking returns the previous value, and sends off a request - # for an update. To get an up-to-date value, call this twice, not too - # fast. - if self.rref and self.accounting_enabled: - d = self.rref.callRemote("get_current_usage") - def _got(usage): - self._latest_claimed_usage = usage - self._latest_claimed_usage_time = time.time() - d.addCallback(_got) - d.addErrback(log.err, umid="ivcMgA") - return self._latest_claimed_usage, self._latest_claimed_usage_time - return None, None - - def get_account_status(self): - if self.rref and self.accounting_enabled: - return self.account_status - # pre-accounting servers always allow everything, mostly - return {"write": True, "read": True, "save": True} - - def get_account_message(self): - if self.rref and self.accounting_enabled: - # Servers use this to advertise new features to the client's - # user. If we recognize a feature, we should suppress the - # message, because we'll have other feature-specific code which - # knows how to call additional methods to get the correct - # information. If we don't recognize a feature, we should display - # the message, to let our user know that they could e.g. use this - # server if only they installed a plugin for some new payment - # type. - - # for example, if we had code to handle a Bitcoin-based payment - # schme, we'd do this: - #if "bitcoin_v1" in message: - # del message["bitcoin_v1"] - return self.account_message - return {} - class UnknownServerTypeError(Exception): pass diff --git a/src/allmydata/test/test_leasedb.py b/src/allmydata/test/test_leasedb.py index 5da2034ac4..5497ee0699 100644 --- a/src/allmydata/test/test_leasedb.py +++ b/src/allmydata/test/test_leasedb.py @@ -25,31 +25,6 @@ def test_create(self): l2 = LeaseDB(dbfilename) self.failUnlessEqual(set(l2.get_all_accounts()), BASE_ACCOUNTS) - def test_accounts(self): - dbfilename = self.make("accounts") - l = LeaseDB(dbfilename) - one = l.get_or_allocate_ownernum("one") - self.failUnlessEqual(one, 2) - one_a = l.get_or_allocate_ownernum("one") - self.failUnlessEqual(one_a, 2) - two = l.get_or_allocate_ownernum("two") - self.failUnlessEqual(two, 3) - anon = l.get_or_allocate_ownernum("anonymous") - self.failUnlessEqual(anon, 0) - anon = l.get_or_allocate_ownernum("starter") - self.failUnlessEqual(anon, 1) - self.failUnlessEqual(set(l.get_all_accounts()), - BASE_ACCOUNTS.union(set([(2, u"one"), (3, u"two")]))) - - l.set_account_attribute(one, "name", u"value") - # This column only stores unicode. - self.failUnlessEqual(l.get_account_attribute(one, "name"), u"value") - - l.set_account_attribute(one, "name", u"updated") - self.failUnlessEqual(l.get_account_attribute(one, "name"), u"updated") - self.failUnlessEqual(l.get_account_attribute(one, "missing"), None) - - self.failUnlessEqual(l.get_account_attribute(two, "name"), None) AB=("abtnioga6deziyqd64gm65qbnu", 0) DE=("dekrcoczhdj5xh6zd4v62xhdnu", 1) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 495cfbc2c8..869b077259 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -3236,7 +3236,7 @@ def test_expire_age(self): ep = ExpirationPolicy(enabled=True, mode="age", override_lease_duration=2000) server = InstrumentedStorageServer(basedir, "\x00" * 20, expiration_policy=ep) ss = server.get_accountant().get_anonymous_account() - ss2 = server.get_accountant().get_account("somekey") + ss2 = server.get_accountant().get_starter_account() # make it start sooner than usual. lc = server.get_accounting_crawler() @@ -3373,7 +3373,7 @@ def test_expire_cutoff_date(self): ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then) server = InstrumentedStorageServer(basedir, "\x00" * 20, expiration_policy=ep) ss = server.get_accountant().get_anonymous_account() - ss2 = server.get_accountant().get_account("somekey") + ss2 = server.get_accountant().get_starter_account() # make it start sooner than usual. lc = server.get_accounting_crawler() @@ -3515,7 +3515,7 @@ def test_only_immutable(self): ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then, sharetypes=("immutable",)) server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep) ss = server.get_accountant().get_anonymous_account() - ss2 = server.get_accountant().get_account("somekey") + ss2 = server.get_accountant().get_starter_account() lc = server.get_accounting_crawler() lc.slow_start = 0 webstatus = StorageStatus(ss) @@ -3571,7 +3571,7 @@ def test_only_mutable(self): ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then, sharetypes=("mutable",)) server = StorageServer(basedir, "\x00" * 20, expiration_policy=ep) ss = server.get_accountant().get_anonymous_account() - ss2 = server.get_accountant().get_account("somekey") + ss2 = server.get_accountant().get_starter_account() lc = server.get_accounting_crawler() lc.slow_start = 0 webstatus = StorageStatus(ss)