Skip to content

Commit

Permalink
Merge pull request pandas-dev#574 from scriada/add-host-to-VersionedItem
Browse files Browse the repository at this point in the history
add host to VersionedItem (adds feature pandas-dev#595)
  • Loading branch information
dimosped committed Aug 7, 2018
2 parents bdb2868 + b79c082 commit 7b9a31d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -5,6 +5,7 @@
* Feature: #571 Removed the Cython LZ4 code, use the latest python-lz4
* Feature: #557 Threadpool based compression. Speed imrpovement and tuning benchmarks.
* Bugfix: fix tickstore unicode handling, support both unicode and utf-8 arrays
* Feature: #595 add host attribute to VersionedItem.

### 1.67.1 (2018-07-11)
* Bugfix: #579 Fix symbol corruption due to restore_version and append
Expand Down
2 changes: 1 addition & 1 deletion arctic/store/audit.py
Expand Up @@ -92,7 +92,7 @@ def __init__(self, version_store, symbol, user, log, modify_timeseries=None, aud
versions = [x['version'] for x in self._version_store.list_versions(self._symbol, latest_only=True)]
versions.append(0)
self.base_ts = VersionedItem(symbol=self._symbol, library=None,
version=versions[0], metadata=None, data=None)
version=versions[0], metadata=None, data=None, host=None)
except OperationFailure:
#TODO: Current errors in mongo "Incorrect Number of Segments Returned"
# This workaround should be removed once underlying problem is resolved.
Expand Down
19 changes: 13 additions & 6 deletions arctic/store/version_store.py
Expand Up @@ -390,7 +390,8 @@ def _do_read(self, symbol, version, from_version=None, **kwargs):
handler = self._read_handler(version, symbol)
data = handler.read(self._arctic_lib, version, symbol, from_version=from_version, **kwargs)
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
metadata=version.pop('metadata', None), data=data)
metadata=version.pop('metadata', None), data=data,
host=self._arctic_lib.arctic.mongo_host)
_do_read_retry = mongo_retry(_do_read)

@mongo_retry
Expand All @@ -416,7 +417,8 @@ def read_metadata(self, symbol, as_of=None, allow_secondary=None):
"""
_version = self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary))
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=_version['version'],
metadata=_version.pop('metadata', None), data=None)
metadata=_version.pop('metadata', None), data=None,
host=self._arctic_lib.arctic.mongo_host)

def _read_metadata(self, symbol, as_of=None, read_preference=None):
if read_preference is None:
Expand Down Expand Up @@ -496,7 +498,8 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser

if len(data) == 0 and previous_version is not None:
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=previous_version['version'],
metadata=version.pop('metadata', None), data=None)
metadata=version.pop('metadata', None), data=None,
host=self._arctic_lib.arctic.mongo_host)

if upsert and previous_version is None:
return self.write(symbol=symbol, data=data, prune_previous_version=prune_previous_version, metadata=metadata)
Expand Down Expand Up @@ -549,7 +552,8 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
self._insert_version(version)

return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
metadata=version.pop('metadata', None), data=None)
metadata=version.pop('metadata', None), data=None,
host=self._arctic_lib.arctic.mongo_host)

def _publish_change(self, symbol, version):
if self._publish_changes:
Expand Down Expand Up @@ -605,7 +609,8 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
logger.debug('Finished writing versions for %s', symbol)

return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
metadata=version.pop('metadata', None), data=None)
metadata=version.pop('metadata', None), data=None,
host=self._arctic_lib.arctic.mongo_host)

def _add_new_version_using_reference(self, symbol, new_version, reference_version, prune_previous_version):
# Attention: better not use this method following an append.
Expand Down Expand Up @@ -650,7 +655,8 @@ def _add_new_version_using_reference(self, symbol, new_version, reference_versio
self._publish_change(symbol, new_version)

return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=new_version['version'],
metadata=new_version.get('metadata'), data=None)
metadata=new_version.get('metadata'), data=None,
host=self._arctic_lib.arctic.mongo_host)

@mongo_retry
def write_metadata(self, symbol, metadata, prune_previous_version=True, **kwargs):
Expand Down Expand Up @@ -738,6 +744,7 @@ def restore_version(self, symbol, as_of, prune_previous_version=True):
if self._last_version_seqnum(symbol) == version_to_restore['version']:
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(),
version=version_to_restore['version'],
host=self._arctic_lib.arctic.mongo_host,
metadata=version_to_restore.pop('metadata', None), data=None)

# Read the existing data from as_of
Expand Down
6 changes: 3 additions & 3 deletions arctic/store/versioned_item.py
@@ -1,7 +1,7 @@
from collections import namedtuple


class VersionedItem(namedtuple('VersionedItem', ['symbol', 'library', 'data', 'version', 'metadata'])):
class VersionedItem(namedtuple('VersionedItem', ['symbol', 'library', 'data', 'version', 'metadata', 'host'])):
"""
Class representing a Versioned object in VersionStore.
"""
Expand All @@ -12,8 +12,8 @@ def __repr__(self):
return str(self)

def __str__(self):
return "VersionedItem(symbol=%s,library=%s,data=%s,version=%s,metadata=%s" % \
(self.symbol, self.library, type(self.data), self.version, self.metadata)
return "VersionedItem(symbol=%s,library=%s,data=%s,version=%s,metadata=%s,host=%s)" % \
(self.symbol, self.library, type(self.data), self.version, self.metadata, self.host)


ChangedItem = namedtuple('ChangedItem', ['symbol', 'orig_version', 'new_version', 'changes'])
9 changes: 6 additions & 3 deletions tests/unit/store/test_version_item.py
Expand Up @@ -8,10 +8,11 @@ def test_versioned_item_str():
library="ONEMINUTE",
data=pd.DataFrame(),
version=1.0,
host='myhost',
metadata={'metadata': 'foo'})

expected = "VersionedItem(symbol=sym,library=ONEMINUTE," + \
"data=<class 'pandas.core.frame.DataFrame'>,version=1.0,metadata={'metadata': 'foo'}"
"data=<class 'pandas.core.frame.DataFrame'>,version=1.0,metadata={'metadata': 'foo'},host=myhost)"
assert str(item) == expected
assert repr(item) == expected

Expand All @@ -21,7 +22,8 @@ def test_versioned_item_str_handles_none():
library=None,
data=None,
version=None,
metadata=None)
metadata=None,
host=None)

assert str(item)

Expand All @@ -31,5 +33,6 @@ def test_versioned_item_metadata_dict():
library="test_lib",
data=None,
version=1.2,
metadata=None)
metadata=None,
host=None)
assert(item.metadata_dict() == {'symbol': 'test', 'library': 'test_lib', 'version': 1.2})
28 changes: 20 additions & 8 deletions tests/unit/store/test_version_store.py
Expand Up @@ -157,7 +157,8 @@ def test_write_check_quota():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version':1})),
_versions=Mock(insert_one=lambda x:None),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._collection.database.connection.nodes = []
vs._write_handler.return_value = write_handler
Expand Down Expand Up @@ -327,7 +328,8 @@ def _create_mock_versionstore():
vs._add_new_version_using_reference.side_effect = lambda *args: VersionStore._add_new_version_using_reference(vs, *args)
vs._last_version_seqnum = lambda version: VersionStore._last_version_seqnum(vs, version)
vs.write.return_value = VersionedItem(symbol=TEST_SYMBOL, library=vs._arctic_lib.get_name(),
version=TPL_VERSION['version'] + 1, metadata=META_TO_WRITE, data=None)
version=TPL_VERSION['version'] + 1, metadata=META_TO_WRITE, data=None,
host=vs._arctic_lib.arctic.mongo_host)
return vs


Expand All @@ -351,6 +353,7 @@ def test_write_metadata_with_previous_data():

expected_ret_val = VersionedItem(symbol=TEST_SYMBOL,
library=vs._arctic_lib.get_name(),
host=vs._arctic_lib.arctic.mongo_host,
version=TPL_VERSION['version'] + 1,
metadata=META_TO_WRITE,
data=None)
Expand All @@ -376,6 +379,7 @@ def test_write_empty_metadata():

expected_ret_val = VersionedItem(symbol=TEST_SYMBOL,
library=vs._arctic_lib.get_name(),
host=vs._arctic_lib.arctic.mongo_host,
version=TPL_VERSION['version'] + 1,
metadata=None,
data=None)
Expand Down Expand Up @@ -416,9 +420,11 @@ def test_restore_version():

LASTEST_VERSION = dict(TPL_VERSION, version=TPL_VERSION['version']+1, metadata={'something': 'different'})
last_item = VersionedItem(symbol=TEST_SYMBOL, library=vs._arctic_lib.get_name(),
host=vs._arctic_lib.arctic.mongo_host,
version=LASTEST_VERSION, metadata=LASTEST_VERSION['metadata'], data="hello world")
new_version = dict(LASTEST_VERSION, version=LASTEST_VERSION['version'] + 1)
new_item = VersionedItem(symbol=TEST_SYMBOL, library=vs._arctic_lib.get_name(),
host=vs._arctic_lib.arctic.mongo_host,
version=new_version, metadata=new_version['metadata'], data=last_item.data)

vs.write.return_value = new_item
Expand Down Expand Up @@ -479,7 +485,8 @@ def test_write_error_clean_retry():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one")),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._collection.database.connection.nodes = []
Expand All @@ -498,7 +505,8 @@ def test_write_insert_version_duplicatekey():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one")),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._versions.insert_one.side_effect = [DuplicateKeyError("dup key error"), None]
Expand All @@ -518,7 +526,8 @@ def test_write_insert_version_operror():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one")),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._versions.insert_one.side_effect = [OperationFailure("mongo op error"), None]
Expand All @@ -541,7 +550,8 @@ def test_append_error_clean_retry():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': previous_version['version']+1})),
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one", return_value=previous_version)),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._collection.database.connection.nodes = []
Expand All @@ -562,7 +572,8 @@ def test_append_insert_version_duplicatekey():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': previous_version['version']+1})),
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one", return_value=previous_version)),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._versions.insert_one.side_effect = [DuplicateKeyError("dup key error"), None]
Expand All @@ -583,7 +594,8 @@ def test_append_insert_version_operror():
_collection=Mock(),
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': previous_version['version']+1})),
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one", return_value=previous_version)),
_arctic_lib=create_autospec(ArcticLibraryBinding),
_arctic_lib=create_autospec(ArcticLibraryBinding,
arctic=create_autospec(Arctic, mongo_host='some_host')),
_publish_changes=False)
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._versions.insert_one.side_effect = [OperationFailure("mongo op error"), None]
Expand Down

0 comments on commit 7b9a31d

Please sign in to comment.