Removed the ResponseCache. Instead, the MDMFSlotReadProxy initialized by ServerMap is kept around so that Retrieve can access it. The ReadProxy holds a cache of the first 1000 bytes the ServerMap initially read from each share, and enough requests can be satisfied out of that cache to reduce the round trips in test_deepcheck_mdmf from 84 to 60. It is still unclear under what conditions the cache holds fewer than 1000 bytes. This change also breaks some existing unit tests that depend on the inner behavior of ResponseCache.

Signed-off-by: Andrew Miller <amiller@dappervision.com>
amiller committed Apr 4, 2012
1 parent 21008a4 commit cbdfb00
Showing 6 changed files with 37 additions and 125 deletions.
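
The change described above replaces the per-filenode ResponseCache with the per-share MDMFSlotReadProxy objects that the servermap update already builds. A self-contained toy sketch of that data path follows (the class and variable names are invented for illustration, not the real Tahoe-LAFS API): the servermap side seeds one proxy per share with the prefix bytes it already fetched, and the retrieve side reuses that proxy, so reads that fall inside the prefix cost no extra round trip.

    class ToyReadProxy(object):
        """Stand-in for MDMFSlotReadProxy's local cache behaviour."""
        def __init__(self, prefix, prefix_is_everything=False):
            self._data = prefix                      # bytes fetched during the servermap update
            self._data_is_everything = prefix_is_everything

        def read(self, offset, length):
            # Return cached bytes, or None when a real slot_readv would be needed.
            if offset + length <= len(self._data) or self._data_is_everything:
                return self._data[offset:offset + length]
            return None

    proxies = {}                                     # (serverid, storage_index, shnum) -> proxy

    # servermap side: remember whatever the initial read returned
    readsize = 1000
    data = "x" * 400                                 # the server returned fewer bytes than requested
    proxies[("server1", "SI", 0)] = ToyReadProxy(data,
                                                 prefix_is_everything=(len(data) < readsize))

    # retrieve side: look the proxy up instead of re-contacting the server
    reader = proxies[("server1", "SI", 0)]
    assert reader.read(0, 100) == "x" * 100          # served entirely from the cached prefix
    assert reader.read(0, 900) == "x" * 400          # short share: this is all the data there is
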
55 changes: 0 additions & 55 deletions src/allmydata/mutable/common.py
@@ -1,6 +1,4 @@

from allmydata.util.spans import DataSpans

MODE_CHECK = "MODE_CHECK" # query all peers
MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
@@ -58,56 +56,3 @@ def __str__(self):

class UnknownVersionError(BadShareError):
"""The share we received was of a version we don't recognize."""

class ResponseCache:
"""I cache share data, to reduce the number of round trips used during
mutable file operations. All of the data in my cache is for a single
storage index, but I will keep information on multiple shares for
that storage index.
I maintain a highest-seen sequence number, and will flush all entries
each time this number increases (this doesn't necessarily imply that
all entries have the same sequence number).
My cache is indexed by a (verinfo, shnum) tuple.
My cache entries are DataSpans instances, each representing a set of
non-overlapping byteranges.
"""

def __init__(self):
self.cache = {}
self.seqnum = None

def _clear(self):
# also used by unit tests
self.cache = {}

def add(self, verinfo, shnum, offset, data):
seqnum = verinfo[0]
if seqnum > self.seqnum:
self._clear()
self.seqnum = seqnum

index = (verinfo, shnum)
if index in self.cache:
self.cache[index].add(offset, data)
else:
spans = DataSpans()
spans.add(offset, data)
self.cache[index] = spans

def read(self, verinfo, shnum, offset, length):
"""Try to satisfy a read request from cache.
Returns data, or None if the cache did not hold the entire requested span.
"""

# TODO: perhaps return a DataSpans object representing the fragments
# that we have, instead of only returning a hit if we can satisfy the
# whole request from cache.

index = (verinfo, shnum)
if index in self.cache:
return self.cache[index].get(offset, length)
else:
return None
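
For contrast, here is a minimal usage sketch of the ResponseCache removed above (the verinfo tuples are invented placeholders whose first element is the sequence number, as add() expects). It shows the two behaviours the docstring describes: a read is only satisfied when the whole requested span is cached, and a higher sequence number flushes every older entry.

    c = ResponseCache()
    c.add((1, "root1"), 0, 0, "a" * 100)                  # cache bytes 0..99 of share 0, seqnum 1
    assert c.read((1, "root1"), 0, 0, 100) == "a" * 100   # the whole span is cached
    assert c.read((1, "root1"), 0, 50, 100) is None       # partial coverage: no hit
    c.add((2, "root2"), 0, 0, "b" * 100)                  # newer seqnum: older entries are flushed
    assert c.read((1, "root1"), 0, 0, 100) is None
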
7 changes: 1 addition & 6 deletions src/allmydata/mutable/filenode.py
@@ -17,7 +17,7 @@
from allmydata.mutable.publish import Publish, MutableData,\
TransformingUploadable
from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
ResponseCache, UncoordinatedWriteError
UncoordinatedWriteError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
@@ -65,7 +65,6 @@ def __init__(self, storage_broker, secret_holder,
self._required_shares = default_encoding_parameters["k"]
self._total_shares = default_encoding_parameters["n"]
self._sharemap = {} # known shares, shnum-to-[nodeids]
self._cache = ResponseCache()
self._most_recent_size = None
# filled in after __init__ if we're being created for the first time;
# filled in by the servermap updater before publishing, otherwise.
@@ -180,10 +179,6 @@ def _populate_privkey(self, privkey):
self._privkey = privkey
def _populate_encprivkey(self, encprivkey):
self._encprivkey = encprivkey
def _add_to_cache(self, verinfo, shnum, offset, data):
self._cache.add(verinfo, shnum, offset, data)
def _read_from_cache(self, verinfo, shnum, offset, length):
return self._cache.read(verinfo, shnum, offset, length)

def get_write_enabler(self, server):
seed = server.get_foolscap_write_enabler_seed()
16 changes: 13 additions & 3 deletions src/allmydata/mutable/layout.py
@@ -1191,7 +1191,8 @@ def __init__(self,
rref,
storage_index,
shnum,
data=""):
data="",
data_is_everything=False):
# Start the initialization process.
self._rref = rref
self._storage_index = storage_index
@@ -1222,8 +1223,14 @@ def __init__(self,

# If the user has chosen to initialize us with some data, we'll
# try to satisfy subsequent data requests with that data before
# asking the storage server for it. If
# asking the storage server for it.
self._data = data

# If the provided data is known to be complete, then we know there's
# nothing to be gained by querying the server, so we should just
# partially satisfy requests with what we have.
self._data_is_everything = data_is_everything

# The way callers interact with cache in the filenode returns
# None if there isn't any cached data, but the way we index the
# cached data requires a string, so convert None to "".
@@ -1734,15 +1741,18 @@ def _build_verinfo(ignored):

def _read(self, readvs, force_remote=False):
unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
# TODO: It's entirely possible to tweak this so that it just
# fulfills the requests that it can, and not demand that all
# requests are satisfiable before running it.
if not unsatisfiable and not force_remote:
if not unsatisfiable or self._data_is_everything:
results = [self._data[offset:offset+length]
for (offset, length) in readvs]
results = {self.shnum: results}
return defer.succeed(results)
else:
return self._rref.callRemote("slot_readv",
self._storage_index,
[self.shnum],
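
The two new constructor arguments in the hunk above are filled in by the servermap updater (see servermap.py below): data is the prefix it already fetched, and data_is_everything is inferred from the response length, since a reply shorter than the requested readsize can only mean the share ended inside it. A sketch of that call, with rref, storage_index, shnum and fetched_prefix as hypothetical placeholders:

    readsize = 1000                   # bytes requested from each slot during the servermap update
    data = fetched_prefix             # hypothetical: what the server actually returned for this share
    reader = MDMFSlotReadProxy(rref, storage_index, shnum,
                               data=data,
                               data_is_everything=(len(data) < readsize))
    # _read() can satisfy any readv that fits inside `data` locally; with
    # data_is_everything set, it answers from the cache even when a request
    # extends past the prefix, since there is nothing more on the server.
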
16 changes: 7 additions & 9 deletions src/allmydata/mutable/retrieve.py
@@ -9,7 +9,7 @@
RemoteException
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util import hashutil, log, mathutil, deferredutil, spans
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
@@ -290,12 +290,8 @@ def _setup_download(self):
# KiB, so we ask for that much.
# TODO: Change the cache methods to allow us to fetch all of the
# data that they have, then change this method to do that.
any_cache = self._node._read_from_cache(self.verinfo, shnum,
0, 1000)
reader = MDMFSlotReadProxy(server.get_rref(),
self._storage_index,
shnum,
any_cache)
reader = self.servermap.proxies[(server.get_serverid(),
self._storage_index, shnum)]
reader.server = server
self.readers[shnum] = reader
assert len(self.remaining_sharemap) >= k
@@ -766,6 +762,7 @@ def _validate_block(self, results, segnum, reader, server, started):

block_and_salt, blockhashes, sharehashes = results
block, salt = block_and_salt
assert type(block) is str, (block, salt)

blockhashes = dict(enumerate(blockhashes))
self.log("the reader gave me the following blockhashes: %s" % \
@@ -838,12 +835,13 @@ def _get_needed_hashes(self, reader, segnum):
#needed.discard(0)
self.log("getting blockhashes for segment %d, share %d: %s" % \
(segnum, reader.shnum, str(needed)))
d1 = reader.get_blockhashes(needed, force_remote=True)
# TODO is force_remote necessary here?
d1 = reader.get_blockhashes(needed, force_remote=False)
if self.share_hash_tree.needed_hashes(reader.shnum):
need = self.share_hash_tree.needed_hashes(reader.shnum)
self.log("also need sharehashes for share %d: %s" % (reader.shnum,
str(need)))
d2 = reader.get_sharehashes(need, force_remote=True)
d2 = reader.get_sharehashes(need, force_remote=False)
else:
d2 = defer.succeed({}) # the logic in the next method
# expects a dict
26 changes: 10 additions & 16 deletions src/allmydata/mutable/servermap.py
@@ -119,6 +119,7 @@ def __init__(self):
self._bad_shares = {} # maps (server,shnum) to old checkstring
self._last_update_mode = None
self._last_update_time = 0
self.proxies = {}
self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
# where blockhashes is a list of bytestrings (the result of
# layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
@@ -603,6 +604,10 @@ def _do_read(self, server, storage_index, shnums, readv):
# we ignore success
d2.addErrback(self._add_lease_failed, server, storage_index)
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
return d


@@ -631,19 +636,6 @@ def _got_corrupt_share(self, e, shnum, server, data, lp):
self._servermap.add_problem(f)


def _cache_good_sharedata(self, verinfo, shnum, now, data):
"""
If one of my queries returns successfully (which means that we
were able to and successfully did validate the signature), I
cache the data that we initially fetched from the storage
server. This will help reduce the number of roundtrips that need
to occur when the file is downloaded, or when the file is
updated.
"""
if verinfo:
self._node._add_to_cache(verinfo, shnum, 0, data)


def _got_results(self, datavs, server, readsize, storage_index, started):
lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
name=server.get_name(),
@@ -672,10 +664,14 @@ def _done_processing(ignored=None):

for shnum,datav in datavs.items():
data = datav[0]
reader = MDMFSlotReadProxy(ss,
storage_index,
shnum,
data)
data,
data_is_everything=len(data)<readsize)
self._servermap.proxies[(server.get_serverid(),
storage_index, shnum)] = reader
# our goal, with each response, is to validate the version
# information and share data as best we can at this point --
# we do this by validating the signature. To do this, we
@@ -752,8 +748,6 @@ def _done_processing(ignored=None):
self._got_signature_one_share(results, shnum, server, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
self._got_corrupt_share(error, shnum, server, data, lp))
dl.addCallback(lambda verinfo, shnum=shnum, data=data:
self._cache_good_sharedata(verinfo, shnum, now, data))
ds.append(dl)
# dl is a deferred list that will fire when all of the shares
# that we found on this server are done processing. When dl fires,
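
One detail worth noting about the registry added above: the proxies dict is keyed by the full (serverid, storage_index, shnum) triple rather than by shnum alone, so copies of the same share number held by different servers (the filenode's _sharemap maps shnum to a list of nodeids) each get their own cached proxy, and Retrieve (retrieve.py above) must rebuild exactly the same triple when it looks a reader up. A trivial illustration of why the server id belongs in the key:

    proxies = {}
    proxies[("serverA", "SI", 3)] = "reader seeded with serverA's copy of share 3"
    proxies[("serverB", "SI", 3)] = "reader seeded with serverB's copy of share 3"
    assert len(proxies) == 2          # keying on shnum alone would let one entry shadow the other
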
42 changes: 6 additions & 36 deletions src/allmydata/test/test_mutable.py
@@ -21,7 +21,7 @@
from allmydata.scripts import debug

from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
from allmydata.mutable.common import ResponseCache, \
from allmydata.mutable.common import \
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
NotEnoughServersError, CorruptShareError
@@ -632,8 +632,8 @@ def _created(n):
d.addCallback(_created)
return d


def test_response_cache_memory_leak(self):
raise unittest.SkipTest("The ResponseCache has been excised")
d = self.nodemaker.create_mutable_file("contents")
def _created(n):
d = n.download_best_version()
@@ -2437,40 +2437,8 @@ def _cache_check(_, read_expect, write_expect):
d.addCallback(_download_check)
d.addCallback(_cache_check, 2000, 2090)
return d
test_cache.todo = "Implement MDMFSlotReadProxy cache counting"

class Utils(unittest.TestCase):
def test_cache(self):
c = ResponseCache()
# xdata = base62.b2a(os.urandom(100))[:100]
xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
c.add("v1", 1, 0, xdata)
c.add("v1", 1, 2000, ydata)
self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)

# test joining fragments
c = ResponseCache()
c.add("v1", 1, 0, xdata[:10])
c.add("v1", 1, 10, xdata[10:20])
self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])

class Exceptions(unittest.TestCase):
def test_repr(self):
@@ -2564,7 +2532,9 @@ def _got_smap1(smap):
# now attempt to retrieve the old version with the old servermap.
# This will look like someone has changed the file since we
# updated the servermap.
d.addCallback(lambda res: n._cache._clear())

# The ResponseCache has been removed, so there is no longer a cache to clear here.
d.addCallback(lambda res: log.msg("starting doomed read"))
d.addCallback(lambda res:
self.shouldFail(NotEnoughSharesError,
