Skip to content

Commit

Permalink
Merge pull request #12 from commonsmachinery/workers-concurrency
Browse files Browse the repository at this point in the history
Use context managers to handle storage syncing
  • Loading branch information
petli committed Apr 14, 2014
2 parents 438fe34 + ac97e14 commit d0a131a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 65 deletions.
16 changes: 6 additions & 10 deletions backend/catalog/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ def __init__(self, name, config):

self._model = RDF.Model(self._store)

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
self._model.sync()

def _get_linked_work(self, predicate, object):
"""
Return linked work for a source or post (Entry type defined by predicate)
Expand Down Expand Up @@ -412,7 +418,6 @@ def create_work(self, timestamp, user_uri, work_uri, work_data):
})

work.to_model(self._model)
self._model.sync()
return work.get_data()

def update_work(self, timestamp, user_uri, work_uri, work_data):
Expand All @@ -438,7 +443,6 @@ def update_work(self, timestamp, user_uri, work_uri, work_data):
self.delete_work(user_uri=user_uri, work_uri=work_uri)

new_work.to_model(self._model)
self._model.sync()
return new_work.get_data()

def delete_work(self, user_uri, work_uri, linked_entries=False):
Expand All @@ -461,8 +465,6 @@ def delete_work(self, user_uri, work_uri, linked_entries=False):
work_context = RDF.Node(RDF.Uri(work_uri))
self._model.remove_statements_with_context(work_context)

self._model.sync()

def get_work(self, user_uri, work_uri, subgraph=None):
work = Work.from_model(self._model, work_uri)

Expand Down Expand Up @@ -518,7 +520,6 @@ def create_work_source(self, timestamp, user_uri, work_uri, source_uri, source_d
if (statement, work_subject) not in self._model:
self._model.append(statement, context=work_subject)

self._model.sync()
return source.get_data()

def create_stock_source(self, timestamp, user_uri, source_uri, source_data):
Expand Down Expand Up @@ -552,7 +553,6 @@ def create_stock_source(self, timestamp, user_uri, source_uri, source_data):
# TODO: do we need context for user-related stuff?
self._model.append(statement, context=user_subject)

self._model.sync()
return source.get_data()

def update_source(self, timestamp, user_uri, source_uri, source_data):
Expand All @@ -573,7 +573,6 @@ def update_source(self, timestamp, user_uri, source_uri, source_data):
self.delete_source(user_uri=user_uri, source_uri=source_uri, unlink=False)

new_source.to_model(self._model)
self._model.sync()
return new_source.get_data()

def delete_source(self, user_uri, source_uri, unlink=True):
Expand All @@ -596,7 +595,6 @@ def delete_source(self, user_uri, source_uri, unlink=True):
subgraph_context = RDF.Node(uri_string=str(subgraph_uri))
self._model.remove_statements_with_context(subgraph_context)
self._model.remove_statements_with_context(RDF.Node(RDF.Uri(source_uri)))
self._model.sync()

def get_source(self, user_uri, source_uri, subgraph=None):
source = Source.from_model(self._model, source_uri)
Expand Down Expand Up @@ -669,7 +667,6 @@ def create_post(self, timestamp, user_uri, work_uri, post_uri, post_data):
if (statement, work_subject) not in self._model:
self._model.append(statement, context=work_subject)

self._model.sync()
return post.get_data()

def delete_post(self, user_uri, post_uri):
Expand All @@ -691,7 +688,6 @@ def delete_post(self, user_uri, post_uri):
subgraph_context = RDF.Node(uri_string=str(subgraph_uri))
self._model.remove_statements_with_context(subgraph_context)
self._model.remove_statements_with_context(RDF.Node(RDF.Uri(post_uri)))
self._model.sync()

def get_post(self, user_uri, post_uri, subgraph=None):
post = Post.from_model(self._model, post_uri)
Expand Down
126 changes: 71 additions & 55 deletions backend/catalog/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,15 @@ def create_work(self, user_uri, work_uri, work_data):
"""
try:
with RedisLock(self.lock_db, work_uri):
timestamp = int(time.time())
work_data = self.main_store.create_work(timestamp, user_uri, work_uri, work_data)
with self.main_store as store:
timestamp = int(time.time())
work_data = store.create_work(timestamp, user_uri, work_uri, work_data)

log_data = json.dumps(work_data)
log_event.apply_async(args=('create_work', timestamp, user_uri, work_uri, None, log_data))
log_data = json.dumps(work_data)
log_event.apply_async(args=('create_work', timestamp, user_uri, work_uri, None, log_data))

on_create_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
return work_data
on_create_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
return work_data
except CatalogError as e:
return error(e)
except LockedError as e:
Expand Down Expand Up @@ -133,14 +134,15 @@ def update_work(self, user_uri, work_uri, work_data):
"""
try:
with RedisLock(self.lock_db, work_uri):
timestamp = int(time.time())
work_data = self.main_store.update_work(timestamp, user_uri, work_uri, work_data)
with self.main_store as store:
timestamp = int(time.time())
work_data = store.update_work(timestamp, user_uri, work_uri, work_data)

log_data = json.dumps(work_data)
log_event.apply_async(args=('update_work', timestamp, user_uri, work_uri, None, log_data))
log_data = json.dumps(work_data)
log_event.apply_async(args=('update_work', timestamp, user_uri, work_uri, None, log_data))

on_update_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
return work_data
on_update_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
return work_data
except CatalogError as e:
return error(e)
except LockedError as e:
Expand All @@ -163,12 +165,13 @@ def delete_work(self, user_uri, work_uri):
"""
try:
with RedisLock(self.lock_db, work_uri):
timestamp = int(time.time())
self.main_store.delete_work(user_uri, work_uri)
with self.main_store as store:
timestamp = int(time.time())
store.delete_work(user_uri, work_uri)

log_event.apply_async(args=('delete_work', timestamp, user_uri, work_uri, None, None))
log_event.apply_async(args=('delete_work', timestamp, user_uri, work_uri, None, None))

on_delete_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri)
on_delete_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri)
except CatalogError as e:
return error(e)
except LockedError as e:
Expand Down Expand Up @@ -200,14 +203,15 @@ def create_work_source(self, user_uri, work_uri, source_uri, source_data):
"""
try:
with RedisLock(self.lock_db, work_uri):
timestamp = int(time.time())
source_data = self.main_store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)
with self.main_store as store:
timestamp = int(time.time())
source_data = store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)

log_data = json.dumps(source_data)
log_event.apply_async(args=('create_work_source', timestamp, user_uri, work_uri, source_uri, log_data))
log_data = json.dumps(source_data)
log_event.apply_async(args=('create_work_source', timestamp, user_uri, work_uri, source_uri, log_data))

on_create_work_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, source_uri=source_uri, source_data=source_data)
return source_data
on_create_work_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, source_uri=source_uri, source_data=source_data)
return source_data
except CatalogError as e:
return error(e)
except LockedError as e:
Expand Down Expand Up @@ -236,14 +240,15 @@ def create_stock_source(self, user_uri, source_uri, source_data):
"""
try:
with RedisLock(self.lock_db, user_uri):
timestamp = int(time.time())
source_data = self.main_store.create_stock_source(timestamp, user_uri, source_uri, source_data)
with self.main_store as store:
timestamp = int(time.time())
source_data = store.create_stock_source(timestamp, user_uri, source_uri, source_data)

log_data = json.dumps(source_data)
log_event.apply_async(args=('create_stock_source', timestamp, user_uri, None, source_uri, log_data))
log_data = json.dumps(source_data)
log_event.apply_async(args=('create_stock_source', timestamp, user_uri, None, source_uri, log_data))

on_create_stock_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
return source_data
on_create_stock_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
return source_data
except CatalogError as e:
return error(e)
except LockedError as e:
Expand Down Expand Up @@ -273,14 +278,15 @@ def update_source(self, user_uri, source_uri, source_data):
"""
try:
with RedisLock(self.lock_db, source_uri):
timestamp = int(time.time())
source_data = self.main_store.update_source(timestamp, user_uri, source_uri, source_data)
with self.main_store as store:
timestamp = int(time.time())
source_data = store.update_source(timestamp, user_uri, source_uri, source_data)

log_data = json.dumps(source_data)
log_event.apply_async(args=('update_source', timestamp, user_uri, None, source_uri, log_data))
log_data = json.dumps(source_data)
log_event.apply_async(args=('update_source', timestamp, user_uri, None, source_uri, log_data))

on_update_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
return source_data
on_update_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
return source_data
except CatalogError as e:
return error(e)
except LockedError as e:
Expand All @@ -303,12 +309,13 @@ def delete_source(self, user_uri, source_uri):
"""
try:
with RedisLock(self.lock_db, source_uri):
timestamp = int(time.time())
self.main_store.delete_source(user_uri, source_uri)
with self.main_store as store:
timestamp = int(time.time())
store.delete_source(user_uri, source_uri)

log_event.apply_async(args=('delete_source', timestamp, user_uri, None, source_uri, None))
log_event.apply_async(args=('delete_source', timestamp, user_uri, None, source_uri, None))

on_delete_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri)
on_delete_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri)
except CatalogError as e:
return error(e)
except LockedError as e:
Expand Down Expand Up @@ -340,14 +347,15 @@ def create_post(self, user_uri, work_uri, post_uri, post_data):
"""
try:
with RedisLock(self.lock_db, work_uri):
timestamp = int(time.time())
post_data = self.main_store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)
with self.main_store as store:
timestamp = int(time.time())
post_data = store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)

log_data = json.dumps(post_data)
log_event.apply_async(args=('create_post', timestamp, user_uri, work_uri, post_uri, log_data))
log_data = json.dumps(post_data)
log_event.apply_async(args=('create_post', timestamp, user_uri, work_uri, post_uri, log_data))

on_create_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, post_uri=post_uri, post_data=post_data)
return post_data
on_create_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, post_uri=post_uri, post_data=post_data)
return post_data
except CatalogError as e:
return error(e)
except LockedError as e:
Expand All @@ -371,12 +379,13 @@ def delete_post(self, user_uri, post_uri):
"""
try:
with RedisLock(self.lock_db, post_uri):
timestamp = int(time.time())
self.main_store.delete_post(user_uri, post_uri)
with self.main_store as store:
timestamp = int(time.time())
store.delete_post(user_uri, post_uri)

log_event.apply_async(args=('delete_post', timestamp, user_uri, None, post_uri, None))
log_event.apply_async(args=('delete_post', timestamp, user_uri, None, post_uri, None))

on_delete_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, post_uri=post_uri)
on_delete_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, post_uri=post_uri)
except CatalogError as e:
return error(e)
except LockedError as e:
Expand All @@ -392,7 +401,8 @@ def public_create_work(self, timestamp, user_uri, work_uri, work_data):
See create_work documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + work_uri):
self.public_store.create_work(timestamp, user_uri, work_uri, work_data)
with self.public_store as store:
store.create_work(timestamp, user_uri, work_uri, work_data)
except LockedError as e:
raise self.retry(exc=e, countdown=5, max_retries=None)

Expand All @@ -412,7 +422,8 @@ def public_delete_work(self, timestamp, user_uri, work_uri):
See delete_work documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + work_uri):
self.public_store.delete_work(user_uri, work_uri, linked_entries=True)
with self.public_store as store:
store.delete_work(user_uri, work_uri, linked_entries=True)
except EntryNotFoundError:
pass
except LockedError as e:
Expand All @@ -424,7 +435,8 @@ def public_create_work_source(self, timestamp, user_uri, work_uri, source_uri, s
See create_work_source documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + work_uri):
self.public_store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)
with self.public_store as store:
store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)
except LockedError as e:
raise self.retry(exc=e, countdown=5, max_retries=None)

Expand All @@ -438,7 +450,8 @@ def public_update_source(self, timestamp, user_uri, source_uri, source_data):
See update_source documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + source_uri):
self.public_store.update_source(timestamp, user_uri, source_uri, source_data)
with self.public_store as store:
store.update_source(timestamp, user_uri, source_uri, source_data)
except LockedError as e:
raise self.retry(exc=e, countdown=5, max_retries=None)

Expand All @@ -448,7 +461,8 @@ def public_delete_source(self, timestamp, user_uri, source_uri, unlink=True):
See delete_source documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + source_uri):
self.public_store.delete_source(user_uri, source_uri)
with self.public_store as store:
store.delete_source(user_uri, source_uri)
except EntryNotFoundError:
pass
except LockedError as e:
Expand All @@ -460,7 +474,8 @@ def public_create_post(self, timestamp, user_uri, work_uri, post_uri, post_data)
See create_post documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + work_uri):
self.public_store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)
with self.public_store as store:
store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)
except LockedError as e:
raise self.retry(exc=e, countdown=5, max_retries=None)

Expand All @@ -470,7 +485,8 @@ def public_delete_post(self, timestamp, user_uri, post_uri):
See delete_post documentation for description of parameters."""
try:
with RedisLock(self.lock_db, "public." + post_uri):
self.public_store.delete_post(user_uri, post_uri)
with self.public_store as store:
store.delete_post(user_uri, post_uri)
except EntryNotFoundError:
pass
except LockedError as e:
Expand Down

0 comments on commit d0a131a

Please sign in to comment.