Use context managers to handle storage syncing #12

Merged
merged 1 commit on Apr 14, 2014

16 changes: 6 additions & 10 deletions backend/catalog/store.py
@@ -331,6 +331,12 @@ def __init__(self, name, config):

         self._model = RDF.Model(self._store)

+    def __enter__(self):
+        return self

Member
Didn't we need to sync on open too, to get any changes other processes have written to disk?

Member Author

No, other updaters will sync when they finish their work anyway, so syncing here is unnecessary.


+    def __exit__(self, type, value, traceback):
+        self._model.sync()
+
     def _get_linked_work(self, predicate, object):
         """
         Return linked work for a source or post (Entry type defined by predicate)
@@ -412,7 +418,6 @@ def create_work(self, timestamp, user_uri, work_uri, work_data):
         })

         work.to_model(self._model)
-        self._model.sync()
         return work.get_data()

     def update_work(self, timestamp, user_uri, work_uri, work_data):
@@ -438,7 +443,6 @@ def update_work(self, timestamp, user_uri, work_uri, work_data):
             self.delete_work(user_uri=user_uri, work_uri=work_uri)

         new_work.to_model(self._model)
-        self._model.sync()
         return new_work.get_data()

     def delete_work(self, user_uri, work_uri, linked_entries=False):
@@ -461,8 +465,6 @@ def delete_work(self, user_uri, work_uri, linked_entries=False):
         work_context = RDF.Node(RDF.Uri(work_uri))
         self._model.remove_statements_with_context(work_context)

-        self._model.sync()
-
     def get_work(self, user_uri, work_uri, subgraph=None):
         work = Work.from_model(self._model, work_uri)

@@ -518,7 +520,6 @@ def create_work_source(self, timestamp, user_uri, work_uri, source_uri, source_data):
         if (statement, work_subject) not in self._model:
             self._model.append(statement, context=work_subject)

-        self._model.sync()
         return source.get_data()

     def create_stock_source(self, timestamp, user_uri, source_uri, source_data):
@@ -552,7 +553,6 @@ def create_stock_source(self, timestamp, user_uri, source_uri, source_data):
             # TODO: do we need context for user-related stuff?
             self._model.append(statement, context=user_subject)

-        self._model.sync()
         return source.get_data()

     def update_source(self, timestamp, user_uri, source_uri, source_data):
@@ -573,7 +573,6 @@ def update_source(self, timestamp, user_uri, source_uri, source_data):
         self.delete_source(user_uri=user_uri, source_uri=source_uri, unlink=False)

         new_source.to_model(self._model)
-        self._model.sync()
         return new_source.get_data()

     def delete_source(self, user_uri, source_uri, unlink=True):
@@ -596,7 +595,6 @@ def delete_source(self, user_uri, source_uri, unlink=True):
             subgraph_context = RDF.Node(uri_string=str(subgraph_uri))
             self._model.remove_statements_with_context(subgraph_context)
         self._model.remove_statements_with_context(RDF.Node(RDF.Uri(source_uri)))
-        self._model.sync()

     def get_source(self, user_uri, source_uri, subgraph=None):
         source = Source.from_model(self._model, source_uri)
@@ -669,7 +667,6 @@ def create_post(self, timestamp, user_uri, work_uri, post_uri, post_data):
         if (statement, work_subject) not in self._model:
             self._model.append(statement, context=work_subject)

-        self._model.sync()
         return post.get_data()

     def delete_post(self, user_uri, post_uri):
@@ -691,7 +688,6 @@ def delete_post(self, user_uri, post_uri):
             subgraph_context = RDF.Node(uri_string=str(subgraph_uri))
             self._model.remove_statements_with_context(subgraph_context)
         self._model.remove_statements_with_context(RDF.Node(RDF.Uri(post_uri)))
-        self._model.sync()

     def get_post(self, user_uri, post_uri, subgraph=None):
         post = Post.from_model(self._model, post_uri)
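Taken together, the store now implements Python's context-manager protocol: __enter__ hands back the store itself, and __exit__ performs the single sync() that the individual store methods used to call. A minimal sketch of that behaviour, using a hypothetical stand-in for RDF.Model rather than the real Redland bindings:

class FakeModel:
    """Stand-in for RDF.Model: buffers writes, flushes them on sync()."""
    def __init__(self):
        self.pending = []
        self.on_disk = []

    def append(self, statement):
        self.pending.append(statement)

    def sync(self):
        # Like RDF.Model.sync(), which flushes the hash storage to disk.
        self.on_disk.extend(self.pending)
        self.pending = []

class Store:
    def __init__(self):
        self._model = FakeModel()

    def __enter__(self):
        # No sync on open: as discussed in the review above, every
        # writer syncs when it finishes, so the data on disk is
        # already up to date.
        return self

    def __exit__(self, type, value, traceback):
        self._model.sync()

store = Store()
with store:
    store._model.append("statement 1")
    store._model.append("statement 2")
assert store._model.on_disk == ["statement 1", "statement 2"]

Because __exit__ returns None, an exception raised inside the block still propagates after the sync, so a failing caller cannot silently leave unflushed writes behind.
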
126 changes: 71 additions & 55 deletions backend/catalog/tasks.py
@@ -96,14 +96,15 @@ def create_work(self, user_uri, work_uri, work_data):
"""
try:
with RedisLock(self.lock_db, work_uri):
timestamp = int(time.time())
work_data = self.main_store.create_work(timestamp, user_uri, work_uri, work_data)
with self.main_store as store:
timestamp = int(time.time())
work_data = store.create_work(timestamp, user_uri, work_uri, work_data)

log_data = json.dumps(work_data)
log_event.apply_async(args=('create_work', timestamp, user_uri, work_uri, None, log_data))
log_data = json.dumps(work_data)
log_event.apply_async(args=('create_work', timestamp, user_uri, work_uri, None, log_data))

on_create_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
return work_data
on_create_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
return work_data
except CatalogError as e:
return error(e)
except LockedError as e:
@@ -133,14 +134,15 @@ def update_work(self, user_uri, work_uri, work_data):
     """
     try:
         with RedisLock(self.lock_db, work_uri):
-            timestamp = int(time.time())
-            work_data = self.main_store.update_work(timestamp, user_uri, work_uri, work_data)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                work_data = store.update_work(timestamp, user_uri, work_uri, work_data)

-            log_data = json.dumps(work_data)
-            log_event.apply_async(args=('update_work', timestamp, user_uri, work_uri, None, log_data))
+                log_data = json.dumps(work_data)
+                log_event.apply_async(args=('update_work', timestamp, user_uri, work_uri, None, log_data))

-            on_update_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
-            return work_data
+                on_update_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, work_data=work_data)
+                return work_data
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -163,12 +165,13 @@ def delete_work(self, user_uri, work_uri):
     """
     try:
         with RedisLock(self.lock_db, work_uri):
-            timestamp = int(time.time())
-            self.main_store.delete_work(user_uri, work_uri)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                store.delete_work(user_uri, work_uri)

-            log_event.apply_async(args=('delete_work', timestamp, user_uri, work_uri, None, None))
+                log_event.apply_async(args=('delete_work', timestamp, user_uri, work_uri, None, None))

-            on_delete_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri)
+                on_delete_work.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri)
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -200,14 +203,15 @@ def create_work_source(self, user_uri, work_uri, source_uri, source_data):
     """
     try:
         with RedisLock(self.lock_db, work_uri):
-            timestamp = int(time.time())
-            source_data = self.main_store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                source_data = store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)

-            log_data = json.dumps(source_data)
-            log_event.apply_async(args=('create_work_source', timestamp, user_uri, work_uri, source_uri, log_data))
+                log_data = json.dumps(source_data)
+                log_event.apply_async(args=('create_work_source', timestamp, user_uri, work_uri, source_uri, log_data))

-            on_create_work_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, source_uri=source_uri, source_data=source_data)
-            return source_data
+                on_create_work_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, source_uri=source_uri, source_data=source_data)
+                return source_data
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -236,14 +240,15 @@ def create_stock_source(self, user_uri, source_uri, source_data):
     """
     try:
         with RedisLock(self.lock_db, user_uri):
-            timestamp = int(time.time())
-            source_data = self.main_store.create_stock_source(timestamp, user_uri, source_uri, source_data)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                source_data = store.create_stock_source(timestamp, user_uri, source_uri, source_data)

-            log_data = json.dumps(source_data)
-            log_event.apply_async(args=('create_stock_source', timestamp, user_uri, None, source_uri, log_data))
+                log_data = json.dumps(source_data)
+                log_event.apply_async(args=('create_stock_source', timestamp, user_uri, None, source_uri, log_data))

-            on_create_stock_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
-            return source_data
+                on_create_stock_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
+                return source_data
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -273,14 +278,15 @@ def update_source(self, user_uri, source_uri, source_data):
     """
     try:
         with RedisLock(self.lock_db, source_uri):
-            timestamp = int(time.time())
-            source_data = self.main_store.update_source(timestamp, user_uri, source_uri, source_data)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                source_data = store.update_source(timestamp, user_uri, source_uri, source_data)

-            log_data = json.dumps(source_data)
-            log_event.apply_async(args=('update_source', timestamp, user_uri, None, source_uri, log_data))
+                log_data = json.dumps(source_data)
+                log_event.apply_async(args=('update_source', timestamp, user_uri, None, source_uri, log_data))

-            on_update_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
-            return source_data
+                on_update_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri, source_data=source_data)
+                return source_data
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -303,12 +309,13 @@ def delete_source(self, user_uri, source_uri):
     """
     try:
         with RedisLock(self.lock_db, source_uri):
-            timestamp = int(time.time())
-            self.main_store.delete_source(user_uri, source_uri)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                store.delete_source(user_uri, source_uri)

-            log_event.apply_async(args=('delete_source', timestamp, user_uri, None, source_uri, None))
+                log_event.apply_async(args=('delete_source', timestamp, user_uri, None, source_uri, None))

-            on_delete_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri)
+                on_delete_source.send(sender=self, timestamp=timestamp, user_uri=user_uri, source_uri=source_uri)
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -340,14 +347,15 @@ def create_post(self, user_uri, work_uri, post_uri, post_data):
     """
     try:
         with RedisLock(self.lock_db, work_uri):
-            timestamp = int(time.time())
-            post_data = self.main_store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                post_data = store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)

-            log_data = json.dumps(post_data)
-            log_event.apply_async(args=('create_post', timestamp, user_uri, work_uri, post_uri, log_data))
+                log_data = json.dumps(post_data)
+                log_event.apply_async(args=('create_post', timestamp, user_uri, work_uri, post_uri, log_data))

-            on_create_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, post_uri=post_uri, post_data=post_data)
-            return post_data
+                on_create_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, work_uri=work_uri, post_uri=post_uri, post_data=post_data)
+                return post_data
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -371,12 +379,13 @@ def delete_post(self, user_uri, post_uri):
     """
     try:
         with RedisLock(self.lock_db, post_uri):
-            timestamp = int(time.time())
-            self.main_store.delete_post(user_uri, post_uri)
+            with self.main_store as store:
+                timestamp = int(time.time())
+                store.delete_post(user_uri, post_uri)

-            log_event.apply_async(args=('delete_post', timestamp, user_uri, None, post_uri, None))
+                log_event.apply_async(args=('delete_post', timestamp, user_uri, None, post_uri, None))

-            on_delete_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, post_uri=post_uri)
+                on_delete_post.send(sender=self, timestamp=timestamp, user_uri=user_uri, post_uri=post_uri)
     except CatalogError as e:
         return error(e)
     except LockedError as e:
@@ -392,7 +401,8 @@ def public_create_work(self, timestamp, user_uri, work_uri, work_data):
     See create_work documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + work_uri):
-            self.public_store.create_work(timestamp, user_uri, work_uri, work_data)
+            with self.public_store as store:
+                store.create_work(timestamp, user_uri, work_uri, work_data)
     except LockedError as e:
         raise self.retry(exc=e, countdown=5, max_retries=None)

@@ -412,7 +422,8 @@ def public_delete_work(self, timestamp, user_uri, work_uri):
     See delete_work documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + work_uri):
-            self.public_store.delete_work(user_uri, work_uri, linked_entries=True)
+            with self.public_store as store:
+                store.delete_work(user_uri, work_uri, linked_entries=True)
     except EntryNotFoundError:
         pass
     except LockedError as e:
@@ -424,7 +435,8 @@ def public_create_work_source(self, timestamp, user_uri, work_uri, source_uri, source_data):
     See create_work_source documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + work_uri):
-            self.public_store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)
+            with self.public_store as store:
+                store.create_work_source(timestamp, user_uri, work_uri, source_uri, source_data)
     except LockedError as e:
         raise self.retry(exc=e, countdown=5, max_retries=None)

@@ -438,7 +450,8 @@ def public_update_source(self, timestamp, user_uri, source_uri, source_data):
     See update_source documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + source_uri):
-            self.public_store.update_source(timestamp, user_uri, source_uri, source_data)
+            with self.public_store as store:
+                store.update_source(timestamp, user_uri, source_uri, source_data)
     except LockedError as e:
         raise self.retry(exc=e, countdown=5, max_retries=None)

@@ -448,7 +461,8 @@ def public_delete_source(self, timestamp, user_uri, source_uri, unlink=True):
     See delete_source documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + source_uri):
-            self.public_store.delete_source(user_uri, source_uri)
+            with self.public_store as store:
+                store.delete_source(user_uri, source_uri)
     except EntryNotFoundError:
         pass
     except LockedError as e:
@@ -460,7 +474,8 @@ def public_create_post(self, timestamp, user_uri, work_uri, post_uri, post_data):
     See create_post documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + work_uri):
-            self.public_store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)
+            with self.public_store as store:
+                store.create_post(timestamp, user_uri, work_uri, post_uri, post_data)
     except LockedError as e:
         raise self.retry(exc=e, countdown=5, max_retries=None)

@@ -470,7 +485,8 @@ def public_delete_post(self, timestamp, user_uri, post_uri):
     See delete_post documentation for description of parameters."""
     try:
         with RedisLock(self.lock_db, "public." + post_uri):
-            self.public_store.delete_post(user_uri, post_uri)
+            with self.public_store as store:
+                store.delete_post(user_uri, post_uri)
     except EntryNotFoundError:
         pass
     except LockedError as e:
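
The same nesting appears in every task above: take the per-URI Redis lock first, then enter the store, so that its __exit__ runs a single sync() covering all the writes the task performed. A sketch of the pattern under those assumptions, with simplified stand-ins for RedisLock and LockedError (shaped like the ones the tasks import) and a stub store:

class LockedError(Exception):
    pass

class RedisLock:
    """Stand-in non-blocking lock: raises LockedError if the key is held."""
    _held = set()

    def __init__(self, lock_db, key):
        self.key = key

    def __enter__(self):
        if self.key in RedisLock._held:
            raise LockedError(self.key)
        RedisLock._held.add(self.key)
        return self

    def __exit__(self, type, value, traceback):
        RedisLock._held.discard(self.key)

class Store:
    def __init__(self):
        self.syncs = 0

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.syncs += 1  # the real store calls self._model.sync() here

    def delete_work(self, user_uri, work_uri):
        pass  # the real store removes statements from self._model

main_store = Store()
try:
    with RedisLock(None, "work/1"):    # serialize writers on this URI
        with main_store as store:      # one sync for the whole task
            store.delete_work("user/1", "work/1")
except LockedError:
    pass  # the real tasks call self.retry(exc=e, countdown=5) instead

assert main_store.syncs == 1

One consequence of the new indentation is that the log_event and signal calls now run inside the store block as well, so the sync in __exit__ happens after they are issued rather than before.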