Renamed 'Chunk' to 'Part'.
goossaert committed Mar 25, 2015
1 parent 2d34871 commit 175d811
Showing 16 changed files with 156 additions and 162 deletions.
algorithm/compressor.cc: 2 changes (1 addition & 1 deletion)
@@ -48,7 +48,7 @@ Status CompressorLZ4::Compress(char *source,
}

// NOTE: Yes, storing 64 bits into 32, but overflows will not happen, as
// size_source is limited to db.storage.maximum_chunk_size
// size_source is limited to db.storage.maximum_part_size
uint32_t size_source_32 = size_source;
EncodeFixed32((*dest), size_compressed_stored);
EncodeFixed32((*dest) + 4, size_source_32);
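
The hunk above only renames the option in the comment, but the surrounding code is the part-header write: CompressorLZ4::Compress() stores the compressed size and the original (source) size as two fixed 32-bit integers at the start of the destination buffer. A minimal sketch of that 8-byte layout, assuming EncodeFixed32 behaves like a little-endian fixed-32 writer (the helpers below are hypothetical stand-ins, not the project's implementation):

#include <cstdint>
#include <cstring>

// Hypothetical stand-in for EncodeFixed32: writes a 32-bit value in little-endian order.
static void encode_fixed32(char* dst, uint32_t value) {
  std::memcpy(dst, &value, sizeof(value));  // assumes a little-endian host
}

// Writes the 8-byte part header: [size_compressed][size_source].
// Truncating size_source to 32 bits is safe only because it is capped by
// db.storage.maximum_part_size, which is validated against max int32 at startup.
static void write_part_header(char* dest, uint32_t size_compressed_stored, uint64_t size_source) {
  uint32_t size_source_32 = static_cast<uint32_t>(size_source);
  encode_fixed32(dest, size_compressed_stored);
  encode_fixed32(dest + 4, size_source_32);
}
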
cache/rate_limiter.h: 10 changes (5 additions & 5 deletions)
@@ -31,7 +31,7 @@ class RateLimiter {
epoch_current_ = std::time(0);
if (epoch_current_ != epoch_last_) {
rate_incoming_adjusted_ = rate_incoming_ + bytes_per_microsecond_ * duration_slept_;
log::trace("RateLimiter",
log::trace("RateLimiter::Tick()",
"rate_incoming_: %" PRIu64 " rate_incoming_adjusted_:%" PRIu64,
rate_incoming_, rate_incoming_adjusted_);
duration_slept_ = 0;
@@ -57,7 +57,7 @@ class RateLimiter {
bytes_per_microsecond_ *= 0.995;
}
if (bytes_per_microsecond_ <= 5) bytes_per_microsecond_ += 1;
log::trace("WriteBuffer::WriteChunk()", "decreasing");
log::trace("RateLimiter::Tick()", "decreasing");
} else {
// The rate of incoming data is lower than the rate at which data
// can be written, therefore bytes_per_microsecond_ needs to be
@@ -71,11 +71,11 @@ class RateLimiter {
} else {
bytes_per_microsecond_ *= 1.005;
}
log::trace("RateLimiter", "increasing");
log::trace("RateLimiter::Tick()", "increasing");
if (bytes_per_microsecond_ <= 5) bytes_per_microsecond_ += 1;
}

log::trace("RateLimiter",
log::trace("RateLimiter::Tick()",
"limit rate: bytes_per_microsecond_: %" PRIu64 " rate_writing:%" PRIu64,
bytes_per_microsecond_, rate_writing);
}
@@ -90,7 +90,7 @@ class RateLimiter {
if (sleep_microseconds > 50000) sleep_microseconds = 50000;

if (sleep_microseconds) {
log::trace("RateLimiter",
log::trace("RateLimiter::Tick()",
"bytes_per_microsecond_: %" PRIu64 ", sleep_microseconds: %" PRIu64,
bytes_per_microsecond_, sleep_microseconds);
std::chrono::microseconds duration(sleep_microseconds);
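
Beyond the renamed log tags, these hunks are the core of the throttling logic: once per second, Tick() compares the rate of incoming bytes with the rate at which data is actually written out, shrinks or grows the per-microsecond write budget by about 0.5%, and then sleeps callers in proportion to the bytes they just handed in, capped at 50 ms. A condensed, self-contained sketch of that feedback loop (member names simplified and logging omitted; an illustration, not the actual class):

#include <chrono>
#include <cstdint>
#include <ctime>
#include <thread>

class SimpleRateLimiter {
 public:
  // bytes_incoming: bytes just received; rate_writing: bytes per second actually persisted.
  void Tick(uint64_t bytes_incoming, uint64_t rate_writing) {
    rate_incoming_ += bytes_incoming;
    std::time_t now = std::time(nullptr);
    if (now != epoch_last_) {            // adjust the budget once per second
      epoch_last_ = now;
      if (rate_incoming_ > rate_writing) {
        bytes_per_microsecond_ *= 0.995; // incoming faster than writing: tighten the throttle
      } else {
        bytes_per_microsecond_ *= 1.005; // writing keeps up: loosen the throttle
      }
      if (bytes_per_microsecond_ <= 5) bytes_per_microsecond_ += 1;  // keep the budget above zero
      rate_incoming_ = 0;
    }
    // Sleep in proportion to the incoming bytes, capped at 50 milliseconds.
    uint64_t sleep_us = bytes_incoming / bytes_per_microsecond_;
    if (sleep_us > 50000) sleep_us = 50000;
    if (sleep_us) std::this_thread::sleep_for(std::chrono::microseconds(sleep_us));
  }

 private:
  std::time_t epoch_last_ = std::time(nullptr);
  uint64_t rate_incoming_ = 0;
  uint64_t bytes_per_microsecond_ = 1024;
};
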
cache/write_buffer.cc: 32 changes (13 additions & 19 deletions)
@@ -28,10 +28,10 @@ Status WriteBuffer::Get(ReadOptions& read_options, ByteArray& key, ByteArray* va
// TODO: make sure the live buffer doesn't need to be protected by a mutex in
// order to be accessed -- right now I'm relying on timing, but that may
// be too weak to guarantee proper access
// TODO: for items being stored that are not small enough, only chunks will
// TODO: for items being stored that are not small enough, only parts will
// be found in the buffers -- should the kv-store return "not found"
// or should it try to send the data from the disk and the partially
// available chunks in the buffer?
// available parts in the buffer?
if (IsStopRequested()) return Status::IOError("Cannot handle request: WriteBuffer is closing");

// read the "live" buffer
@@ -58,9 +58,6 @@ Status WriteBuffer::Get(ReadOptions& read_options, ByteArray& key, ByteArray* va
log::debug("WriteBuffer::Get()", "found in buffer_live");
if ( order_found.type == OrderType::Put
&& order_found.IsSelfContained()) {
// TODO: uncompressed the chunk
// TODO: make sure that it is clear for the code below this method
// whether or not the chunk is compressed
*value_out = order_found.chunk;
(*value_out).set_size(order_found.size_value);
(*value_out).set_size_compressed(order_found.size_value_compressed);
@@ -102,9 +99,6 @@ Status WriteBuffer::Get(ReadOptions& read_options, ByteArray& key, ByteArray* va
if ( found
&& order_found.type == OrderType::Put
&& order_found.IsSelfContained()) {
// TODO: uncompressed the chunk
// TODO: make sure that it is clear for the code below this method
// whether or not the chunk is compressed
*value_out = order_found.chunk;
(*value_out).set_size(order_found.size_value);
(*value_out).set_size_compressed(order_found.size_value_compressed);
@@ -133,14 +127,14 @@ Status WriteBuffer::Put(WriteOptions& write_options, ByteArray& key, ByteArray&
}


Status WriteBuffer::PutChunk(WriteOptions& write_options,
Status WriteBuffer::PutPart(WriteOptions& write_options,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value,
uint64_t size_value_compressed,
uint32_t crc32) {
return WriteChunk(write_options,
return WritePart(write_options,
OrderType::Put,
key,
chunk,
@@ -154,11 +148,11 @@ Status WriteBuffer::PutChunk(WriteOptions& write_options,

Status WriteBuffer::Delete(WriteOptions& write_options, ByteArray& key) {
auto empty = ByteArray::NewEmptyByteArray();
return WriteChunk(write_options, OrderType::Delete, key, empty, 0, 0, 0, 0);
return WritePart(write_options, OrderType::Delete, key, empty, 0, 0, 0, 0);
}


Status WriteBuffer::WriteChunk(const WriteOptions& write_options,
Status WriteBuffer::WritePart(const WriteOptions& write_options,
const OrderType& op,
ByteArray& key,
ByteArray& chunk,
@@ -168,15 +162,15 @@ Status WriteBuffer::WriteChunk(const WriteOptions& write_options,
uint32_t crc32) {
if (IsStopRequested()) return Status::IOError("Cannot handle request: WriteBuffer is closing");

log::trace("WriteBuffer::WriteChunk()",
log::trace("WriteBuffer::WritePart()",
"key:[%s] | size chunk:%" PRIu64 ", total size value:%" PRIu64 " offset_chunk:%" PRIu64 " sizeOfBuffer:%d",
key.ToString().c_str(), chunk.size(), size_value, offset_chunk, buffers_[im_live_].size());

bool is_first_chunk = (offset_chunk == 0);
bool is_first_part = (offset_chunk == 0);
bool is_large = key.size() + size_value > db_options_.storage__hstable_size;

uint64_t bytes_arriving = 0;
if (is_first_chunk) bytes_arriving += key.size();
if (is_first_part) bytes_arriving += key.size();
bytes_arriving += chunk.size();

if (UseRateLimiter()) rate_limiter_.Tick(bytes_arriving);
@@ -202,17 +196,17 @@ Status WriteBuffer::WriteChunk(const WriteOptions& write_options,
/*
if (buffers_[im_live_].size()) {
for(auto &p: buffers_[im_live_]) {
log::trace("WriteBuffer::WriteChunk()",
log::trace("WriteBuffer::WritePart()",
"Write() ITEM key_ptr:[%p] key:[%s] | size chunk:%d, total size value:%d offset_chunk:%" PRIu64 " sizeOfBuffer:%d sizes_[im_live_]:%d",
p.key, p.key->ToString().c_str(), p.chunk->size(), p.size_value, p.offset_chunk, buffers_[im_live_].size(), sizes_[im_live_]);
}
} else {
log::trace("WriteBuffer::WriteChunk()", "Write() ITEM no buffers_[im_live_]");
log::trace("WriteBuffer::WritePart()", "Write() ITEM no buffers_[im_live_]");
}
*/

if (size_buffer_live > buffer_size_) {
log::trace("WriteBuffer::WriteChunk()", "trying to swap");
log::trace("WriteBuffer::WritePart()", "trying to swap");
mutex_flush_level2_.lock();
log::debug("LOCK", "2 lock");
log::debug("LOCK", "3 lock");
@@ -223,7 +217,7 @@ Status WriteBuffer::WriteChunk(const WriteOptions& write_options,
log::debug("LOCK", "2 unlock");

} else {
log::trace("WriteBuffer::WriteChunk()", "will not swap");
log::trace("WriteBuffer::WritePart()", "will not swap");
}

log::debug("LOCK", "1 unlock");
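
As the renamed WritePart() shows, a value that does not fit in a single part arrives as a series of calls sharing the same key: offset_chunk marks where each part belongs, the key is only counted toward the incoming bytes on the first part (offset 0), and the live buffer is swapped out for flushing once it grows past buffer_size_. A rough caller-side sketch of such a sequence, using the PutPart() signature from this file (the helper name and the uncompressed/zero-CRC arguments are illustrative assumptions):

#include <cstdint>

// Illustrative helper: streams 'value' into the write buffer in fixed-size parts,
// the way Database::PutPart() slices an oversized value before forwarding each slice.
Status PutLargeValue(WriteBuffer& wb, WriteOptions& options,
                     ByteArray& key, ByteArray& value, uint64_t part_size) {
  Status s;
  uint64_t size_value = value.size();
  for (uint64_t offset = 0; offset < size_value; offset += part_size) {
    ByteArray part = value;                 // shallow copy, then narrowed to one slice
    part.set_offset(offset);
    uint64_t remaining = size_value - offset;
    part.set_size(remaining < part_size ? remaining : part_size);
    // offset == 0 marks the first part; the part that completes size_value is the last one.
    s = wb.PutPart(options, key, part, offset, size_value,
                   0 /* size_value_compressed: uncompressed */, 0 /* crc32: not yet computed */);
    if (!s.IsOK()) break;
  }
  return s;
}
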
cache/write_buffer.h: 4 changes (2 additions & 2 deletions)
@@ -51,7 +51,7 @@ class WriteBuffer {
~WriteBuffer() { Close(); }
Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out);
Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk);
Status PutChunk(WriteOptions& write_options,
Status PutPart(WriteOptions& write_options,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
@@ -89,7 +89,7 @@ class WriteBuffer {
bool stop_requested_;

private:
Status WriteChunk(const WriteOptions& write_options,
Status WritePart(const WriteOptions& write_options,
const OrderType& op,
ByteArray& key,
ByteArray& chunk,
interface/database.cc: 52 changes (26 additions & 26 deletions)
@@ -65,11 +65,11 @@ Status Database::Get(ReadOptions& read_options, ByteArray& key, ByteArray* value


Status Database::Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) {
return PutChunk(write_options, key, chunk, 0, chunk.size());
return PutPart(write_options, key, chunk, 0, chunk.size());
}


Status Database::PutChunk(WriteOptions& write_options,
Status Database::PutPart(WriteOptions& write_options,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
@@ -80,20 +80,20 @@ Status Database::PutChunk(WriteOptions& write_options,
return Status::IOError("Attempted write beyond the total value size, aborting write.");
}

if (size_value <= db_options_.storage__maximum_chunk_size) {
return PutChunkValidSize(write_options, key, chunk, offset_chunk, size_value);
if (size_value <= db_options_.storage__maximum_part_size) {
return PutPartValidSize(write_options, key, chunk, offset_chunk, size_value);
}

// 'chunk' may be deleted by the call to PutChunkValidSize()
// 'chunk' may be deleted by the call to PutPartValidSize()
// and therefore it cannot be used in the loop test condition
uint64_t size_chunk = chunk.size();
Status s;
for (uint64_t offset = 0; offset < size_chunk; offset += db_options_.storage__maximum_chunk_size) {
for (uint64_t offset = 0; offset < size_chunk; offset += db_options_.storage__maximum_part_size) {
ByteArray key_new, chunk_new;
if (offset + db_options_.storage__maximum_chunk_size < chunk.size()) {
if (offset + db_options_.storage__maximum_part_size < chunk.size()) {
chunk_new = chunk;
chunk_new.set_offset(offset);
chunk_new.set_size(db_options_.storage__maximum_chunk_size);
chunk_new.set_size(db_options_.storage__maximum_part_size);
key_new = key;
} else {
chunk_new = chunk;
@@ -102,15 +102,15 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
key_new = key;
}

s = PutChunkValidSize(write_options, key_new, chunk_new, offset_chunk + offset, size_value);
s = PutPartValidSize(write_options, key_new, chunk_new, offset_chunk + offset, size_value);
if (!s.IsOK()) break;
}

return s;
}


Status Database::PutChunkValidSize(WriteOptions& write_options,
Status Database::PutPartValidSize(WriteOptions& write_options,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
@@ -119,7 +119,7 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
Status s;
s = se_->FileSystemStatus();
if (!s.IsOK()) return s;
log::trace("Database::PutChunkValidSize()",
log::trace("Database::PutPartValidSize()",
"[%s] size_chunk:%" PRIu64 " offset_chunk:%" PRIu64,
key.ToString().c_str(),
chunk.size(),
@@ -130,9 +130,9 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
uint64_t offset_chunk_compressed = offset_chunk;
ByteArray chunk_final;

bool is_first_chunk = (offset_chunk == 0);
bool is_last_chunk = (chunk.size() + offset_chunk == size_value);
log::trace("Database::PutChunkValidSize()",
bool is_first_part = (offset_chunk == 0);
bool is_last_part = (chunk.size() + offset_chunk == size_value);
log::trace("Database::PutPartValidSize()",
"CompressionType:%d",
db_options_.compression.type);

@@ -141,7 +141,7 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
do_compression = false;
}

if (is_first_chunk) {
if (is_first_part) {
ts_compression_enabled_.put(1);
ts_offset_.put(0);
}
@@ -159,7 +159,7 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
chunk_final = chunk;
} else {
std::chrono::high_resolution_clock::time_point step00 = std::chrono::high_resolution_clock::now();
if (is_first_chunk) {
if (is_first_part) {
compressor_.ResetThreadLocalStorage();
}
std::chrono::high_resolution_clock::time_point step01 = std::chrono::high_resolution_clock::now();
@@ -174,7 +174,7 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
if (!s.IsOK()) return s;
std::chrono::high_resolution_clock::time_point step02 = std::chrono::high_resolution_clock::now();

log::trace("Database::PutChunkValidSize()",
log::trace("Database::PutPartValidSize()",
"[%s] size_compressed:%" PRIu64,
key.ToString().c_str(), compressor_.size_compressed());

@@ -197,7 +197,7 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
ByteArray chunk_compressed = ByteArray::NewShallowCopyByteArray(compressed, size_compressed);
std::chrono::high_resolution_clock::time_point step04 = std::chrono::high_resolution_clock::now();

log::trace("Database::PutChunkValidSize()",
log::trace("Database::PutPartValidSize()",
"[%s] (%" PRIu64 ") compressed size %" PRIu64 " - offset_chunk_compressed %" PRIu64,
key.ToString().c_str(),
chunk.size(),
@@ -213,18 +213,18 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
uint64_t duration04 = std::chrono::duration_cast<std::chrono::microseconds>(step05 - step04).count();

/*
log::info("Database::PutChunkValidSize()",
log::info("Database::PutPartValidSize()",
"Durations: [%" PRIu64 "] [%" PRIu64 "] [%" PRIu64 "] [%" PRIu64 "] [%" PRIu64 "]",
duration00, duration01, duration02, duration03, duration04
);
*/
}

if (do_compression && is_last_chunk) {
if (do_compression && is_last_part) {
if (ts_compression_enabled_.get() == 1) {
size_value_compressed = compressor_.size_compressed();
} else {
if (is_first_chunk) {
if (is_first_part) {
// chunk is self-contained: first and last
size_value_compressed = ts_offset_.get();
} else {
@@ -235,24 +235,24 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,

// Compute CRC32 checksum
uint32_t crc32 = 0;
if (is_first_chunk) {
if (is_first_part) {
crc32_.ResetThreadLocalStorage();
crc32_.stream(key.data(), key.size());
}
crc32_.stream(chunk_final.data(), chunk_final.size());
if (is_last_chunk) crc32 = crc32_.get();
if (is_last_part) crc32 = crc32_.get();

log::trace("Database PutChunkValidSize()", "[%s] size_value_compressed:%" PRIu64 " crc32:0x%" PRIx64 " END", key.ToString().c_str(), size_value_compressed, crc32);
log::trace("Database PutPartValidSize()", "[%s] size_value_compressed:%" PRIu64 " crc32:0x%" PRIx64 " END", key.ToString().c_str(), size_value_compressed, crc32);

uint64_t size_padding = do_compression ? EntryHeader::CalculatePaddingSize(size_value) : 0;
if ( offset_chunk_compressed + chunk_final.size()
> size_value + size_padding) {
log::emerg("Database::PutChunkValidSize()", "Error: write was attempted outside of the allocated memory.");
log::emerg("Database::PutPartValidSize()", "Error: write was attempted outside of the allocated memory.");
return Status::IOError("Prevented write to occur outside of the allocated memory.");
}

// (size_value_compressed != 0 && chunk->size() + offset_chunk == size_value_compressed));
return wb_->PutChunk(write_options,
return wb_->PutPart(write_options,
key,
chunk_final,
offset_chunk_compressed,
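
Two mechanisms meet in this file: PutPart() slices any value larger than db.storage.maximum_part_size into parts, and PutPartValidSize() streams a single CRC32 across those parts: the checksum is reset and seeded with the key on the first part, fed every part in turn, and only read out on the last one. A small sketch of that streamed-checksum idea, using zlib's crc32() as a stand-in for the project's CRC32 class (an assumption made purely for illustration):

#include <zlib.h>
#include <cstdint>
#include <string>
#include <vector>

// One checksum over key + value even though the value arrives in parts,
// mirroring the is_first_part / is_last_part handling in PutPartValidSize().
uint32_t ChecksumOverParts(const std::string& key, const std::vector<std::string>& parts) {
  uLong crc = crc32(0L, Z_NULL, 0);
  for (size_t i = 0; i < parts.size(); ++i) {
    bool is_first_part = (i == 0);
    bool is_last_part = (i + 1 == parts.size());
    if (is_first_part) {
      crc = crc32(0L, Z_NULL, 0);  // reset the running checksum, then seed it with the key
      crc = crc32(crc, reinterpret_cast<const Bytef*>(key.data()), static_cast<uInt>(key.size()));
    }
    crc = crc32(crc, reinterpret_cast<const Bytef*>(parts[i].data()), static_cast<uInt>(parts[i].size()));
    if (is_last_part) break;       // only the last part yields the final value
  }
  return static_cast<uint32_t>(crc);
}
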
interface/database.h: 20 changes (10 additions & 10 deletions)
@@ -145,21 +145,21 @@ class Database: public KingDB {
uint64_t max_size_hash = hash->MaxInputSize();
delete hash;

if (db_options_.storage__maximum_chunk_size > std::numeric_limits<int32_t>::max()) {
return Status::IOError("db.storage.maximum-chunk-size cannot be greater than max int32. Fix your options.");
if (db_options_.storage__maximum_part_size > std::numeric_limits<int32_t>::max()) {
return Status::IOError("db.storage.maximum-part-size cannot be greater than max int32. Fix your options.");
}

if (db_options_.storage__maximum_chunk_size >= db_options_.storage__hstable_size) {
return Status::IOError("The maximum size of a chunk cannot be larger than the minimum size of a large file (db.storage.maximum-chunk-size >= db.storage.hstable-size). Fix your options.");
if (db_options_.storage__maximum_part_size >= db_options_.storage__hstable_size) {
return Status::IOError("The maximum size of a chunk cannot be larger than the minimum size of a large file (db.storage.maximum-part-size >= db.storage.hstable-size). Fix your options.");
}

if (db_options_.storage__maximum_chunk_size > max_size_hash) {
return Status::IOError("db.storage.maximum-chunk-size cannot be greater than the maximum input size of the hash function you chose. Fix your options.");
if (db_options_.storage__maximum_part_size > max_size_hash) {
return Status::IOError("db.storage.maximum-part-size cannot be greater than the maximum input size of the hash function you chose. Fix your options.");
}

if ( db_options_.compression.type != kNoCompression
&& db_options_.storage__maximum_chunk_size > compressor_.MaxInputSize()) {
return Status::IOError("db.storage.maximum-chunk-size cannot be greater than the maximum input size of the compression function you chose. Fix your options.");
&& db_options_.storage__maximum_part_size > compressor_.MaxInputSize()) {
return Status::IOError("db.storage.maximum-part-size cannot be greater than the maximum input size of the compression function you chose. Fix your options.");
}

std::unique_lock<std::mutex> lock(mutex_close_);
Expand Down Expand Up @@ -217,7 +217,7 @@ class Database: public KingDB {
return KingDB::Put(write_options, key, chunk);
}

virtual Status PutChunk(WriteOptions& write_options,
virtual Status PutPart(WriteOptions& write_options,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk, // TODO: could the offset be handled by the method itself?
Expand Down Expand Up @@ -252,7 +252,7 @@ class Database: public KingDB {
ByteArray* value_out,
bool want_raw_data);

Status PutChunkValidSize(WriteOptions& write_options,
Status PutPartValidSize(WriteOptions& write_options,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
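
The renamed checks above pin db.storage.maximum-part-size against three existing limits: it must fit in an int32, stay below the HSTable size, and not exceed what the chosen hash function or compressor can ingest in one call. A condensed sketch of the same constraints as a standalone helper (the struct and function names are illustrative; only the option names come from the diff):

#include <cstdint>
#include <limits>
#include <string>

struct PartSizeLimits {
  uint64_t hstable_size;          // db.storage.hstable-size
  uint64_t max_hash_input;        // hash->MaxInputSize()
  uint64_t max_compressor_input;  // compressor_.MaxInputSize()
  bool compression_enabled;       // db_options_.compression.type != kNoCompression
};

// Returns an empty string when maximum_part_size is acceptable, otherwise the reason.
std::string ValidateMaximumPartSize(uint64_t maximum_part_size, const PartSizeLimits& limits) {
  if (maximum_part_size > static_cast<uint64_t>(std::numeric_limits<int32_t>::max()))
    return "db.storage.maximum-part-size cannot be greater than max int32";
  if (maximum_part_size >= limits.hstable_size)
    return "db.storage.maximum-part-size must be smaller than db.storage.hstable-size";
  if (maximum_part_size > limits.max_hash_input)
    return "db.storage.maximum-part-size exceeds the hash function's maximum input size";
  if (limits.compression_enabled && maximum_part_size > limits.max_compressor_input)
    return "db.storage.maximum-part-size exceeds the compressor's maximum input size";
  return "";
}
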
