Skip to content

Commit

Permalink
Supporting PEP8 E301.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Jan 13, 2015
1 parent c2ac4bb commit 3e49d1c
Show file tree
Hide file tree
Showing 15 changed files with 418 additions and 1 deletion.
1 change: 1 addition & 0 deletions benchmarks/race.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def setup_context():
a_lock1 = threading.Lock()
a_lock2 = threading.Lock()
a_lock3 = threading.Lock()

class A(threading.Thread):
def run(self):
ctx = setup_context()
Expand Down
2 changes: 2 additions & 0 deletions documentation_samples/msgprop.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ def main():
print '-' * 20

req = GetNotesRequest(on_or_before=42)

class M(ndb.Model):
req = MessageProperty(GetNotesRequest)

m = M(req=req)
print m
print m.put().get()
Expand Down
58 changes: 58 additions & 0 deletions ndb/context_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def testContext_AutoBatcher_Limit(self):
conn=conn,
auto_batcher_class=MyAutoBatcher,
config=real_config)

@tasklets.tasklet
def foo():
es = [model.Model(key=model.Key('Foo', None)) for _ in range(49)]
Expand All @@ -156,6 +157,7 @@ def foo():
ks = yield fs
self.assertEqual(len(ks), 49)
self.assertTrue(all(isinstance(k, model.Key) for k in ks))

foo().get_result()
self.assertEqual(len(MyAutoBatcher._log), 4)
for name, todo in MyAutoBatcher._log[2:]:
Expand All @@ -168,8 +170,10 @@ def foo():
def testContext_AutoBatcher_Errors(self):
# Test that errors are properly distributed over all Futures.
self.ExpectWarnings()

class Blobby(model.Model):
blob = model.BlobProperty()

ent1 = Blobby()
ent2 = Blobby(blob='x' * 2000000)
fut1 = self.ctx.put(ent1)
Expand All @@ -192,6 +196,7 @@ def testContext_MultiRpc(self):
# gives more assurance that it works.
config = datastore_rpc.Configuration(max_get_keys=3, max_put_entities=3)
self.ctx._conn = model.make_connection(config, default_model=model.Expando)

@tasklets.tasklet
def foo():
ents = [model.Expando() for _ in range(10)]
Expand Down Expand Up @@ -264,6 +269,7 @@ def testContext_CacheMisses(self):
def testContext_CachePolicy(self):
def should_cache(unused_key):
return False

@tasklets.tasklet
def foo():
key1 = model.Key(flat=('Foo', 1))
Expand Down Expand Up @@ -307,15 +313,20 @@ def testContext_NamespaceBonanza(self):
# correct namespace.
def assertNone(expr):
self.assertTrue(expr is None, repr(expr))

def assertNotNone(expr):
self.assertTrue(expr is not None, repr(expr))

def assertLocked(expr):
self.assertTrue(expr is context._LOCKED, repr(expr))

def assertProtobuf(expr, ent):
self.assertEqual(expr,
ent._to_pb(set_key=False).SerializePartialToString())

class Foo(model.Model):
pass

k1 = model.Key(Foo, 1, namespace='a')
k2 = model.Key(Foo, 2, namespace='b')
mk1 = self.ctx._memcache_prefix + k1.urlsafe()
Expand Down Expand Up @@ -481,6 +492,7 @@ class Foo(model.Model):

def testContext_MemcachePolicy(self):
badkeys = []

def tracking_add_async(*args, **kwds):
try:
res = save_add_async(*args, **kwds)
Expand All @@ -491,6 +503,7 @@ def tracking_add_async(*args, **kwds):
except Exception, err:
track.append((args, kwds, None, err))
raise

@tasklets.tasklet
def foo():
k1, k2 = yield self.ctx.put(ent1), self.ctx.put(ent2)
Expand Down Expand Up @@ -572,9 +585,11 @@ def foo():
self.assertTrue(key2 in self.ctx._cache) # Whitebox.
self.assertEqual(key1, key1a)
self.assertEqual(key2, key2a)

@tasklets.tasklet
def callback(ent):
return ent

qry = query.Query(kind='Foo')
results = yield self.ctx.map_query(qry, callback)
self.assertEqual(results, [ent1, ent2])
Expand All @@ -596,6 +611,7 @@ def testContext_MapQuery(self):
@tasklets.tasklet
def callback(ent):
return ent.key.flat()[-1]

@tasklets.tasklet
def foo():
yield self.create_entities()
Expand All @@ -621,6 +637,7 @@ def foo():
def testContext_MapQuery_NonTaskletCallback(self):
def callback(ent):
return ent.key.flat()[-1]

@tasklets.tasklet
def foo():
yield self.create_entities()
Expand All @@ -632,9 +649,11 @@ def foo():

def testContext_MapQuery_CustomFuture(self):
mfut = tasklets.QueueFuture()

@tasklets.tasklet
def callback(ent):
return ent.key.flat()[-1]

@tasklets.tasklet
def foo():
yield self.create_entities()
Expand All @@ -653,23 +672,28 @@ def foo():

def testContext_MapQuery_KeysOnly(self):
qo = query.QueryOptions(keys_only=True)

@tasklets.tasklet
def callback(key):
return key.pairs()[-1]

@tasklets.tasklet
def foo():
yield self.create_entities()
qry = query.Query(kind='Foo')
res = yield self.ctx.map_query(qry, callback, options=qo)
raise tasklets.Return(res)

res = foo().get_result()
self.assertEqual(set(res), set([('Foo', 1), ('Foo', 2), ('Foo', 3)]))

def testContext_MapQuery_Cursors(self):
qo = query.QueryOptions(produce_cursors=True)

@tasklets.tasklet
def callback(ent):
return ent.key.pairs()[-1]

@tasklets.tasklet
def foo():
yield self.create_entities()
Expand Down Expand Up @@ -706,6 +730,7 @@ def foo():
key = model.Key(flat=('Foo', 1))
ent = model.Expando(key=key, bar=1)
yield self.ctx.put(ent)

@tasklets.tasklet
def callback():
ctx = tasklets.get_context()
Expand All @@ -721,9 +746,11 @@ def callback():
def testContext_TransactionException(self):
self.ExpectWarnings()
key = model.Key('Foo', 1)

@tasklets.tasklet
def foo():
ent = model.Expando(key=key, bar=1)

@tasklets.tasklet
def callback():
yield ent.put_async()
Expand All @@ -735,9 +762,11 @@ def callback():
def testContext_TransactionRollback(self):
self.ExpectWarnings()
key = model.Key('Foo', 1)

@tasklets.tasklet
def foo():
ent = model.Expando(key=key, bar=1)

@tasklets.tasklet
def callback():
yield ent.put_async()
Expand All @@ -752,11 +781,14 @@ def testContext_TransactionRollbackException(self):

class CustomException(Exception):
pass

def bad_transaction(*arg, **kwargs):
return datastore_rpc.datastore_pb.Transaction()

@tasklets.tasklet
def foo():
ent = model.Expando(key=key, bar=1)

@tasklets.tasklet
def callback():
# Cause rollback to return an exception
Expand All @@ -775,9 +807,11 @@ def callback():
def testContext_TransactionAddTask(self):
self.ExpectWarnings()
key = model.Key('Foo', 1)

@tasklets.tasklet
def foo():
ent = model.Expando(key=key, bar=1)

@tasklets.tasklet
def callback():
ctx = tasklets.get_context()
Expand All @@ -793,6 +827,7 @@ def testContext_TransactionXG(self):

key1 = model.Key('Foo', 1)
key2 = model.Key('Foo', 2)

@tasklets.tasklet
def tx():
ctx = tasklets.get_context()
Expand Down Expand Up @@ -894,8 +929,10 @@ def outer_callback():

def testTransaction_OnCommit(self):
self.ExpectWarnings()

class Counter(model.Model):
count = model.IntegerProperty(default=0)

@model.transactional
def trans1(fail=False, bad=None):
tasklets.get_context().call_on_commit(lambda: log.append('A'))
Expand Down Expand Up @@ -947,6 +984,7 @@ def testDefaultContextTransaction(self):
@tasklets.synctasklet
def outer():
ctx1 = tasklets.get_context()

@tasklets.tasklet
def inner():
ctx2 = tasklets.get_context()
Expand All @@ -963,9 +1001,11 @@ def inner():

def testExplicitTransactionClearsDefaultContext(self):
old_ctx = tasklets.get_context()

@tasklets.synctasklet
def outer():
ctx1 = tasklets.get_context()

@tasklets.tasklet
def inner():
ctx = tasklets.get_context()
Expand All @@ -991,13 +1031,15 @@ def testKindError(self):
# be satisfied from the cache, so the adapter we're testing will never get
# called.
ctx.set_cache_policy(lambda unused_key: False)

@tasklets.tasklet
def foo():
# Foo class is declared in query_test, so let's get a unusual class name.
key1 = model.Key(flat=('ThisModelClassDoesntExist', 1))
ent1 = model.Expando(key=key1, foo=42, bar='hello')
yield ctx.put(ent1)
yield ctx.get(key1)

self.assertRaises(model.KindError, foo().check_success)

def testMemcachePolicy(self):
Expand Down Expand Up @@ -1046,6 +1088,7 @@ class EmptyModel(model.Model):

def testMemcacheAPI(self):
self.ExpectErrors()

@tasklets.tasklet
def foo():
ctx = tasklets.get_context()
Expand Down Expand Up @@ -1112,12 +1155,15 @@ def testMemcacheErrors(self):
# See issue 94. http://goo.gl/E7OBH
# Install an error handler.
save_create_rpc = memcache.create_rpc

def fake_check_success(*args):
raise apiproxy_errors.Error('fake error')

def fake_create_rpc(*args, **kwds):
rpc = save_create_rpc(*args, **kwds)
rpc.check_success = fake_check_success
return rpc

try:
memcache.create_rpc = fake_create_rpc
val = self.ctx.memcache_get('key2').get_result()
Expand Down Expand Up @@ -1375,12 +1421,14 @@ def _start_test_server(self):
else:
self.fail('Could not find an unused port in 10 tries')
s.listen(1)

def run():
c, addr = s.accept()
s.close()
c.recv(1000) # Throw away request.
c.send('HTTP/1.0 200 Ok\r\n\r\n') # Emptiest response.
c.close()

t = threading.Thread(target=run)
t.setDaemon(True)
t.start()
Expand Down Expand Up @@ -1424,8 +1472,10 @@ class Blobby(model.Model):

def testDatastoreConnectionIsRestored(self):
# See issue 209. http://goo.gl/7TEyM

class TestData(model.Model):
pass

@tasklets.tasklet
def txn():
conn1 = datastore._GetConnection()
Expand All @@ -1434,6 +1484,7 @@ def txn():
yield TestData().put_async()
conn2 = datastore._GetConnection()
self.assertEqual(conn1, conn2)

@tasklets.synctasklet
def many_txns():
# Exactly how many transactions are needed to make this fail
Expand All @@ -1446,6 +1497,7 @@ def many_txns():
yield ts
conn_c = datastore._GetConnection()
self.assertEqual(conn_b, conn_c)

conn_before = datastore._GetConnection()
many_txns()
conn_after = datastore._GetConnection()
Expand All @@ -1455,8 +1507,10 @@ def testMemcacheAndContextCache(self):
self.ctx.set_datastore_policy(True)
self.ctx.set_cache_policy(False)
self.ctx.set_memcache_policy(True)

class EmptyModel(model.Model):
pass

key = EmptyModel().put()
self.ctx.get(key).get_result() # pull entity into memcache
self.ctx.set_cache_policy(True)
Expand All @@ -1480,8 +1534,10 @@ def setUp(self):

def testGetFutureCachingOn(self):
self.ctx.set_memcache_policy(False)

class EmptyModel(model.Model):
pass

key = EmptyModel().put()
MyAutoBatcher.reset_log() # TODO Find out why put calls get_tasklet
self.ctx.set_cache_policy(True)
Expand All @@ -1496,8 +1552,10 @@ class EmptyModel(model.Model):

def testGetFutureCachingOff(self):
self.ctx.set_memcache_policy(False)

class EmptyModel(model.Model):
pass

key = EmptyModel().put()
MyAutoBatcher.reset_log() # TODO Find out why put calls get_tasklet
f1, f2 = self.ctx.get(key), self.ctx.get(key)
Expand Down
2 changes: 2 additions & 0 deletions ndb/eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,13 @@ def queue_rpc(self, rpc, callback=None, *args, **kwds):
if len(rpcs) > 1:
# Don't call the callback until all sub-rpcs have completed.
rpc.__done = False

def help_multi_rpc_along(r=rpc, c=callback, a=args, k=kwds):
if r.state == _FINISHING and not r.__done:
r.__done = True
c(*a, **k)
# TODO: And again, what about exceptions?

callback = help_multi_rpc_along
args = ()
kwds = {}
Expand Down

0 comments on commit 3e49d1c

Please sign in to comment.