Close #10: Implement ability to prepend to series
Signed-off-by: Andrew MacDonald <andrew@maccas.net>
amacd31 committed Nov 22, 2017
1 parent b0c3e70 commit db62beb
Showing 2 changed files with 212 additions and 78 deletions.
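In practice, the change means writer.write now accepts a series that starts before the first record already on disk. A minimal usage sketch, mirroring the tests added below (the file path is hypothetical, and the phildb.reader/phildb.writer import layout is assumed from the test suite):

import pandas as pd
from datetime import datetime
from phildb import reader, writer

# Hypothetical file that already holds daily values from 2014-01-01 onwards.
tsdb_file = 'T0000.tsdb'

# Writing a series that starts two days earlier now prepends: the new value
# lands first, the gap day is padded with a missing-value sentinel, and the
# original records follow.
log_entries = writer.write(
    tsdb_file,
    pd.Series(index=[datetime(2013, 12, 30)], data=[-2.0]),
    'D',
)
print(log_entries['C'])   # e.g. [(1388361600, -2.0, 0), (1388448000, -9999, 9999)]
print(reader.read(tsdb_file).values[:2])   # e.g. [-2.0, nan]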
224 changes: 146 additions & 78 deletions phildb/writer.py
@@ -45,6 +45,104 @@ def __convert_and_validate(ts, freq):

return series

def __calculate_offset(freqstr, start_date, first_record_date):
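    # Distance, in whole periods of the series frequency, from the first record
    # already on disk to the start of the incoming series; negative when
    # prepending. Example with hypothetical values at daily frequency:
    #   __calculate_offset('D', pd.Timestamp('2013-12-30'), dt(2014, 1, 1)) == -2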
if freqstr[-1] == 'T':
if len(freqstr) == 1:
freq_mult = 1
else:
freq_mult = int(freqstr[:-1])
freqstr = 'T'
else:
freq_mult = 1

if freqstr[-1] == 'S' and len(freqstr) > 1:
freqstr = freqstr[:-1]

return start_date.to_period(freqstr) - pd.to_datetime(first_record_date).to_period(freqstr)

def __write_missing(writer, freq, first_date, last_date, log_entries):
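    # Write one sentinel record per period in [first_date, last_date] and log
    # it under 'C'. Per the tests added below, a sentinel serialises as
    # (datestamp, MISSING_VALUE, METADATA_MISSING_VALUE), e.g. (ts, -9999, 9999).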
log_entries = log_entries.copy()

missing_dates = pd.date_range(first_date, last_date, freq = freq)
for the_date in missing_dates:
datestamp = calendar.timegm(the_date.utctimetuple())
log_entries['C'].append(
(
datestamp,
MISSING_VALUE,
METADATA_MISSING_VALUE
)
)

data = pack(
entry_format,
datestamp,
MISSING_VALUE,
METADATA_MISSING_VALUE
)

writer.write(data)

return log_entries

def __update_existing_data(tsdb_file, series, log_entries):
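    # Seek to the first overlapping record, then walk the new series in step
    # with the records already on disk: unchanged values are skipped, changed
    # values are rewritten (old record logged under 'U', new one under 'C'),
    # and anything past the end of the file is appended.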

log_entries = log_entries.copy()
with open(tsdb_file, 'rb') as reader:
first_record = unpack(entry_format, reader.read(entry_size))

first_record_date = dt(1970, 1, 1) + timedelta(seconds=first_record[0])

offset = __calculate_offset(series.index.freqstr, series.index[0], first_record_date)

with open(tsdb_file, 'r+b') as writer:
existing_records = []

# Read existing overlapping data for comparisons
writer.seek(entry_size * offset, os.SEEK_SET)

        for record in iter(lambda: writer.read(entry_size), b''):
            existing_records.append(unpack(entry_format, record))

records_length = len(existing_records)

# Start a count for records from the starting write position
rec_count = 0
writer.seek(entry_size * offset, os.SEEK_SET)
for date, value in zip(series.index, series.values):
datestamp = calendar.timegm(date.utctimetuple())
overlapping = rec_count <= records_length - 1

if overlapping and (existing_records[rec_count][1] == value or existing_records[rec_count][2] == MISSING_VALUE):
# Skip writing the entry if it hasn't changed.
                writer.seek(entry_size * (rec_count + 1) + (entry_size * offset), os.SEEK_SET)
elif overlapping and existing_records[rec_count][1] != value:
log_entries['U'].append(existing_records[rec_count])

log_entries['C'].append(
(
datestamp,
value,
DEFAULT_META_ID
)
)
data = __pack(datestamp, value)
writer.write(data)
else:
data = __pack(datestamp, value)
log_entries['C'].append(
(
datestamp,
value,
DEFAULT_META_ID
)
)
writer.write(data)
rec_count += 1

return log_entries


def write(tsdb_file, ts, freq):
"""
@@ -114,97 +212,66 @@ def write_regular_data(tsdb_file, series):
reader.seek(entry_size * -1, os.SEEK_END)
last_record = unpack(entry_format, reader.read(entry_size))

first_record_date = pd.Timestamp(dt(1970, 1, 1) + timedelta(seconds=first_record[0]), freq=series.index.freq)
last_record_date = pd.Timestamp(dt(1970, 1, 1) + timedelta(seconds=last_record[0]), freq=series.index.freq)

offset = __calculate_offset(series.index.freqstr, start_date, first_record_date)

# We are prepending to existing data
if start_date < first_record_date:
# Keep original data before writing (since a new file needs to be
# written for a prepend operation)
os.rename(tsdb_file, tsdb_file+'.tmp')

# Write all the data up to the original first_record_date
with open(tsdb_file, 'wb') as writer:
for date, value in zip(
series.loc[:first_record_date-1].index,
series.loc[:first_record_date-1].values
):
datestamp = calendar.timegm(date.utctimetuple())
log_entries['C'].append(
(
datestamp,
value,
DEFAULT_META_ID
)
)
data = __pack(datestamp, value)
writer.write(data)

# Fill any missing values between the end of the new series and the start of the old
with open(tsdb_file, 'ab') as writer:
log_entries = __write_missing(
writer,
series.index.freq,
end_date + 1,
pd.Timestamp(first_record_date, freq = series.index.freq) - 1,
log_entries
)

# Copy over existing data
with open(tsdb_file + '.tmp', 'rb') as original_data, open(tsdb_file, 'ab') as writer:
for chunk in iter(lambda: original_data.read(entry_size), b''):
writer.write(chunk)

        # Remove the temporary file now that its contents have been copied
        os.remove(tsdb_file + '.tmp')

# Update existing data
if len(series.loc[first_record_date:]) > 0:
log_entries = __update_existing_data(tsdb_file, series.loc[first_record_date:], log_entries)

# We are updating existing data
elif start_date <= last_record_date:
log_entries = __update_existing_data(tsdb_file, series, log_entries)

# We are appending data
elif start_date > last_record_date:
with open(tsdb_file, 'a+b') as writer:
            last_record_date = pd.Timestamp(last_record_date, freq=series.index.freq) + 1

log_entries = __write_missing(writer, series.index.freq, last_record_date, start_date - 1, log_entries)

for date, value in zip(series.index, series.values):
datestamp = calendar.timegm(date.utctimetuple())
@@ -223,6 +290,7 @@ def write_regular_data(tsdb_file, series):

return log_entries


def write_irregular_data(tsdb_file, series):
"""
Smart write of irregular data.
@@ -261,7 +329,7 @@ def write_irregular_data(tsdb_file, series):
log_entries['C'].append((datestamp, new_value, meta_id))
log_entries['U'].append((datestamp, orig_value, meta_id))

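    # Append in-place only when nothing overlaps and the new series starts
    # strictly after the existing data ends; a prepend (or interleaved write)
    # must instead take the merge path handled below (not shown in this hunk).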
append_only = len(overlap_idx) == 0 and existing.index[-1] < series.index[0]
if append_only:
merged = series
fmode = 'ab'
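The prepend branch above is, at heart, a rename-and-rewrite: set the original file aside, write the new head and any gap padding, then stream the old bytes back. A condensed, self-contained sketch of that pattern (hypothetical names, not the phildb API):

import os

def prepend_bytes(path, new_head):
    """Prepend new_head to the file at path by rewriting it."""
    tmp_path = path + '.tmp'
    # Keep the original data while the new file is assembled.
    os.rename(path, tmp_path)
    with open(path, 'wb') as out:
        out.write(new_head)
        # Stream the original contents back in fixed-size chunks.
        with open(tmp_path, 'rb') as original:
            for chunk in iter(lambda: original.read(8192), b''):
                out.write(chunk)
    # Drop the temporary copy once everything has been copied over.
    os.remove(tmp_path)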
66 changes: 66 additions & 0 deletions tests/test_writer.py
@@ -139,6 +139,72 @@ def test_append_multiple(self):
self.assertEqual(5.0, data.values[4])
self.assertEqual(6.0, data.values[5])

def test_prepend_single(self):
log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2013,12,31)], data = [-1.0]), 'D')
created = log_entries['C']
self.assertEqual([(1388448000, -1.0, 0)], created)
modified = log_entries['U']
self.assertEqual([], modified)

data = reader.read(self.tsdb_existing_file)
self.assertEqual(-1.0, data.values[0])

def test_prepend_with_gap(self):
log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2013,12,30)], data = [-2.0]), 'D')
created = log_entries['C']
self.assertEqual([(1388361600, -2.0, 0), (1388448000, -9999, 9999)], created)
modified = log_entries['U']
self.assertEqual([], modified)

data = reader.read(self.tsdb_existing_file)
self.assertEqual(-2.0, data.values[0])
self.assertTrue(np.isnan(data.values[1]))

def test_prepend_with_overlap(self):
log_entries = writer.write(
self.tsdb_existing_file,
pd.Series(
index = [
datetime(2013,12,30),
datetime(2013,12,31),
datetime(2014,1,1)
],
data = [
-2.0,
-1.0,
0.0
]
),
'D'
)
created = log_entries['C']
self.assertEqual(
[
(1388361600, -2.0, 0),
(1388448000, -1.0, 0),
(1388534400, 0.0, 0)
],
created
)
modified = log_entries['U']
self.assertEqual([(1388534400, 1.0, 0)], modified)

data = reader.read(self.tsdb_existing_file)
self.assertEqual(-2.0, data.values[0])
self.assertEqual(-1.0, data.values[1])
self.assertEqual(0.0, data.values[2])

def test_prepend_irregular(self):
log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2013,12,30)], data = [-1.0]), 'IRR')
created = log_entries['C']
self.assertEqual([(1388361600, -1.0, 0)], created)
modified = log_entries['U']
self.assertEqual([], modified)

data = reader.read(self.tsdb_existing_file)
self.assertEqual(-1.0, data.values[0])
self.assertEqual(1.0, data.values[1])

def test_update_and_append(self):
log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2014,1,2), datetime(2014,1,3), datetime(2014,1,4), datetime(2014,1,5), datetime(2014,1,6)], data = [2.5, 3.0, 4.0, 5.0, 6.0]), 'D')
modified = log_entries['U']
