Add FileHuey and full FileStorage implementations.
Note: the file-system storage should not be used in production as it
effectively has a concurrency of 1 due to exclusive locks around all
file-system operations.
coleifer committed Jan 8, 2020
1 parent 29e95bb commit d31940b
Showing 4 changed files with 264 additions and 30 deletions.
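
To make the new feature concrete before the diffs: a minimal usage sketch. This assumes Huey forwards extra keyword arguments (path, levels, use_thread_lock) through to the storage class, as the existing Redis helpers do; the app name and path below are illustrative, not from the commit.

    from huey.api import FileHuey

    # Illustrative name/path. Per the note above, the exclusive locking
    # means this is best suited to development or a single-consumer setup.
    huey = FileHuey('demo-app', path='/tmp/demo-huey', levels=2)

    @huey.task()
    def add(a, b):
        return a + b

Passing use_thread_lock=True swaps the cross-process file lock for a threading.Lock, which presumably is only safe when everything runs in a single consumer process.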
4 changes: 4 additions & 0 deletions huey/api.py
@@ -23,6 +23,7 @@
from huey.registry import Registry
from huey.serializer import Serializer
from huey.storage import BlackHoleStorage
from huey.storage import FileStorage
from huey.storage import MemoryStorage
from huey.storage import PriorityRedisExpireStorage
from huey.storage import PriorityRedisStorage
@@ -1029,3 +1030,6 @@ class PriorityRedisHuey(Huey):

class PriorityRedisExpireHuey(Huey):
    storage_class = PriorityRedisExpireStorage

class FileHuey(Huey):
    storage_class = FileStorage
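
As with the other Huey subclasses above, FileHuey is only a thin alias for setting storage_class. The test suite below builds the same object by passing the storage class explicitly; a sketch of the equivalence (illustrative name and path):

    from huey.api import Huey
    from huey.storage import FileStorage

    # Equivalent to FileHuey('demo-app', path='/tmp/demo-huey'): keyword
    # arguments not consumed by Huey itself are handed to FileStorage.
    huey = Huey('demo-app', storage_class=FileStorage, path='/tmp/demo-huey')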
184 changes: 162 additions & 22 deletions huey/storage.py
@@ -1,4 +1,5 @@
from collections import deque
import base64
import contextlib
import hashlib
import heapq
@@ -28,6 +29,7 @@

from huey.constants import EmptyData
from huey.exceptions import ConfigurationError
from huey.utils import FileLock
from huey.utils import text_type
from huey.utils import to_timestamp

@@ -822,41 +824,178 @@ def flush_results(self):
        self.sql('delete from kv where queue=?', (self.name,), True)


-class FileStorageMethods(object):
+class FileStorage(BaseStorage):
    """
-    Mixin class implementing the result-store APIs using the filesystem.
+    Simple file-system storage implementation.
+    This storage implementation should NOT be used in production as it utilizes
+    exclusive locks around all file-system operations. This is done to prevent
+    race-conditions when reading from the file-system.
    """
-    def __init__(self, name, path, levels=2, **storage_kwargs):
-        super(FileStorageMethods, self).__init__(name, **storage_kwargs)
+    MAX_PRIORITY = 0xffff

+    def __init__(self, name, path, levels=2, use_thread_lock=False,
+                 **storage_kwargs):
+        super(FileStorage, self).__init__(name, **storage_kwargs)

        self.path = path
        if os.path.exists(self.path) and not os.path.isdir(self.path):
            raise ValueError('path "%s" is not a directory' % path)
        if levels < 0 or levels > 4:
            raise ValueError('%s levels must be between 0 and 4' % self)

        self.queue_path = os.path.join(self.path, 'queue')
        self.schedule_path = os.path.join(self.path, 'schedule')
        self.result_path = os.path.join(self.path, 'results')
        self.levels = levels

        if use_thread_lock:
            self.lock = threading.Lock()
        else:
            self.lock_file = os.path.join(self.path, '.lock')
            self.lock = FileLock(self.lock_file)

    def _flush_dir(self, path):
        if os.path.exists(path):
            shutil.rmtree(path)
        os.makedirs(path)

    def enqueue(self, data, priority=None):
        priority = priority or 0
        if priority < 0: raise ValueError('priority must be a positive number')
        if priority > self.MAX_PRIORITY:
            raise ValueError('priority must be <= %s' % self.MAX_PRIORITY)

        with self.lock:
            if not os.path.exists(self.queue_path):
                os.makedirs(self.queue_path)

            # Encode the filename so that tasks are sorted by priority (desc) and
            # timestamp (asc).
            prefix = '%04x-%012x' % (
                self.MAX_PRIORITY - priority,
                int(time.time() * 1000))

            base = filename = os.path.join(self.queue_path, prefix)
            conflict = 0
            while os.path.exists(filename):
                conflict += 1
                filename = '%s.%03d' % (base, conflict)

            with open(filename, 'wb') as fh:
                fh.write(data)

    def _get_sorted_filenames(self, path):
        if not os.path.exists(path):
            return ()
        return [f for f in sorted(os.listdir(path)) if not f.endswith('.tmp')]

    def dequeue(self):
        with self.lock:
            filenames = self._get_sorted_filenames(self.queue_path)
            if not filenames:
                return

            filename = os.path.join(self.queue_path, filenames[0])
            tmp_dest = filename + '.tmp'
            os.rename(filename, tmp_dest)

        with open(tmp_dest, 'rb') as fh:
            data = fh.read()
        os.unlink(tmp_dest)
        return data

    def queue_size(self):
        return len(self._get_sorted_filenames(self.queue_path))

    def enqueued_items(self, limit=None):
        filenames = self._get_sorted_filenames(self.queue_path)[:limit]
        accum = []
        for filename in filenames:
            with open(os.path.join(self.queue_path, filename), 'rb') as fh:
                accum.append(fh.read())
        return accum

    def flush_queue(self):
        self._flush_dir(self.queue_path)

    def _timestamp_to_prefix(self, ts):
        ts = time.mktime(ts.timetuple()) + (ts.microsecond * 1e-6)
        return '%012x' % int(ts * 1000)

    def add_to_schedule(self, data, ts, utc):
        with self.lock:
            if not os.path.exists(self.schedule_path):
                os.makedirs(self.schedule_path)

            ts_prefix = self._timestamp_to_prefix(ts)
            base = filename = os.path.join(self.schedule_path, ts_prefix)
            conflict = 0
            while os.path.exists(filename):
                conflict += 1
                filename = '%s.%03d' % (base, conflict)

            with open(filename, 'wb') as fh:
                fh.write(data)

    def read_schedule(self, ts):
        with self.lock:
            prefix = self._timestamp_to_prefix(ts)
            accum = []
            for basename in self._get_sorted_filenames(self.schedule_path):
                if basename[:12] > prefix:
                    break
                filename = os.path.join(self.schedule_path, basename)
                new_filename = filename + '.tmp'
                os.rename(filename, new_filename)
                accum.append(new_filename)

        tasks = []
        for filename in accum:
            with open(filename, 'rb') as fh:
                tasks.append(fh.read())
            os.unlink(filename)

        return tasks

    def schedule_size(self):
        return len(self._get_sorted_filenames(self.schedule_path))

    def scheduled_items(self, limit=None):
        filenames = self._get_sorted_filenames(self.schedule_path)[:limit]
        accum = []
        for filename in filenames:
            with open(os.path.join(self.schedule_path, filename), 'rb') as fh:
                accum.append(fh.read())
        return accum

    def flush_schedule(self):
        self._flush_dir(self.schedule_path)

    def path_for_key(self, key):
        if isinstance(key, text_type):
            key = key.encode('utf8')
        checksum = hashlib.md5(key).hexdigest()
        prefix = checksum[:self.levels]
        prefix_filename = itertools.chain(prefix, (checksum,))
-        return os.path.join(self.path, *prefix_filename)
+        return os.path.join(self.result_path, *prefix_filename)

    def put_data(self, key, value, is_result=False):
        if isinstance(key, text_type):
            key = key.encode('utf8')

        filename = self.path_for_key(key)
        dirname = os.path.dirname(filename)
-        if not os.path.exists(dirname):
-            os.makedirs(dirname)

-        with open(self.path_for_key(key), 'wb') as fh:
-            key_len = len(key)
-            fh.write(struct.pack('>I', key_len))
-            fh.write(key)
-            fh.write(value)
+        with self.lock:
+            if not os.path.exists(dirname):
+                os.makedirs(dirname)

+            with open(self.path_for_key(key), 'wb') as fh:
+                key_len = len(key)
+                fh.write(struct.pack('>I', key_len))
+                fh.write(key)
+                fh.write(value)

    def _unpack_result(self, data):
        key_len, = struct.unpack('>I', data[:4])
@@ -878,13 +1017,15 @@ def peek_data(self, key):

    def pop_data(self, key):
        filename = self.path_for_key(key)
-        if not os.path.exists(filename):
-            return EmptyData

-        with open(filename, 'rb') as fh:
-            _, value = self._unpack_result(fh.read())
+        with self.lock:
+            if not os.path.exists(filename):
+                return EmptyData

+            with open(filename, 'rb') as fh:
+                _, value = self._unpack_result(fh.read())

-        os.unlink(filename)
+            os.unlink(filename)

        # If file is corrupt or has been tampered with, return EmptyData.
        return value if value is not None else EmptyData
@@ -893,11 +1034,12 @@ def has_data_for_key(self, key):
        return os.path.exists(self.path_for_key(key))

    def result_store_size(self):
-        return sum(len(filenames) for _, _, filenames in os.walk(self.path))
+        return sum(len(filenames) for _, _, filenames
+                   in os.walk(self.result_path))

    def result_items(self):
        accum = {}
-        for root, _, filenames in os.walk(self.path):
+        for root, _, filenames in os.walk(self.result_path):
            for filename in filenames:
                path = os.path.join(root, filename)
                with open(path, 'rb') as fh:
@@ -906,6 +1048,4 @@ def result_items(self):
        return accum

    def flush_results(self):
-        if os.path.exists(self.path):
-            shutil.rmtree(self.path)
-        os.makedirs(self.path)
+        self._flush_dir(self.result_path)
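
To see what this storage actually writes to disk, a small standalone sketch re-implementing the two naming schemes above: the queue filename prefix from enqueue(), and the result path from path_for_key() with the default levels=2.

    import hashlib
    import time

    MAX_PRIORITY = 0xffff

    def queue_prefix(priority, now):
        # Mirrors enqueue(): the priority is inverted so that a plain
        # lexicographic sort of filenames yields priority descending,
        # with the millisecond timestamp breaking ties oldest-first.
        return '%04x-%012x' % (MAX_PRIORITY - priority, int(now * 1000))

    assert queue_prefix(10, 1000.0) < queue_prefix(0, 1000.0)  # high priority first
    assert queue_prefix(0, 1000.0) < queue_prefix(0, 1001.0)   # then oldest first

    # Mirrors path_for_key() with levels=2: a result for key b'k1' lands at
    # results/<c0>/<c1>/<md5 hex>, fanning keys out across 16 * 16 leaf dirs.
    checksum = hashlib.md5(b'k1').hexdigest()
    print('/'.join(('results', checksum[0], checksum[1], checksum)))

The schedule directory uses the same %012x millisecond prefix (via _timestamp_to_prefix()), which is why read_schedule() can stop at the first filename whose 12-character prefix sorts past the requested timestamp.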
67 changes: 59 additions & 8 deletions huey/tests/test_storage.py
@@ -3,7 +3,12 @@
import itertools
import os
import shutil
import threading
import unittest
try:
    from queue import Queue
except ImportError:
    from Queue import Queue

from redis.connection import ConnectionPool
from redis import Redis
@@ -17,7 +22,7 @@
from huey.constants import EmptyData
from huey.consumer import Consumer
from huey.exceptions import ConfigurationError
-from huey.storage import FileStorageMethods
+from huey.storage import FileStorage
from huey.storage import MemoryStorage
from huey.storage import RedisExpireStorage
from huey.tests.base import BaseTestCase
@@ -274,19 +279,18 @@ def test_timeout(self):
        self.assertEqual(curs.fetchone(), (3000,))


-class MemFileStorage(FileStorageMethods, MemoryStorage): pass


class TestFileStorageMethods(StorageTests, BaseTestCase):
    path = '/tmp/test-huey-storage'
    result_path = '/tmp/test-huey-storage/results'
    queue_path = '/tmp/test-huey-storage/queue'

    def tearDown(self):
        super(TestFileStorageMethods, self).tearDown()
        if os.path.exists(self.path):
            shutil.rmtree(self.path)

    def get_huey(self):
-        return Huey('test-file-storage', storage_class=MemFileStorage,
+        return Huey('test-file-storage', storage_class=FileStorage,
                    path=self.path, levels=2)

    def test_filesystem_result_store(self):
@@ -296,8 +300,9 @@ def test_filesystem_result_store(self):
        keys = (b'k1', b'k2', b'kx')
        for key in keys:
            checksum = hashlib.md5(key).hexdigest()
            b0, b1 = checksum[0], checksum[1]
            # Default is to use two levels.
-            key_path = os.path.join(self.path, checksum[0], checksum[1])
+            key_path = os.path.join(self.result_path, b0, b1)
            key_filename = os.path.join(key_path, checksum)

            self.assertFalse(os.path.exists(key_filename))
@@ -313,5 +318,51 @@

        # Flushing the results blows away everything.
        s.flush_results()
-        self.assertTrue(os.path.exists(self.path))
-        self.assertEqual(os.listdir(self.path), [])
+        self.assertTrue(os.path.exists(self.result_path))
+        self.assertEqual(os.listdir(self.result_path), [])

    def test_fs_multithreaded(self):
        def create_tasks(t, n, q):
            for i in range(n):
                message = str((t * n) + i)
                self.huey.storage.enqueue(message.encode('utf8'))
                q.put(message)

        def dequeue_tasks(q):
            while True:
                data = self.huey.storage.dequeue()
                if data is None:
                    break
                q.put(data.decode('utf8'))

        nthreads = 10
        ntasks = 50
        in_q = Queue()
        threads = []
        for i in range(nthreads):
            t = threading.Thread(target=create_tasks, args=(i, ntasks, in_q))
            t.daemon = True
            threads.append(t)

        for t in threads: t.start()
        for t in threads: t.join()

        self.assertEqual(self.huey.pending_count(), nthreads * ntasks)

        out_q = Queue()
        threads = []
        for i in range(nthreads):
            t = threading.Thread(target=dequeue_tasks, args=(out_q,))
            t.daemon = True
            threads.append(t)

        for t in threads: t.start()
        for t in threads: t.join()

        self.assertEqual(out_q.qsize(), nthreads * ntasks)
        self.assertEqual(self.huey.pending_count(), 0)

        # Ensure that the order in which tasks were enqueued is the order in
        # which they are dequeued.
        for i in range(nthreads * ntasks):
            self.assertEqual(in_q.get(), out_q.get())
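
The lock used throughout FileStorage comes from huey.utils.FileLock, added in the fourth changed file (presumably huey/utils.py, not shown on this page). Its implementation isn't visible here; judging only from its usage — constructed with a lock-file path and entered via with self.lock: — a minimal flock-based stand-in might look like the following sketch (an assumption for illustration, not the committed code):

    import fcntl
    import os

    class FileLock(object):
        # Hypothetical stand-in for huey.utils.FileLock: an exclusive
        # advisory lock (Unix-only) held for the duration of a with-block.
        def __init__(self, filename):
            self.filename = filename
            self.fd = None

        def __enter__(self):
            self.fd = os.open(self.filename, os.O_RDWR | os.O_CREAT)
            fcntl.flock(self.fd, fcntl.LOCK_EX)  # blocks until the lock is free
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            fcntl.flock(self.fd, fcntl.LOCK_UN)
            os.close(self.fd)
            self.fd = None

Whatever the real implementation, test_fs_multithreaded above exercises exactly this property: ten producer threads and then ten consumer threads serialize through the lock, and the queue still drains in strict filename order.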
