diff --git a/CHANGES.md b/CHANGES.md index 25dc50e2f81e5..e98f13d01d68b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,10 @@ ## Changelog +### 1.27 + + * Bugfix: #187 Compatibility with latest version of pytest-dbfixtures + * Feature: #182 Improve ChunkStore read/write performance + ### 1.26 (2016-07-20) * Bugfix: Faster TickStore querying for multiple symbols simultaneously diff --git a/arctic/chunkstore/_chunker.py b/arctic/chunkstore/_chunker.py index a6f05207355e3..24d5f0c89bb88 100644 --- a/arctic/chunkstore/_chunker.py +++ b/arctic/chunkstore/_chunker.py @@ -1,3 +1,7 @@ +START = 's' +END = 'e' + + class Chunker(object): def to_chunks(self, data, *args, **kwargs): @@ -60,3 +64,15 @@ def exclude(self, data, range_obj): data, filtered by range_obj """ raise NotImplementedError + + def chunk_to_str(self, chunk_id): + """ + Converts parts of a chunk range (start or end) to a string. These + chunk ids/indexes/markers are produced by to_chunks. + (See to_chunks) + + returns + ------- + string + """ + raise NotImplementedError diff --git a/arctic/chunkstore/chunkstore.py b/arctic/chunkstore/chunkstore.py index 5c01063d276b0..7c450ed7535d5 100644 --- a/arctic/chunkstore/chunkstore.py +++ b/arctic/chunkstore/chunkstore.py @@ -1,55 +1,56 @@ import logging import pymongo -import numpy as np -import ast +import hashlib from bson.binary import Binary -from pandas import Series, DataFrame, concat +from pandas import concat, DataFrame, Series -from ..store._version_store_utils import checksum from ..decorators import mongo_retry from .._util import indent -from ..serialization.pandas_serializer import (serialize, deserialize, - DataFrameSerializer, - SeriesSerializer, - PandasSerializer) -from .date_chunker import DateChunker -from ..exceptions import UnhandledDtypeException, NoDataFoundException -from .._compression import compress_array, decompress +from ..serialization.numpy_arrays import FrametoArraySerializer, DATA, VALUES, COLUMNS, TYPE + +from .date_chunker import DateChunker, START, END +from ..exceptions import NoDataFoundException logger = logging.getLogger(__name__) CHUNK_STORE_TYPE = 'ChunkStoreV1' +SYMBOL = 'sy' +SHA = 'sh' +CHUNK_SIZE = 'cs' +CHUNK_COUNT = 'cc' +APPEND_COUNT = 'ac' +ROWS = 'r' -class ChunkStore(object): - STRING_MAX = 16 +class ChunkStore(object): @classmethod def initialize_library(cls, arctic_lib, **kwargs): ChunkStore(arctic_lib)._ensure_index() @mongo_retry def _ensure_index(self): - self._symbols.create_index([("symbol", pymongo.ASCENDING)], + self._symbols.create_index([(SYMBOL, pymongo.ASCENDING)], unique=True, background=True) - self._collection.create_index([('symbol', pymongo.HASHED)], + self._collection.create_index([(SYMBOL, pymongo.HASHED)], background=True) - self._collection.create_index([('symbol', pymongo.ASCENDING), - ('sha', pymongo.ASCENDING)], + self._collection.create_index([(SYMBOL, pymongo.ASCENDING), + (SHA, pymongo.ASCENDING)], unique=True, background=True) - self._collection.create_index([('symbol', pymongo.ASCENDING), - ('start', pymongo.ASCENDING), - ('end', pymongo.ASCENDING)], + self._collection.create_index([(SYMBOL, pymongo.ASCENDING), + (START, pymongo.ASCENDING), + (END, pymongo.ASCENDING)], unique=True, background=True) @mongo_retry - def __init__(self, arctic_lib, chunker=DateChunker()): + def __init__(self, arctic_lib, chunker=DateChunker(), serializer=FrametoArraySerializer()): self.chunker = chunker + self.serializer = serializer self._arctic_lib = arctic_lib # Do we allow reading from secondaries @@ -72,14 +73,28 @@ def 
__str__(self): def __repr__(self): return str(self) + def _checksum(self, symbol, doc): + """ + Checksum the passed in dictionary + """ + sha = hashlib.sha1() + sha.update(symbol.encode('ascii')) + sha.update(self.chunker.chunk_to_str(doc[START]).encode('ascii')) + sha.update(self.chunker.chunk_to_str(doc[END]).encode('ascii')) + for k in doc[DATA][COLUMNS]: + sha.update(doc[DATA][DATA][k][VALUES]) + return Binary(sha.digest()) + def delete(self, symbol, chunk_range=None): """ Delete all chunks for a symbol, or optionally, chunks within a range Parameters ---------- - symbol : `str` + symbol : str symbol name for the item + chunk_range: range object + a date range to delete """ if chunk_range: # read out chunks that fall within the range and filter out @@ -88,13 +103,13 @@ def delete(self, symbol, chunk_range=None): df = self.chunker.exclude(df, chunk_range) # remove chunks, and update any remaining data - query = {'symbol': symbol} + query = {SYMBOL: symbol} query.update(self.chunker.to_mongo(chunk_range)) self._collection.delete_many(query) self.update(symbol, df) else: - query = {"symbol": symbol} + query = {SYMBOL: symbol} self._collection.delete_many(query) self._collection.symbols.delete_many(query) @@ -106,12 +121,12 @@ def list_symbols(self): ------- list of str """ - return self._symbols.distinct("symbol") + return self._symbols.distinct(SYMBOL) def _get_symbol_info(self, symbol): - return self._symbols.find_one({'symbol': symbol}) + return self._symbols.find_one({SYMBOL: symbol}) - def read(self, symbol, chunk_range=None, filter_data=True): + def read(self, symbol, chunk_range=None, columns=None, filter_data=True): """ Reads data for a given symbol from the database. @@ -122,35 +137,33 @@ def read(self, symbol, chunk_range=None, filter_data=True): chunk_range: object corresponding range object for the specified chunker (for DateChunker it is a DateRange object) + columns: list of str + subset of columns to read back (index will always be included, if + one exists) filter: boolean - perform chunk level filtering on the data (see filter() in _chunker) + perform chunk level filtering on the data (see filter in _chunker) only applicable when chunk_range is specified Returns ------- - A dataframe or series + DataFrame or Series """ sym = self._get_symbol_info(symbol) if not sym: raise NoDataFoundException('No data found for %s' % (symbol)) - spec = {'symbol': symbol, + spec = {SYMBOL: symbol, } if chunk_range: spec.update(self.chunker.to_mongo(chunk_range)) segments = [] - for _, x in enumerate(self._collection.find(spec, sort=[('start', pymongo.ASCENDING)],)): - segments.append(decompress(x['data'])) - - data = b''.join(segments) + for _, x in enumerate(self._collection.find(spec, sort=[(START, pymongo.ASCENDING)],)): + segments.append(x[DATA]) - dtype = PandasSerializer._dtype(sym['dtype'], sym.get('dtype_metadata', {})) - records = np.fromstring(data, dtype=dtype).reshape(sym.get('shape', (-1))) - - data = deserialize(records, sym['type']) + data = self.serializer.deserialize(segments, columns) if not filter_data or chunk_range is None: return data @@ -164,107 +177,87 @@ def write(self, symbol, item, chunk_size): ---------- symbol: str the symbol that will be used to reference the written data - item: dataframe or series + item: Dataframe or Series the data to write the database chunk_size: ? 
A chunk size that is understood by the specified chunker """ + if not isinstance(item, (DataFrame, Series)): + raise Exception("Can only chunk DataFrames and Series") + previous_shas = [] doc = {} - doc['symbol'] = symbol - doc['chunk_size'] = chunk_size - - if isinstance(item, Series): - doc['type'] = SeriesSerializer.TYPE - elif isinstance(item, DataFrame): - doc['type'] = DataFrameSerializer.TYPE - else: - raise Exception("Can only chunk Series and DataFrames") - previous_shas = [] + doc[SYMBOL] = symbol + doc[CHUNK_SIZE] = chunk_size + doc[ROWS] = len(item) + doc[TYPE] = 'dataframe' if isinstance(item, DataFrame) else 'series' + sym = self._get_symbol_info(symbol) if sym: - previous_shas = set([Binary(x['sha']) for x in self._collection.find({'symbol': symbol}, - projection={'sha': True, '_id': False}, + previous_shas = set([Binary(x[SHA]) for x in self._collection.find({SYMBOL: symbol}, + projection={SHA: True, '_id': False}, )]) - records = [] - ranges = [] - dtype = None - - for start, end, record in self.chunker.to_chunks(item, chunk_size): - r, dtype = serialize(record, string_max_len=self.STRING_MAX) - # if symbol exists, dtypes better match - if sym and str(dtype) != sym['dtype']: - raise Exception('Dtype mismatch - cannot write chunk') - records.append(r) - ranges.append((start, end)) - - item = np.array([r for record in records for r in record]).flatten() - for record in records: - if record.dtype.hasobject: - raise UnhandledDtypeException() - - doc['dtype'] = str(dtype) - doc['shape'] = (-1,) + item.shape[1:] - doc['dtype_metadata'] = dict(dtype.metadata or {}) - doc['len'] = len(item) - - chunks = [r.tostring() for r in records] - chunks = compress_array(chunks) op = False bulk = self._collection.initialize_unordered_bulk_op() - for chunk, rng in zip(chunks, ranges): - start = rng[0] - end = rng[1] - chunk = {'data': Binary(chunk)} - chunk['start'] = start - chunk['end'] = end - chunk['symbol'] = symbol - chunk['sha'] = checksum(symbol, chunk) - - if chunk['sha'] not in previous_shas: + chunk_count = 0 + + for start, end, record in self.chunker.to_chunks(item, chunk_size): + chunk_count += 1 + data = self.serializer.serialize(record) + doc[COLUMNS] = data[COLUMNS] + + chunk = {DATA: data} + chunk[START] = start + chunk[END] = end + chunk[SYMBOL] = symbol + chunk[SHA] = self._checksum(symbol, chunk) + + if chunk[SHA] not in previous_shas: op = True - bulk.find({'symbol': symbol, 'start': start, 'end': end}, + bulk.find({SYMBOL: symbol, START: start, END: end}, ).upsert().update_one({'$set': chunk}) else: # already exists, dont need to update in mongo - previous_shas.remove(chunk['sha']) + previous_shas.remove(chunk[SHA]) if op: bulk.execute() - doc['chunk_count'] = len(chunks) - doc['append_size'] = 0 - doc['append_count'] = 0 + doc[CHUNK_COUNT] = chunk_count + doc[APPEND_COUNT] = 0 if previous_shas: - mongo_retry(self._collection.delete_many)({'symbol': symbol, 'sha': {'$in': list(previous_shas)}}) + mongo_retry(self._collection.delete_many)({SYMBOL: symbol, SHA: {'$in': list(previous_shas)}}) - mongo_retry(self._symbols.update_one)({'symbol': symbol}, + mongo_retry(self._symbols.update_one)({SYMBOL: symbol}, {'$set': doc}, upsert=True) def __concat(self, a, b): - return concat([a, b]).sort() + return concat([a, b]).sort_index() - def __combine(self, a, b): - return a.combine_first(b) + def __take_new(self, a, b): + return a def __update(self, symbol, item, combine_method=None): + if not isinstance(item, (DataFrame, Series)): + raise Exception("Can only chunk DataFrames and 
Series") + sym = self._get_symbol_info(symbol) if not sym: raise NoDataFoundException("Symbol does not exist.") + + if sym[TYPE] == 'series' and not isinstance(item, Series): + raise Exception("Cannot combine Series and DataFrame") + if sym[TYPE] == 'dataframe' and not isinstance(item, DataFrame): + raise Exception("Cannot combine DataFrame and Series") - if isinstance(item, Series) and sym['type'] == 'df': - raise Exception("Symbol types do not match") - if isinstance(item, DataFrame) and sym['type'] == 'series': - raise Exception("Symbol types do not match") - records = [] - ranges = [] - new_chunks = [] - for start, end, record in self.chunker.to_chunks(item, sym['chunk_size']): + bulk = self._collection.initialize_unordered_bulk_op() + op = False + for start, end, record in self.chunker.to_chunks(item, sym[CHUNK_SIZE]): # read out matching chunks df = self.read(symbol, chunk_range=self.chunker.to_range(start, end), filter_data=False) # assuming they exist, update them and store the original chunk @@ -274,56 +267,35 @@ def __update(self, symbol, item, combine_method=None): if record is None or record.equals(df): continue - new_chunks.append(False) - sym['append_count'] += len(record) - sym['len'] -= len(df) + sym[APPEND_COUNT] += len(record) + sym[ROWS] += len(record) - len(df) + new_chunk = False + else: + new_chunk = True + sym[CHUNK_COUNT] += 1 + sym[ROWS] += len(record) + + data = self.serializer.serialize(record) + op = True + + segment = {DATA: data} + segment[TYPE] = 'dataframe' if isinstance(record, DataFrame) else 'series' + segment[START] = start + segment[END] = end + sha = self._checksum(symbol, segment) + segment[SHA] = sha + if new_chunk: + # new chunk + bulk.find({SYMBOL: symbol, SHA: sha} + ).upsert().update_one({'$set': segment}) else: - new_chunks.append(True) - sym['chunk_count'] += 1 - - r, dtype = serialize(record, string_max_len=self.STRING_MAX) - if str(dtype) != sym['dtype']: - raise Exception('Dtype mismatch.') - records.append(r) - ranges.append((start, end)) - - if len(records) > 0: - item = np.array([r for record in records for r in record]).flatten() - - if sym.get('shape', [-1]) != [-1, ] + list(item.shape)[1:]: - raise UnhandledDtypeException() - - item = item.astype(dtype) - - data = item.tostring() - sym['len'] += len(item) - if len(item) > 0: - sym['append_size'] += len(data) - - chunks = [r.tostring() for r in records] - chunks = compress_array(chunks) - - bulk = self._collection.initialize_unordered_bulk_op() - for chunk, rng, new_chunk in zip(chunks, ranges, new_chunks): - start = rng[0] - end = rng[1] - - segment = {'data': Binary(chunk)} - segment['start'] = start - segment['end'] = end - sha = checksum(symbol, segment) - segment['sha'] = sha - if new_chunk: - # new chunk - bulk.find({'symbol': symbol, 'sha': sha} - ).upsert().update_one({'$set': segment}) - else: - bulk.find({'symbol': symbol, 'start': start, 'end': end} - ).update_one({'$set': segment}) - if len(chunks) > 0: - bulk.execute() - - self._symbols.replace_one({'symbol': symbol}, sym) + bulk.find({SYMBOL: symbol, START: start, END: end} + ).update_one({'$set': segment}) + + if op: + bulk.execute() + + self._symbols.replace_one({SYMBOL: symbol}, sym) def append(self, symbol, item): """ @@ -335,16 +307,14 @@ def append(self, symbol, item): ---------- symbol: str the symbol for the given item in the DB - item: + item: DataFrame or Series the data to append """ self.__update(symbol, item, combine_method=self.__concat) def update(self, symbol, item): """ - Merges data from item onto 
existing data in the database for symbol - data that exists in symbol and item for the same index/multiindex will - be overwritten by the data in item. + Overwrites data in DB with data in item for the given symbol. Is idempotent @@ -352,22 +322,16 @@ def update(self, symbol, item): ---------- symbol: str the symbol for the given item in the DB - item: + item: DataFrame or Series the data to update """ - self.__update(symbol, item, combine_method=self.__combine) + self.__update(symbol, item, combine_method=self.__take_new) def get_info(self, symbol): sym = self._get_symbol_info(symbol) ret = {} - dtype = PandasSerializer._dtype(sym['dtype'], sym['dtype_metadata']) - length = sym['len'] - ret['size'] = dtype.itemsize * length - ret['chunk_count'] = sym['chunk_count'] - ret['dtype'] = sym['dtype'] - ret['type'] = sym['type'] - ret['rows'] = length - ret['col_names'] = sym['dtype_metadata'] - ret['dtype'] = ast.literal_eval(sym['dtype']) + ret['chunk_count'] = sym[CHUNK_COUNT] + ret['rows'] = sym[ROWS] + ret['col_names'] = sym[COLUMNS] return ret diff --git a/arctic/chunkstore/date_chunker.py b/arctic/chunkstore/date_chunker.py index c5e5e386cc13b..298235d19d6d9 100644 --- a/arctic/chunkstore/date_chunker.py +++ b/arctic/chunkstore/date_chunker.py @@ -1,93 +1,106 @@ -import calendar import pandas as pd -from pandas import Timestamp -from datetime import datetime as dt -from ._chunker import Chunker +from ._chunker import Chunker, START, END from ..date import DateRange class DateChunker(Chunker): - def _get_date_chunk(self, date, chunk_size): - ''' - format date appropriately for the chunk size - - returns - ------- - Formatted date string - ''' - if chunk_size == 'Y': - return date.strftime('%Y') - elif chunk_size == 'M': - return date.strftime('%Y-%m') - elif chunk_size == 'D': - return date.strftime('%Y-%m-%d') - - def _get_date_range(self, df, chunk_size): + def to_chunks(self, df, chunk_size): """ - get minx/max dates for the chunk + chunks the dataframe/series by dates returns ------- - A tuple (start date, end date) + generator that produces tuples: (start date, end date, + dataframe/series) """ - date = df.index.get_level_values('date')[0] - - if isinstance(date, Timestamp): - date = date.to_pydatetime() + if chunk_size not in ('D', 'M', 'Y'): + raise Exception("Chunk size must be one of D, M, Y") - if chunk_size == 'M': - _, end_day = calendar.monthrange(date.year, date.month) - return dt(date.year, date.month, 1), dt(date.year, date.month, end_day) - elif chunk_size == 'Y': - return dt(date.year, 1, 1), dt(date.year, 12, 31) + if 'date' in df.index.names: + dates = df.index.get_level_values('date') + elif 'date' in df.columns: + dates = pd.DatetimeIndex(df.date) else: - return date, date + raise Exception("Data must be datetime indexed or have a column named 'date'") - def to_chunks(self, df, chunk_size): + for period, g in df.groupby(dates.to_period(chunk_size)): + start, end = period.start_time.to_pydatetime(warn=False), period.end_time.to_pydatetime(warn=False) + yield start, end, g + + def to_range(self, start, end): """ - chunks the dataframe/series by dates + takes start, end from to_chunks and returns a "range" that can be used + as the argument to methods require a chunk_range returns ------- - generator that produces tuples: (start dt, end dt, dataframe/series) + A range object (dependent on type of chunker) """ - if chunk_size not in ('D', 'M', 'Y'): - raise Exception("Chunk size must be one of D, M, Y") - - if 'date' not in df.index.names: - raise Exception("Data must be 
datetime indexed and have an index column named 'date'") - - dates = [pd.to_datetime(d) for d in df.index.get_level_values('date').drop_duplicates()] - key_array = [self._get_date_chunk(d, chunk_size) for d in dates] + return DateRange(start, end) - for date in set(key_array): - if df.index.nlevels > 1: - ''' - can't slice with partial date on multi-index. Support coming in - pandas 0.18.1 - ''' - ret = df.xs(slice(date, date), level='date', drop_level=False) - else: - ret = df[date: date] - start, end = self._get_date_range(ret, chunk_size) - yield start, end, ret + def chunk_to_str(self, chunk_id): + """ + Converts parts of a chunk range (start or end) to a string. These + chunk ids/indexes/markers are produced by to_chunks. + (See to_chunks) - def to_range(self, start, end): - return DateRange(start, end) + returns + ------- + string + """ + return chunk_id.strftime("%Y-%m-%d") def to_mongo(self, range_obj): + """ + takes the range object used for this chunker type + and converts it into a query that can be used against + mongo to filter by the range + + returns + ------- + dict (a Mongo query) + """ if range_obj.start and range_obj.end: - return {'$and': [{'start': {'$lte': range_obj.end}}, {'end': {'$gte': range_obj.start}}]} + return {'$and': [{START: {'$lte': range_obj.end}}, {END: {'$gte': range_obj.start}}]} elif range_obj.start: - return {'end': {'$gte': range_obj.start}} + return {END: {'$gte': range_obj.start}} elif range_obj.end: - return {'start': {'$lte': range_obj.end}} + return {START: {'$lte': range_obj.end}} else: return {} def filter(self, data, range_obj): - return data[range_obj.start:range_obj.end] + """ + ensures data is properly subset to the range in range_obj. + (Depending on how the chunking is implemented, it might be possible + to specify a chunk range that reads out more than the actual range + e.g.: date range, chunked monthly. read out 2016-01-01 to 2016-01-02. 
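# A minimal usage sketch of the DateChunker pieces in this patch (illustrative only:
# the frame, dates and variable names below are made up, not part of the diff).
from datetime import datetime as dt

import pandas as pd

from arctic.chunkstore.date_chunker import DateChunker
from arctic.date import DateRange

df = pd.DataFrame({'data': [1, 2, 3]},
                  index=pd.Index([dt(2016, 1, 1), dt(2016, 1, 2), dt(2016, 2, 1)], name='date'))
chunker = DateChunker()
# 'M' chunking yields one (start, end, sub-frame) tuple per calendar month
chunks = list(chunker.to_chunks(df, 'M'))
# a DateRange turns into a Mongo query over the shortened 's'/'e' chunk fields
query = chunker.to_mongo(DateRange(dt(2016, 1, 1), dt(2016, 1, 2)))
# filter() trims the whole-month January chunk back down to the two requested days
subset = chunker.filter(chunks[0][2], DateRange(dt(2016, 1, 1), dt(2016, 1, 2)))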
+ This will read ALL of January 2016 but it should be subset to just + the first two days) + + returns + ------- + data, filtered by range_obj + """ + if 'date' in data.index.names: + return data[range_obj.start:range_obj.end] + elif 'date' in data.columns: + return data[(data.date >= range_obj.start) & (data.date <= range_obj.end)] + else: + return data def exclude(self, data, range_obj): - return data[(data.index.get_level_values('date') < range_obj.start) | (data.index.get_level_values('date') > range_obj.end)] + """ + Removes data within the bounds of the range object (inclusive) + + returns + ------- + data, filtered by range_obj + """ + if 'date' in data.index.names: + return data[(data.index.get_level_values('date') < range_obj.start) | (data.index.get_level_values('date') > range_obj.end)] + elif 'date' in data.columns: + return data[(data.date < range_obj.start) | (data.date > range_obj.end)] + else: + return data diff --git a/arctic/fixtures/arctic.py b/arctic/fixtures/arctic.py index 595a60431ebab..f5223d0c46575 100644 --- a/arctic/fixtures/arctic.py +++ b/arctic/fixtures/arctic.py @@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) -mongo_proc2 = mongo_proc(executable="mongod", port="?", +mongo_proc2 = mongo_proc(executable="mongod", port=-1, params='--nojournal ' '--noauth ' '--nohttpinterface ' diff --git a/arctic/fixtures/mongo.py b/arctic/fixtures/mongo.py index 91832955d3630..fd772225922a5 100644 --- a/arctic/fixtures/mongo.py +++ b/arctic/fixtures/mongo.py @@ -27,7 +27,7 @@ from pytest_dbfixtures.utils import get_config, try_import, get_process_fixture -def mongo_proc(executable=None, params=None, host=None, port=None, +def mongo_proc(executable=None, params=None, host=None, port=-1, logs_prefix=''): """ Mongo process factory. @@ -37,7 +37,7 @@ def mongo_proc(executable=None, params=None, host=None, port=None, :param str host: hostname :param str port: exact port (e.g. '8000') or randomly selected port: - '?' 
- any random available port + -1 - any random available port '2000-3000' - random available port from a given range '4002,4003' - random of 4002 or 4003 ports :param str logs_prefix: prefix for log filename @@ -73,7 +73,7 @@ def mongo_proc_fixture(request): mongo_params = params or config.mongo.params mongo_host = host or config.mongo.host - mongo_port = get_port(port or config.mongo.port) + mongo_port = get_port(port) or get_port(config.mongo.port) logsdir = path(request.config.getvalue('logsdir')) mongo_logpath = logsdir / '{prefix}mongo.{port}.log'.format( diff --git a/arctic/serialization/numpy_arrays.py b/arctic/serialization/numpy_arrays.py new file mode 100644 index 0000000000000..b3bf992e897d9 --- /dev/null +++ b/arctic/serialization/numpy_arrays.py @@ -0,0 +1,186 @@ +import logging +import numpy as np +import numpy.ma as ma +import pandas as pd + +from bson import Binary, SON + +from .._compress import compress, decompress + + +DATA = 'd' +MASK = 'm' +VALUES = 'v' +TYPE = 't' +NAME = 'n' +COLUMNS = 'c' +INDEX = 'i' + + +class NumpyArrayConverter(object): + """ + Converts a Numpy ndarray to and from PyMongo SON representation: + + { + type: ' 0: + a = a.copy() + np.putmask(a, mask, '') + else: + mask = None + + if pd.lib.infer_dtype(a) == 'mixed': + a = np.array([s.encode('ascii') for s in a]) + a = a.astype('O') + + type_ = pd.lib.infer_dtype(a) + if type_ in ['unicode', 'string']: + max_len = pd.lib.max_len_string_array(a) + return a.astype('U{:d}'.format(max_len)), mask + else: + raise ValueError('Cannot store arrays with {} dtype'.format(type_)) + + +class FrameConverter(object): + """ + Converts a Pandas Dataframe to and from PyMongo SON representation: + + { + columns: [col1, col2, col3], + data: { + col1: { , + col2: { , + col3: { , + } + } + """ + + def __init__(self): + self.converter = NumpyArrayConverter() + + def docify(self, df): + """ + Convert a Pandas DataFrame to SON. 
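# A short round-trip sketch for the converter described here (illustrative only;
# the sample frame is made up, and the short document keys follow the constants
# defined above: 'd' for DATA, 'c' for COLUMNS).
import pandas as pd

from arctic.serialization.numpy_arrays import FrameConverter

converter = FrameConverter()
frame = pd.DataFrame({'open': [1.0, 2.0], 'close': [1.5, 2.5]})
doc = converter.docify(frame)      # SON document: per-column arrays under 'd', column names under 'c'
roundtrip = converter.objify(doc)  # rebuilds the DataFrame; pass columns=[...] to read back a subset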
+ + Parameters + ---------- + df: DataFrame + The Pandas DataFrame to encode + """ + doc = SON({DATA: {}}, c=[str(c) for c in df.columns]) + for c in df: + meta = {NAME: str(c)} + try: + doc[DATA][str(c)] = self.converter.docify(df[c].values, meta) + except Exception as e: + typ = pd.lib.infer_dtype(df[c]) + msg = "Column '{}' type is {}".format(str(c), typ) + logging.info(msg) + raise e + return doc + + def objify(self, doc, columns=None): + """ + Decode a Pymongo SON object into an Pandas DataFrame + """ + cols = columns or doc[COLUMNS] + data = {c: self.converter.objify(doc[DATA][c]) for c in cols} + return pd.DataFrame(data, columns=cols)[cols] + + +class FrametoArraySerializer(object): + def __init__(self): + self.converter = FrameConverter() + + def serialize(self, df): + if isinstance(df, pd.Series): + dtype = 'series' + df = df.to_frame() + else: + dtype = 'dataframe' + + if df.index.names != [None]: + index = df.index.names + df = df.reset_index() + ret = self.converter.docify(df) + ret[INDEX] = index + ret[TYPE] = dtype + return ret + ret = self.converter.docify(df) + ret[TYPE] = dtype + return ret + + def deserialize(self, data, columns=None): + if data == []: + return pd.DataFrame() + + if isinstance(data, list): + if columns and INDEX in data[0]: + columns.extend(data[0][INDEX]) + df = pd.concat([self.converter.objify(d, columns) for d in data]) + else: + df = pd.concat([self.converter.objify(d, columns) for d in data], ignore_index=True) + dtype = data[0][TYPE] + if INDEX in data[0]: + df = df.set_index(data[0][INDEX]) + else: + df = self.converter.objify(data, columns) + dtype = data[TYPE] + if INDEX in data: + df = df.set_index(data[INDEX]) + if dtype == 'series': + return df[df.columns[0]] + return df diff --git a/arctic/serialization/pandas_serializer.py b/arctic/serialization/numpy_records.py similarity index 87% rename from arctic/serialization/pandas_serializer.py rename to arctic/serialization/numpy_records.py index 20fe26aba6416..0b7231409a636 100644 --- a/arctic/serialization/pandas_serializer.py +++ b/arctic/serialization/numpy_records.py @@ -11,24 +11,6 @@ DTN64_DTYPE = 'datetime64[ns]' -def serialize(item, string_max_len=None): - if isinstance(item, Series): - return SeriesSerializer().serialize(item, string_max_len=string_max_len) - if isinstance(item, DataFrame): - return DataFrameSerializer().serialize(item, string_max_len=string_max_len) - else: - raise Exception("Cannot serialize data of type %s" % (type(item))) - - -def deserialize(item, dtype): - if dtype == SeriesSerializer.TYPE: - return SeriesSerializer().deserialize(item) - if dtype == DataFrameSerializer.TYPE: - return DataFrameSerializer().deserialize(item) - else: - raise Exception("Cannot deserialize data of type %s" % (dtype)) - - def _to_primitive(arr, string_max_len=None): if arr.dtype.hasobject: if len(arr) > 0: @@ -41,15 +23,6 @@ def _to_primitive(arr, string_max_len=None): class PandasSerializer(object): - - @staticmethod - def _dtype(string, metadata=None): - if metadata is None: - metadata = {} - if string.startswith('['): - return np.dtype(eval(string), metadata=metadata) - return np.dtype(string, metadata=metadata) - def _index_to_records(self, df): metadata = {} index = df.index diff --git a/arctic/store/_pandas_ndarray_store.py b/arctic/store/_pandas_ndarray_store.py index 9341ab52291d8..01285d9765630 100644 --- a/arctic/store/_pandas_ndarray_store.py +++ b/arctic/store/_pandas_ndarray_store.py @@ -7,7 +7,7 @@ import numpy as np -from arctic.serialization.pandas_serializer import 
SeriesSerializer, DataFrameSerializer +from arctic.serialization.numpy_records import SeriesSerializer, DataFrameSerializer from .._compression import compress, decompress from ..date._util import to_pandas_closed_closed from ..exceptions import ArcticException diff --git a/setup.py b/setup.py index 73d8aa40aa4cd..55861cfeebb6e 100644 --- a/setup.py +++ b/setup.py @@ -100,7 +100,7 @@ def run_tests(self): "mockextras", "pytest", "pytest-cov", - "pytest-dbfixtures", + "pytest-dbfixtures>=0.15.0", "pytest-timeout", "pytest-xdist", ], diff --git a/tests/integration/chunkstore/test_chunkstore.py b/tests/integration/chunkstore/test_chunkstore.py index fa4a24c7320d8..f41e93525a0b1 100644 --- a/tests/integration/chunkstore/test_chunkstore.py +++ b/tests/integration/chunkstore/test_chunkstore.py @@ -21,6 +21,18 @@ def test_write_dataframe(chunkstore_lib): assert_frame_equal(df, read_df) +def test_write_dataframe_noindex(chunkstore_lib): + df = DataFrame(data={'data': [1, 2, 3], + 'date': [dt(2016, 1, 1), + dt(2016, 1, 2), + dt(2016, 1, 3)] + } + ) + chunkstore_lib.write('test_df', df, 'D') + read_df = chunkstore_lib.read('test_df') + assert_frame_equal(df, read_df) + + def test_overwrite_dataframe(chunkstore_lib): df = DataFrame(data={'data': [1, 2, 3, 4]}, index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1), @@ -42,6 +54,25 @@ def test_overwrite_dataframe(chunkstore_lib): assert_frame_equal(dg, read_df) +def test_overwrite_dataframe_noindex(chunkstore_lib): + df = DataFrame(data={'data': [1, 2, 3, 4], + 'date': [dt(2016, 1, 1), + dt(2016, 1, 2), + dt(2016, 1, 3), + dt(2016, 1, 4)]}) + + df2 = DataFrame(data={'data': [5, 6, 7, 8], + 'date': [dt(2016, 1, 1), + dt(2016, 1, 2), + dt(2016, 1, 3), + dt(2016, 1, 4)]}) + + chunkstore_lib.write('test_df', df, 'D') + chunkstore_lib.write('test_df', df2, 'D') + read_df = chunkstore_lib.read('test_df') + assert_frame_equal(df2, read_df) + + def test_overwrite_dataframe_monthly(chunkstore_lib): df = DataFrame(data={'data': [1, 2, 3, 4, 5, 6]}, index=MultiIndex.from_tuples([(dt(2016, 1, 5), 1), @@ -68,25 +99,6 @@ def test_overwrite_dataframe_monthly(chunkstore_lib): assert_frame_equal(dg, read_df) -def test_overwrite_series(chunkstore_lib): - s = pd.Series([1], index=pd.date_range('2016-01-01', - '2016-01-01', - name='date'), - name='vals') - - chunkstore_lib.write('test', s, 'D') - chunkstore_lib.write('test', s + 1, 'D') - assert_series_equal(chunkstore_lib.read('test'), s + 1) - - -def test_overwrite_series_monthly(chunkstore_lib): - s = pd.Series([1, 2], index=pd.Index(data=[dt(2016, 1, 1), dt(2016, 2, 1)], name='date'), name='vals') - - chunkstore_lib.write('test', s, 'M') - chunkstore_lib.write('test', s + 1, 'M') - assert_series_equal(chunkstore_lib.read('test'), s + 1) - - def test_write_read_with_daterange(chunkstore_lib): df = DataFrame(data={'data': [1, 2, 3]}, index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1), @@ -94,10 +106,32 @@ def test_write_read_with_daterange(chunkstore_lib): (dt(2016, 1, 3), 1)], names=['date', 'id']) ) + + dg = DataFrame(data={'data': [1, 2]}, + index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1), + (dt(2016, 1, 2), 1)], + names=['date', 'id']) + ) + chunkstore_lib.write('test_df', df, 'D') read_df = chunkstore_lib.read('test_df', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 2))) - assert(len(read_df.index.get_level_values('date')) == 2) + assert_frame_equal(read_df, dg) + + +def test_write_read_with_daterange_noindex(chunkstore_lib): + df = DataFrame(data={'data': [1, 2, 3], + 'date': [dt(2016, 1, 1), + dt(2016, 1, 2), + 
dt(2016, 1, 3)]}) + + dg = DataFrame(data={'data': [1, 2], + 'date': [dt(2016, 1, 1), + dt(2016, 1, 2)]}) + + chunkstore_lib.write('test_df', df, 'D') + read_df = chunkstore_lib.read('test_df', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 2))) + assert_frame_equal(read_df, dg) def test_store_single_index_df(chunkstore_lib): df = DataFrame(data=[1, 2, 3], @@ -151,18 +185,6 @@ def test_open_closed(chunkstore_lib): assert_frame_equal(df, ret) -def test_pandas_datetime_index_store_series(chunkstore_lib): - df = Series(data=[1, 2, 3], - index=Index(data=[dt(2016, 1, 1), - dt(2016, 1, 2), - dt(2016, 1, 3)], - name='date'), - name='data') - chunkstore_lib.write('chunkstore_test', df, chunk_size='D') - s = chunkstore_lib.read('chunkstore_test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 3))) - assert_series_equal(s, df) - - def test_monthly_df(chunkstore_lib): df = DataFrame(data=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], index=Index(data=[dt(2016, 1, 1), @@ -197,19 +219,6 @@ def test_yearly_df(chunkstore_lib): assert_frame_equal(df, ret) -def test_yearly_series(chunkstore_lib): - df = Series(data=[1, 2, 3], - index=Index(data=[dt(2016, 1, 1), - dt(2016, 2, 1), - dt(2016, 3, 3)], - name='date'), - name='data') - - chunkstore_lib.write('chunkstore_test', df, chunk_size='Y') - ret = chunkstore_lib.read('chunkstore_test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 3, 3))) - assert_series_equal(df, ret) - - def test_append_daily(chunkstore_lib): df = DataFrame(data=[1, 2, 3], index=Index(data=[dt(2016, 1, 1), @@ -290,34 +299,6 @@ def test_append_existing_chunk(chunkstore_lib): assert_frame_equal(ret, pd.concat([df, df2])) -def test_append_exceptions(chunkstore_lib): - df = DataFrame(data=[1, 2, 3], - index=Index(data=[dt(2016, 1, 1), - dt(2016, 1, 2), - dt(2016, 1, 3)], - name='date'), - columns=['data']) - s = Series(data=[1, 2, 3], - index=Index(data=[dt(2016, 1, 1), - dt(2016, 2, 1), - dt(2016, 3, 3)], - name='date'), - name='data') - - chunkstore_lib.write('df', df, chunk_size='D') - chunkstore_lib.write('s', s, chunk_size='D') - - with pytest.raises(Exception) as e: - chunkstore_lib.append('df', s) - - assert("Symbol types do not match" in str(e)) - - with pytest.raises(Exception) as e: - chunkstore_lib.append('s', df) - - assert("Symbol types do not match" in str(e)) - - def test_store_objects_df(chunkstore_lib): df = DataFrame(data=['1', '2', '3'], index=Index(data=[dt(2016, 1, 1), @@ -331,19 +312,6 @@ def test_store_objects_df(chunkstore_lib): assert_frame_equal(df, ret) -def test_store_objects_series(chunkstore_lib): - df = Series(data=['1', '2', '3'], - index=Index(data=[dt(2016, 1, 1), - dt(2016, 1, 2), - dt(2016, 1, 3)], - name='date'), - name='data') - - chunkstore_lib.write('chunkstore_test', df, chunk_size='D') - ret = chunkstore_lib.read('chunkstore_test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 3))) - assert_series_equal(df, ret) - - def test_empty_range(chunkstore_lib): df = DataFrame(data={'data': [1], 'values': [10]}, index=Index(data=[dt(2016, 1, 1)], name='date')) @@ -459,43 +427,6 @@ def test_update_same_df(chunkstore_lib): assert(sym == chunkstore_lib._get_symbol_info('chunkstore_test')) -def test_update_series(chunkstore_lib): - df = Series(data=[1, 2, 3], - index=pd.Index(data=[dt(2016, 1, 1), - dt(2016, 1, 2), - dt(2016, 1, 3)], name='date'), - name='data') - df2 = Series(data=[20, 30, 40], - index=pd.Index(data=[dt(2016, 1, 2), - dt(2016, 1, 3), - dt(2016, 1, 4)], name='date'), - name='data') - - equals = Series(data=[1, 20, 30, 40], - 
index=pd.Index(data=[dt(2016, 1, 1), - dt(2016, 1, 2), - dt(2016, 1, 3), - dt(2016, 1, 4)], name='date'), - name='data') - - chunkstore_lib.write('chunkstore_test', df, chunk_size='D') - chunkstore_lib.update('chunkstore_test', df2) - assert_series_equal(chunkstore_lib.read('chunkstore_test') , equals) - - -def test_update_same_series(chunkstore_lib): - df = Series(data=[1, 2, 3], - index=pd.Index(data=[dt(2016, 1, 1), - dt(2016, 1, 2), - dt(2016, 1, 3)], name='date'), - name='data') - chunkstore_lib.write('chunkstore_test', df, chunk_size='D') - - sym = chunkstore_lib._get_symbol_info('chunkstore_test') - chunkstore_lib.update('chunkstore_test', df) - assert(sym == chunkstore_lib._get_symbol_info('chunkstore_test')) - - def test_df_with_multiindex(chunkstore_lib): df = DataFrame(data=[1, 2, 3], index=MultiIndex.from_tuples([(dt(2016, 1, 1), 2), @@ -545,7 +476,7 @@ def gen_daily_data(month, days, securities): closep = [round(x + random.uniform(-5.0, 5.0), 1) for x in openp] index_list = [(dt(2016, month, day), s) for s in securities] - yield DataFrame(data={'open': openp, 'close':closep}, + yield DataFrame(data={'open': openp, 'close': closep}, index=MultiIndex.from_tuples(index_list, names=['date', 'security'])) @@ -581,18 +512,15 @@ def helper(chunkstore_lib, name, chunk_size): read_info = chunkstore_lib.read(name) assert_frame_equal(written_df, read_info) - df = write_random_data(chunkstore_lib, name, 1, [1], list(range(1, 11)), update=True, chunk_size=chunk_size) - - read_info = chunkstore_lib.read(name, chunk_range=DateRange(dt(2016, 1, 2), dt(2016, 1, 30))) - assert_frame_equal(written_df['2016-01-02':], read_info['2016-01-02':]) + df = write_random_data(chunkstore_lib, name, 1, list(range(1, 31)), list(range(1, 101)), chunk_size=chunk_size) - read_info = chunkstore_lib.read(name, chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 1))) - written_df = df.combine_first(written_df) - assert_frame_equal(written_df[:'2016-01-01'], read_info [:'2016-01-01']) + read_info = chunkstore_lib.read(name) + assert_frame_equal(df, read_info) + r = read_info df = write_random_data(chunkstore_lib, name, 2, list(range(1, 29)), list(range(1, 501)), append=True, chunk_size=chunk_size) read_info = chunkstore_lib.read(name) - assert_frame_equal(pd.concat([written_df, df]), read_info) + assert_frame_equal(pd.concat([r, df]), read_info) for chunk_size in ['D', 'M', 'Y']: helper(chunkstore_lib, 'test_data_' + chunk_size, chunk_size) @@ -602,7 +530,8 @@ def test_multiple_actions_monthly_data(chunkstore_lib): def helper(chunkstore_lib, chunk_size, name, df, append): chunkstore_lib.write(name, df, chunk_size=chunk_size) - assert_frame_equal(chunkstore_lib.read(name) , df) + r = chunkstore_lib.read(name) + assert_frame_equal(r, df) chunkstore_lib.append(name, append) @@ -610,7 +539,12 @@ def helper(chunkstore_lib, chunk_size, name, df, append): chunkstore_lib.update(name, append) - assert_frame_equal(chunkstore_lib.read(name), pd.concat([df, append])) + if chunk_size is not "Y": + assert_frame_equal(chunkstore_lib.read(name), pd.concat([df, append])) + else: + # chunksize is the entire DF, so we'll overwrite the whole thing + # with the update when its yearly chunking + assert_frame_equal(chunkstore_lib.read(name), append) df = [] for month in range(1, 4): @@ -652,15 +586,9 @@ def test_get_info(chunkstore_lib): ) chunkstore_lib.write('test_df', df, 'D') info = {'rows': 3, - 'dtype': [('date', '1 dimensional arrays, saving as Blob') @@ -50,7 +50,7 @@ def 
test_can_convert_to_records_without_objects_returns_true_otherwise(): store._to_records = Mock(return_value=(np.rec.array([(1356998400000000000, 'a')], dtype=[('index', '