Skip to content

Commit

Permalink
Add code around sql-storage methods to reconnect if conn unusable.
Browse files Browse the repository at this point in the history
Fixes / intended to address #619

In testing locally when I restarted a postgres db, one iteration of the
scheduler and enqueueing failed, and then went back to working the next
run.
  • Loading branch information
coleifer committed Sep 15, 2021
1 parent 730e7e9 commit 9b0ce1a
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions huey/contrib/sql_huey.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,17 @@ def schedule(self, *columns):
def kv(self, *columns):
return self.KV.select(*columns).where(self.KV.queue == self.name)

def check_conn(self):
if not self.database.is_connection_usable():
self.database.close()
self.database.connect()

def enqueue(self, data, priority=None):
self.check_conn()
self.Task.create(queue=self.name, data=data, priority=priority or 0)

def dequeue(self):
self.check_conn()
query = (self.tasks(self.Task.id, self.Task.data)
.order_by(self.Task.priority.desc(), self.Task.id)
.limit(1))
Expand Down Expand Up @@ -116,9 +123,11 @@ def flush_queue(self):
self.Task.delete().where(self.Task.queue == self.name).execute()

def add_to_schedule(self, data, timestamp, utc):
self.check_conn()
self.Schedule.create(queue=self.name, data=data, timestamp=timestamp)

def read_schedule(self, timestamp):
self.check_conn()
query = (self.schedule(self.Schedule.id, self.Schedule.data)
.where(self.Schedule.timestamp <= timestamp)
.tuples())
Expand Down Expand Up @@ -154,6 +163,7 @@ def flush_schedule(self):
.execute())

def put_data(self, key, value, is_result=False):
self.check_conn()
if isinstance(self.database, PostgresqlDatabase):
(self.KV
.insert(queue=self.name, key=key, value=value)
Expand All @@ -164,6 +174,7 @@ def put_data(self, key, value, is_result=False):
self.KV.replace(queue=self.name, key=key, value=value).execute()

def peek_data(self, key):
self.check_conn()
try:
kv = self.kv(self.KV.value).where(self.KV.key == key).get()
except self.KV.DoesNotExist:
Expand All @@ -172,6 +183,7 @@ def peek_data(self, key):
return kv.value

def pop_data(self, key):
self.check_conn()
query = self.kv().where(self.KV.key == key)
if self.database.for_update:
query = query.for_update()
Expand All @@ -188,9 +200,11 @@ def pop_data(self, key):
return kv.value if dq.execute() == 1 else EmptyData

def has_data_for_key(self, key):
self.check_conn()
return self.kv().where(self.KV.key == key).exists()

def put_if_empty(self, key, value):
self.check_conn()
try:
with self.database.atomic():
self.KV.insert(queue=self.name, key=key, value=value).execute()
Expand Down

2 comments on commit 9b0ce1a

@Loches525
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I know if new version will be released recently?

@coleifer
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.4.1 released.

Please sign in to comment.