diff --git a/phildb/writer.py b/phildb/writer.py
index d79399c..17d94d6 100644
--- a/phildb/writer.py
+++ b/phildb/writer.py
@@ -45,6 +45,104 @@ def __convert_and_validate(ts, freq):
 
     return series
 
+def __calculate_offset(freqstr, start_date, first_record_date):
+    if freqstr[-1] == 'T':
+        if len(freqstr) == 1:
+            freq_mult = 1
+        else:
+            freq_mult = int(freqstr[:-1])
+        freqstr = 'T'
+    else:
+        freq_mult = 1
+
+    if freqstr[-1] == 'S' and len(freqstr) > 1:
+        freqstr = freqstr[:-1]
+
+    return start_date.to_period(freqstr) - pd.to_datetime(first_record_date).to_period(freqstr)
+
+def __write_missing(writer, freq, first_date, last_date, log_entries):
+    log_entries = log_entries.copy()
+
+    missing_dates = pd.date_range(first_date, last_date, freq = freq)
+    for the_date in missing_dates:
+        datestamp = calendar.timegm(the_date.utctimetuple())
+        log_entries['C'].append(
+            (
+                datestamp,
+                MISSING_VALUE,
+                METADATA_MISSING_VALUE
+            )
+        )
+
+        data = pack(
+            entry_format,
+            datestamp,
+            MISSING_VALUE,
+            METADATA_MISSING_VALUE
+        )
+
+        writer.write(data)
+
+    return log_entries
+
+def __update_existing_data(tsdb_file, series, log_entries):
+
+    log_entries = log_entries.copy()
+    with open(tsdb_file, 'rb') as reader:
+        first_record = unpack(entry_format, reader.read(entry_size))
+
+    first_record_date = dt(1970, 1, 1) + timedelta(seconds=first_record[0])
+
+    offset = __calculate_offset(series.index.freqstr, series.index[0], first_record_date)
+
+    with open(tsdb_file, 'r+b') as writer:
+        existing_records = []
+
+        # Read existing overlapping data for comparisons
+        writer.seek(entry_size * offset, os.SEEK_SET)
+
+        for record in iter(lambda: writer.read(entry_size), b""):
+            if not record: break
+            existing_records.append(unpack(entry_format, record))
+
+        records_length = len(existing_records)
+
+        # Start a count for records from the starting write position
+        rec_count = 0
+        writer.seek(entry_size * offset, os.SEEK_SET)
+        for date, value in zip(series.index, series.values):
+            datestamp = calendar.timegm(date.utctimetuple())
+            overlapping = rec_count <= records_length - 1
+
+            if overlapping and (existing_records[rec_count][1] == value or existing_records[rec_count][2] == MISSING_VALUE):
+                # Skip writing the entry if it hasn't changed.
+                writer.seek(entry_size * (rec_count + 1) + (entry_size * offset), os.SEEK_SET)
+            elif overlapping and existing_records[rec_count][1] != value:
+                log_entries['U'].append(existing_records[rec_count])
+
+                log_entries['C'].append(
+                    (
+                        datestamp,
+                        value,
+                        DEFAULT_META_ID
+                    )
+                )
+                data = __pack(datestamp, value)
+                writer.write(data)
+            else:
+                data = __pack(datestamp, value)
+                log_entries['C'].append(
+                    (
+                        datestamp,
+                        value,
+                        DEFAULT_META_ID
+                    )
+                )
+                writer.write(data)
+            rec_count += 1
+
+    return log_entries
+
 def write(tsdb_file, ts, freq):
     """
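Note on `__calculate_offset`: it turns the distance between the start of the incoming series and the first record on disk into a record index. A minimal standalone sketch of the pandas behaviour it leans on (not part of the patch; in the pandas releases this codebase targets, subtracting two `Period` objects of the same frequency yields a plain integer, while newer pandas returns a `DateOffset` whose `.n` carries the same count):

    import pandas as pd

    # Records in a regular .tsdb file are fixed width and contiguous, so the
    # number of whole periods between the file's first timestamp and the new
    # series' start is also the record index at which writing should begin.
    start = pd.Timestamp('2014-01-03')
    first = pd.Timestamp('2014-01-01')

    offset = start.to_period('D') - first.to_period('D')
    print(offset)  # 2 (older pandas; newer pandas gives a 2-day offset with .n == 2)

The extracted helper also keeps the unused `freq_mult` computation from the original code; it only normalises minute frequencies such as '15T' down to 'T' before the period subtraction.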
@@ -114,97 +212,66 @@ def write_regular_data(tsdb_file, series):
         reader.seek(entry_size * -1, os.SEEK_END)
         last_record = unpack(entry_format, reader.read(entry_size))
 
-    first_record_date = dt(1970, 1, 1) + timedelta(seconds=first_record[0])
-    last_record_date = dt(1970, 1, 1) + timedelta(seconds=last_record[0])
+    first_record_date = pd.Timestamp(dt(1970, 1, 1) + timedelta(seconds=first_record[0]), freq=series.index.freq)
+    last_record_date = pd.Timestamp(dt(1970, 1, 1) + timedelta(seconds=last_record[0]), freq=series.index.freq)
 
-    freqstr = series.index.freqstr
+    offset = __calculate_offset(series.index.freqstr, start_date, first_record_date)
 
-    if freqstr[-1] == 'T':
-        if len(freqstr) == 1:
-            freq_mult = 1
-        else:
-            freq_mult = int(freqstr[:-1])
-        freqstr = 'T'
-    else:
-        freq_mult = 1
-
-    if freqstr[-1] == 'S' and len(freqstr) > 1:
-        freqstr = freqstr[:-1]
+    # We are prepending to existing data
+    if start_date < first_record_date:
+        # Keep original data before writing (since a new file needs to be
+        # written for a prepend operation)
+        os.rename(tsdb_file, tsdb_file+'.tmp')
 
-    offset = start_date.to_period(freqstr) - pd.to_datetime(first_record_date).to_period(freqstr)
+        # Write all the data up to the original first_record_date
+        with open(tsdb_file, 'wb') as writer:
+            for date, value in zip(
+                    series.loc[:first_record_date-1].index,
+                    series.loc[:first_record_date-1].values
+                ):
+                datestamp = calendar.timegm(date.utctimetuple())
+                log_entries['C'].append(
+                    (
+                        datestamp,
+                        value,
+                        DEFAULT_META_ID
+                    )
+                )
+                data = __pack(datestamp, value)
+                writer.write(data)
 
-    # We are updating existing data
-    if start_date <= last_record_date:
-        with open(tsdb_file, 'r+b') as writer:
-            existing_records = []
+        # Fill any missing values between the end of the new series and the start of the old
+        with open(tsdb_file, 'ab') as writer:
+            log_entries = __write_missing(
+                writer,
+                series.index.freq,
+                end_date + 1,
+                pd.Timestamp(first_record_date, freq = series.index.freq) - 1,
+                log_entries
+            )
 
-            # Read existing overlapping data for comparisons
-            writer.seek(entry_size * offset, os.SEEK_SET)
+        # Copy over existing data
+        with open(tsdb_file + '.tmp', 'rb') as original_data, open(tsdb_file, 'ab') as writer:
+            for chunk in iter(lambda: original_data.read(entry_size), b''):
+                writer.write(chunk)
 
-            for record in iter(lambda: writer.read(entry_size), ""):
-                if not record: break
-                existing_records.append(unpack(entry_format, record))
+        # Remove the temporary file now that its contents have been copied
+        os.remove(tsdb_file+'.tmp')
 
-            records_length = len(existing_records)
+        # Update existing data
+        if len(series.loc[first_record_date:]) > 0:
+            log_entries = __update_existing_data(tsdb_file, series.loc[first_record_date:], log_entries)
 
-            # Start a count for records from the starting write position
-            rec_count = 0
-            writer.seek(entry_size * offset, os.SEEK_SET)
-            for date, value in zip(series.index, series.values):
-                datestamp = calendar.timegm(date.utctimetuple())
-                overlapping = rec_count <= records_length - 1
-
-                if overlapping and (existing_records[rec_count][1] == value or existing_records[rec_count][2] == MISSING_VALUE):
-                    # Skip writing the entry if it hasn't changed.
-                    writer.seek(entry_size * (rec_count +1) + (entry_size * offset), os.SEEK_SET)
-                elif overlapping and existing_records[rec_count][1] != value:
-                    log_entries['U'].append(existing_records[rec_count])
-
-                    log_entries['C'].append(
-                        (
-                            datestamp,
-                            value,
-                            DEFAULT_META_ID
-                        )
-                    )
-                    data = __pack(datestamp, value)
-                    writer.write(data)
-                else:
-                    data = __pack(datestamp, value)
-                    log_entries['C'].append(
-                        (
-                            datestamp,
-                            value,
-                            DEFAULT_META_ID
-                        )
-                    )
-                    writer.write(data)
-                rec_count += 1
+    # We are updating existing data
+    elif start_date <= last_record_date:
+        log_entries = __update_existing_data(tsdb_file, series, log_entries)
 
     # We are appending data
     elif start_date > last_record_date:
         with open(tsdb_file, 'a+b') as writer:
             last_record_date = pd.Timestamp(last_record_date, offset=series.index.freq) + 1
-            missing_dates = pd.date_range(last_record_date, start_date - 1, freq = series.index.freq)
-            for the_date in missing_dates:
-                datestamp = calendar.timegm(the_date.utctimetuple())
-                log_entries['C'].append(
-                    (
-                        datestamp,
-                        MISSING_VALUE,
-                        METADATA_MISSING_VALUE
-                    )
-                )
-
-                data = pack(
-                    entry_format,
-                    datestamp,
-                    MISSING_VALUE,
-                    METADATA_MISSING_VALUE
-                )
-
-                writer.write(data)
+            log_entries = __write_missing(writer, series.index.freq, last_record_date, start_date - 1, log_entries)
 
             for date, value in zip(series.index, series.values):
                 datestamp = calendar.timegm(date.utctimetuple())
@@ -223,6 +290,7 @@ def write_regular_data(tsdb_file, series):
 
     return log_entries
 
+
 def write_irregular_data(tsdb_file, series):
     """
     Smart write of irregular data.
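The prepend branch above rebuilds the file in three passes (new leading records, missing-value padding, then the original bytes copied back) because a .tsdb file is a flat array of fixed-width records with no room to insert at the front. A sketch of the record layout the seek and copy arithmetic assumes (the format string here is hypothetical, chosen only for illustration; the real `entry_format` and `entry_size` are defined elsewhere in phildb and may differ):

    import calendar
    import struct
    from datetime import datetime

    # Assumed layout: POSIX timestamp, float value, metadata id.
    entry_format = '<qdi'  # assumption for illustration, not phildb's actual format
    entry_size = struct.calcsize(entry_format)

    datestamp = calendar.timegm(datetime(2014, 1, 1).utctimetuple())
    record = struct.pack(entry_format, datestamp, 1.0, 0)

    # Fixed-width records are what make seek(entry_size * offset) land exactly
    # on record number `offset`, and why the copy loop reads entry_size chunks.
    assert len(record) == entry_size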
@@ -261,7 +329,7 @@ def write_irregular_data(tsdb_file, series):
             log_entries['C'].append((datestamp, new_value, meta_id))
             log_entries['U'].append((datestamp, orig_value, meta_id))
 
-    append_only = len(overlap_idx) == 0
+    append_only = len(overlap_idx) == 0 and existing.index[-1] < series.index[0]
     if append_only:
         merged = series
         fmode = 'ab'
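The one-line change above fixes prepends of irregular series: a new series that starts before every existing record has an empty `overlap_idx`, yet appending it blindly would leave the file out of timestamp order. A standalone sketch of the guard (`existing` and `series` here are stand-ins for the variables of the same names in `write_irregular_data`; how `overlap_idx` is computed in phildb may differ):

    import pandas as pd

    existing = pd.Series([1.0], index=[pd.Timestamp('2014-01-01')])
    series = pd.Series([-1.0], index=[pd.Timestamp('2013-12-30')])

    overlap_idx = existing.index.intersection(series.index)

    # No overlap, but the new data predates the old, so append-only is unsafe
    # and the merged-rewrite path must be taken instead.
    append_only = len(overlap_idx) == 0 and existing.index[-1] < series.index[0]
    print(append_only)  # False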
diff --git a/tests/test_writer.py b/tests/test_writer.py
index 630b873..dcad471 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -139,6 +139,72 @@ def test_append_multiple(self):
         self.assertEqual(5.0, data.values[4])
         self.assertEqual(6.0, data.values[5])
 
+    def test_prepend_single(self):
+        log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2013,12,31)], data = [-1.0]), 'D')
+        created = log_entries['C']
+        self.assertEqual([(1388448000, -1.0, 0)], created)
+        modified = log_entries['U']
+        self.assertEqual([], modified)
+
+        data = reader.read(self.tsdb_existing_file)
+        self.assertEqual(-1.0, data.values[0])
+
+    def test_prepend_with_gap(self):
+        log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2013,12,30)], data = [-2.0]), 'D')
+        created = log_entries['C']
+        self.assertEqual([(1388361600, -2.0, 0), (1388448000, -9999, 9999)], created)
+        modified = log_entries['U']
+        self.assertEqual([], modified)
+
+        data = reader.read(self.tsdb_existing_file)
+        self.assertEqual(-2.0, data.values[0])
+        self.assertTrue(np.isnan(data.values[1]))
+
+    def test_prepend_with_overlap(self):
+        log_entries = writer.write(
+            self.tsdb_existing_file,
+            pd.Series(
+                index = [
+                    datetime(2013,12,30),
+                    datetime(2013,12,31),
+                    datetime(2014,1,1)
+                ],
+                data = [
+                    -2.0,
+                    -1.0,
+                    0.0
+                ]
+            ),
+            'D'
+        )
+        created = log_entries['C']
+        self.assertEqual(
+            [
+                (1388361600, -2.0, 0),
+                (1388448000, -1.0, 0),
+                (1388534400, 0.0, 0)
+            ],
+            created
+        )
+        modified = log_entries['U']
+        self.assertEqual([(1388534400, 1.0, 0)], modified)
+
+        data = reader.read(self.tsdb_existing_file)
+        self.assertEqual(-2.0, data.values[0])
+        self.assertEqual(-1.0, data.values[1])
+        self.assertEqual(0.0, data.values[2])
+
+    def test_prepend_irregular(self):
+        log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2013,12,30)], data = [-1.0]), 'IRR')
+        created = log_entries['C']
+        self.assertEqual([(1388361600, -1.0, 0)], created)
+        modified = log_entries['U']
+        self.assertEqual([], modified)
+
+        data = reader.read(self.tsdb_existing_file)
+        self.assertEqual(-1.0, data.values[0])
+        self.assertEqual(1.0, data.values[1])
+
     def test_update_and_append(self):
         log_entries = writer.write(self.tsdb_existing_file, pd.Series(index = [datetime(2014,1,2), datetime(2014,1,3), datetime(2014,1,4), datetime(2014,1,5), datetime(2014,1,6)], data = [2.5, 3.0, 4.0, 5.0, 6.0]), 'D')
         modified = log_entries['U']
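Taken together, the new tests pin down the prepend contract: earlier records are written in front of the existing data, any gap up to the old first record is padded with the missing-value marker, and overlapping values are logged as updates. A usage sketch distilled from test_prepend_with_gap (the file path is illustrative; the dates and expected log entries are taken verbatim from the test):

    from datetime import datetime

    import pandas as pd

    from phildb import writer

    # Prepend one daily value to a file whose first record is 2014-01-01.
    log_entries = writer.write(
        '/path/to/existing.tsdb',  # hypothetical path to an existing daily .tsdb file
        pd.Series(index=[datetime(2013, 12, 30)], data=[-2.0]),
        'D'
    )

    print(log_entries['C'])  # [(1388361600, -2.0, 0), (1388448000, -9999, 9999)]
    print(log_entries['U'])  # []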