Skip to content

Commit

Permalink
Merge pull request #68 from MichaelAquilina/target_locking
Browse files Browse the repository at this point in the history
Target locking
  • Loading branch information
MichaelAquilina committed Sep 26, 2017
2 parents b91bce3 + ff1519c commit 3f01f04
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 6 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
boto3>=1.4.0
clint>=0.5.1
filelock>=2.0.12
python-magic>=0.4.12
tabulate>=0.7.7
tqdm>=4.8.4
Expand Down
1 change: 1 addition & 0 deletions s4/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def main(arguments):
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('nose').setLevel(logging.CRITICAL)
logging.getLogger('s3transfer').setLevel(logging.CRITICAL)
logging.getLogger('filelock').setLevel(logging.CRITICAL)

logger = logging.getLogger(__name__)
logger.setLevel(args.log_level)
Expand Down
6 changes: 6 additions & 0 deletions s4/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ def get_client_name(self):
def get_uri(self, key=''):
raise NotImplementedError()

def lock(self, timeout=10):
raise NotImplementedError()

def unlock(self):
raise NotImplementedError()

def put(self, key, sync_object):
raise NotImplementedError()

Expand Down
25 changes: 23 additions & 2 deletions s4/clients/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
except ImportError:
from scandir import scandir

import filelock

import magic

from s4.clients import SyncClient, SyncObject
Expand Down Expand Up @@ -42,10 +44,26 @@ def traverse(path, ignore_files=None):


class LocalSyncClient(SyncClient):
DEFAULT_IGNORE_FILES = ['.index', '.s4lock']

def __init__(self, path):
self.path = path
self.reload_index()
self.reload_ignore_files()
self._lock = filelock.FileLock(self.get_uri('.s4lock'))

def lock(self, timeout=10):
"""
Advisory lock.
Use to ensure that only one LocalSyncClient is working on the Target at the same time.
"""
self._lock.acquire(timeout=timeout)

def unlock(self):
"""
Unlock the active advisory lock.
"""
self._lock.release()

def get_client_name(self):
return 'local'
Expand Down Expand Up @@ -184,9 +202,12 @@ def set_remote_timestamp(self, key, timestamp):
self.index[key]['remote_timestamp'] = timestamp

def reload_ignore_files(self):
self.ignore_files = ['.index']
ignore_path = os.path.join(self.path, '.syncignore')

if os.path.exists(ignore_path):
with open(ignore_path, 'r') as fp:
ignore_list = fp.read().split('\n')
self.ignore_files.extend(ignore_list)
else:
ignore_list = []

self.ignore_files = self.DEFAULT_IGNORE_FILES + ignore_list
11 changes: 10 additions & 1 deletion s4/clients/s3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import collections
import copy
import fnmatch
import json
import logging
Expand Down Expand Up @@ -44,6 +45,8 @@ def is_ignored_key(key, ignore_files):


class S3SyncClient(SyncClient):
DEFAULT_IGNORE_FILES = ['.index', '.s4lock']

def __init__(self, boto, bucket, prefix):
self.boto = boto
self.bucket = bucket
Expand All @@ -52,6 +55,12 @@ def __init__(self, boto, bucket, prefix):
self._index = None
self._ignore_files = None

def lock(self):
pass

def unlock(self):
pass

def get_client_name(self):
return 's3'

Expand Down Expand Up @@ -222,7 +231,7 @@ def ignore_files(self):
return self._ignore_files

def reload_ignore_files(self):
self._ignore_files = ['.index']
self._ignore_files = copy.copy(self.DEFAULT_IGNORE_FILES)
try:
response = self.boto.get_object(
Bucket=self.bucket,
Expand Down
10 changes: 7 additions & 3 deletions s4/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def __repr__(self):
return 'SyncWorker<{}, {}>'.format(self.client_1.get_uri(), self.client_2.get_uri())

def sync(self, conflict_choice=None, keys=None):
self.client_1.lock()
self.client_2.lock()
try:
deferred_calls, unhandled_events = self.get_sync_states(keys)

Expand Down Expand Up @@ -142,11 +144,13 @@ def sync(self, conflict_choice=None, keys=None):
self.logger.info('Ignoring sync conflict for %s', key)
continue

self.run_deferred_calls(deferred_calls)

except KeyboardInterrupt:
self.logger.warning('Session interrupted by Keyboard Interrupt. Aborting....')
return

self.run_deferred_calls(deferred_calls)
finally:
self.client_1.unlock()
self.client_2.unlock()

def get_sync_states(self, keys=None):
# we store a list of deferred calls to make sure we can handle everything before
Expand Down
10 changes: 10 additions & 0 deletions tests/clients/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def test_equality_with_wrong_type(self):


class TestSyncClient(object):
def test_lock(self):
client = SyncClient()
with pytest.raises(NotImplementedError):
client.lock()

def test_unlock(self):
client = SyncClient()
with pytest.raises(NotImplementedError):
client.unlock()

def test_get_client_name(self):
client = SyncClient()
with pytest.raises(NotImplementedError):
Expand Down
11 changes: 11 additions & 0 deletions tests/clients/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import shutil
import tempfile

import filelock
import mock
import pytest

Expand Down Expand Up @@ -61,6 +62,16 @@ def test_repr(self):
client = local.LocalSyncClient('/my/test/path')
assert repr(client) == 'LocalSyncClient</my/test/path>'

def test_lock(self, local_client):
local_client2 = local.LocalSyncClient(local_client.path)
local_client.lock(timeout=0.01)
with pytest.raises(filelock.Timeout):
local_client2.lock(timeout=0.01)
local_client.unlock()

local_client.lock(timeout=0.01)
local_client2.unlock()

def test_put_new(self, local_client):
data = b'hi'
local_client.put(
Expand Down

0 comments on commit 3f01f04

Please sign in to comment.