Skip to content

Commit

Permalink
Merge pull request pandas-dev#179 from manahl/append-fix
Browse files Browse the repository at this point in the history
Fix issue pandas-dev#178
  • Loading branch information
bmoscon authored Jul 20, 2016
2 parents 9832f62 + 7be6bb3 commit 38574fc
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 141 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Feature: #171 allow deleting of values within a date range in ChunkStore
* Bugfix: #172 Fix date range bug when querying dates in the middle of chunks
* Bugfix: #176 Fix overwrite failures in Chunkstore
* Bugfix: #178 - Change how start/end dates are populated in the DB, also fix append so it works as expected.

### 1.25 (2016-05-23)

Expand Down
13 changes: 1 addition & 12 deletions arctic/chunkstore/_chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ def to_range(self, start, end):
"""
raise NotImplementedError

def to_start_end(self, item):
    """
    Turn the data in ``item`` into a (start, end) pair describing its
    chunk bounds, in the same form as the tuples yielded by to_chunks().

    Returns
    -------
    tuple - (start, end)
    """
    raise NotImplementedError

def to_mongo(self, range_obj):
"""
takes the range object used for this chunker type
Expand Down Expand Up @@ -62,7 +51,7 @@ def filter(self, data, range_obj):
"""
raise NotImplementedError

def exclude(self, data, range_obj):
def exclude(self, data, range_obj):
"""
Removes data within the bounds of the range object (inclusive)
Expand Down
162 changes: 58 additions & 104 deletions arctic/chunkstore/chunkstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ast

from bson.binary import Binary
from pandas import Series, DataFrame
from pandas import Series, DataFrame, concat

from ..store._version_store_utils import checksum
from ..decorators import mongo_retry
Expand Down Expand Up @@ -245,49 +245,45 @@ def write(self, symbol, item, chunk_size):
{'$set': doc},
upsert=True)

def append(self, symbol, item):
"""
Appends data from item to symbol's data in the database
def __concat(self, a, b):
    # Concatenate new data onto existing data and restore chronological
    # (index) order so downstream chunking sees a sorted result.
    # NOTE: sort_index() replaces the original .sort(), which on a
    # Series sorted by *value* in place and returned None (and was
    # later removed from pandas altogether) -- appending a Series
    # would have silently produced None here.
    return concat([a, b]).sort_index()

Parameters
----------
symbol: str
the symbol for the given item in the DB
item:
the data to append
"""
def __combine(self, a, b):
    # Overlay ``a`` onto ``b``: where both have data for the same index
    # label, values from ``a`` win. combine_first is idempotent, which
    # makes update() (unlike append()) safe to re-run.
    return a.combine_first(b)

def __update(self, symbol, item, combine_method=None):
sym = self._get_symbol_info(symbol)
if not sym:
raise NoDataFoundException("Symbol does not exist. Cannot append")
raise NoDataFoundException("Symbol does not exist.")

if isinstance(item, Series) and sym['type'] == 'df':
raise Exception("cannot append a series to a dataframe")
raise Exception("Symbol types do not match")
if isinstance(item, DataFrame) and sym['type'] == 'series':
raise Exception("cannot append a dataframe to a series")
raise Exception("Symbol types do not match")

records = []
ranges = []
dtype = None

new_chunks = []
for start, end, record in self.chunker.to_chunks(item, sym['chunk_size']):
'''
if we have a multiindex there is a chance that part of the append
will overlap an already written chunk, so we need to update
where the date part of the index overlaps
'''
if item.index.nlevels > 1:
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end))
if not df.empty:
if df.equals(record):
continue
record = record.combine_first(df)
self.update(symbol, record)
sym = self._get_symbol_info(symbol)
# read out matching chunks
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end), filter_data=False)
# assuming they exist, update them and store the original chunk
# range for later use
if not df.empty:
record = combine_method(record, df)
if record is None or record.equals(df):
continue

new_chunks.append(False)
sym['append_count'] += len(record)
sym['len'] -= len(df)
else:
new_chunks.append(True)
sym['chunk_count'] += 1

r, dtype = serialize(record, string_max_len=self.STRING_MAX)
if str(dtype) != sym['dtype']:
raise Exception("Dtype mismatch - cannot append")
raise Exception('Dtype mismatch.')
records.append(r)
ranges.append((start, end))

Expand All @@ -299,38 +295,59 @@ def append(self, symbol, item):

item = item.astype(dtype)

if str(dtype) != sym['dtype']:
raise Exception("Dtype mismatch - cannot append")

data = item.tostring()
sym['len'] += len(item)
if len(item) > 0:
sym['chunk_count'] += len(records)
sym['append_count'] += len(records)
sym['append_size'] += len(data)

chunks = [r.tostring() for r in records]
chunks = compress_array(chunks)

for chunk, rng in zip(chunks, ranges):
bulk = self._collection.initialize_unordered_bulk_op()
for chunk, rng, new_chunk in zip(chunks, ranges, new_chunks):
start = rng[0]
end = rng[-1]
end = rng[1]

segment = {'data': Binary(chunk)}
segment['start'] = start
segment['end'] = end
self._collection.update_one({'symbol': symbol, 'sha': checksum(symbol, segment)},
{'$set': segment},
upsert=True)
sha = checksum(symbol, segment)
segment['sha'] = sha
if new_chunk:
# new chunk
bulk.find({'symbol': symbol, 'sha': sha}
).upsert().update_one({'$set': segment})
else:
bulk.find({'symbol': symbol, 'start': start, 'end': end}
).update_one({'$set': segment})
if len(chunks) > 0:
bulk.execute()

self._symbols.replace_one({'symbol': symbol}, sym)

def append(self, symbol, item):
    """
    Appends data from item to symbol's data in the database.

    Is not idempotent: re-appending the same item concatenates it
    again rather than overwriting.

    Parameters
    ----------
    symbol: str
        the symbol for the given item in the DB
    item:
        the data to append
    """
    self.__update(symbol, item, combine_method=self.__concat)

def update(self, symbol, item):
"""
Merges data from item onto existing data in the database for symbol
data that exists in symbol and item for the same index/multiindex will
be overwritten by the data in item.
Is idempotent
Parameters
----------
symbol: str
Expand All @@ -339,70 +356,7 @@ def update(self, symbol, item):
the data to update
"""

sym = self._get_symbol_info(symbol)
if not sym:
raise NoDataFoundException("Symbol does not exist. Cannot update")


records = []
ranges = []
orig_ranges = []
for start, end, record in self.chunker.to_chunks(item, sym['chunk_size']):
# read out matching chunks
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end))
# assuming they exist, update them and store the original chunk
# range for later use
if not df.empty:
if df.equals(record):
continue
record = record.combine_first(df)
orig_ranges.append((self.chunker.to_start_end(record)))
else:
orig_ranges.append((None, None))

r, dtype = serialize(record, string_max_len=self.STRING_MAX)
if str(dtype) != sym['dtype']:
raise Exception('Dtype mismatch - cannot update')
records.append(r)
ranges.append((start, end))

if len(records) > 0:
chunks = [r.tostring() for r in records]
lens = [len(i) for i in chunks]
chunks = compress_array(chunks)

seg_count = 0
seg_len = 0

bulk = self._collection.initialize_unordered_bulk_op()
for chunk, rng, orig_rng, rec_len in zip(chunks, ranges, orig_ranges, lens):
start = rng[0]
end = rng[1]
orig_start = orig_rng[0]
if orig_start is None:
sym['len'] += rec_len
seg_count += 1
seg_len += rec_len
segment = {'data': Binary(chunk)}
segment['start'] = start
segment['end'] = end
sha = checksum(symbol, segment)
segment['sha'] = sha
if orig_start is None:
# new chunk
bulk.find({'symbol': symbol, 'sha': sha, 'start': segment['start']}
).upsert().update_one({'$set': segment})
else:
bulk.find({'symbol': symbol, 'start': orig_start}
).update_one({'$set': segment})
if len(chunks) > 0:
bulk.execute()

if seg_count != 0:
sym['chunk_count'] += seg_count
sym['append_size'] += seg_len
sym['append_count'] += seg_count
self._symbols.replace_one({'symbol': symbol}, sym)
self.__update(symbol, item, combine_method=self.__combine)

def get_info(self, symbol):
sym = self._get_symbol_info(symbol)
Expand Down
33 changes: 18 additions & 15 deletions arctic/chunkstore/date_chunker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import calendar
import pandas as pd
from pandas import Timestamp
from datetime import datetime as dt

from ._chunker import Chunker
from ..date import DateRange
Expand All @@ -21,22 +23,26 @@ def _get_date_chunk(self, date, chunk_size):
elif chunk_size == 'D':
return date.strftime('%Y-%m-%d')

def _get_date_range(self, df):
def _get_date_range(self, df, chunk_size):
"""
get min/max dates in the index of the dataframe
get min/max dates for the chunk
returns
-------
A tuple (start date, end date)
"""
dates = df.index.get_level_values('date')
start = dates.min()
end = dates.max()
if isinstance(start, Timestamp):
start = start.to_pydatetime()
if isinstance(end, Timestamp):
end = end.to_pydatetime()
return start, end
date = df.index.get_level_values('date')[0]

if isinstance(date, Timestamp):
date = date.to_pydatetime()

if chunk_size == 'M':
_, end_day = calendar.monthrange(date.year, date.month)
return dt(date.year, date.month, 1), dt(date.year, date.month, end_day)
elif chunk_size == 'Y':
return dt(date.year, 1, 1), dt(date.year, 12, 31)
else:
return date, date

def to_chunks(self, df, chunk_size):
"""
Expand Down Expand Up @@ -64,15 +70,12 @@ def to_chunks(self, df, chunk_size):
ret = df.xs(slice(date, date), level='date', drop_level=False)
else:
ret = df[date: date]
start, end = self._get_date_range(ret)
start, end = self._get_date_range(ret, chunk_size)
yield start, end, ret

def to_range(self, start, end):
    # Wrap a start/end pair in the DateRange object used for
    # date-based chunk queries.
    return DateRange(start, end)

def to_start_end(self, data):
    # Delegate to _get_date_range for the (start, end) bounds of
    # ``data``'s date index.
    return self._get_date_range(data)

def to_mongo(self, range_obj):
if range_obj.start and range_obj.end:
return {'$and': [{'start': {'$lte': range_obj.end}}, {'end': {'$gte': range_obj.start}}]}
Expand All @@ -84,7 +87,7 @@ def to_mongo(self, range_obj):
return {}

def filter(self, data, range_obj):
return data.ix[range_obj.start:range_obj.end]
return data[range_obj.start:range_obj.end]

def exclude(self, data, range_obj):
    """
    Return the rows of ``data`` whose 'date' index level falls outside
    ``range_obj`` (the range bounds themselves are excluded, i.e. the
    removal is inclusive of start and end).
    """
    dates = data.index.get_level_values('date')
    outside = (dates < range_obj.start) | (dates > range_obj.end)
    return data[outside]
Loading

0 comments on commit 38574fc

Please sign in to comment.