From 7737fa6306ce04832513d3c160eb5b9a9c602b88 Mon Sep 17 00:00:00 2001 From: Emmanuel Goossaert Date: Sat, 28 Feb 2015 01:38:29 +0100 Subject: [PATCH] Closed the refactoring of the ByteArray class. - Finished refactoring the ByteArray class by using RAII. - Renamed all usages of the temporary class, 'Kitten', into 'ByteArray'. - Replaced pointers for objects in all API calls, in particular for Iterator and Snapshot. - Implemented new Multipart API. - Move the Mmap class to 'util/file.h'. - Fixed various bugs along the way. --- algorithm/coding.h | 2 - cache/write_buffer.cc | 16 +- cache/write_buffer.h | 14 +- interface/interface.h | 47 ++- interface/iterator.h | 20 +- interface/kingdb.cc | 28 +- interface/kingdb.h | 33 +- interface/multipart.h | 18 +- interface/snapshot.h | 11 +- network/server.cc | 10 +- network/server.h | 1 - storage/hstable_manager.h | 7 +- storage/storage_engine.h | 29 +- unit-tests/client_embedded.cc | 9 +- unit-tests/test_db.cc | 25 +- util/byte_array.h | 600 +++++++++------------------------- util/byte_array_base.h | 54 --- util/file.h | 63 ++++ util/kitten.h | 314 ------------------ util/order.h | 6 +- 20 files changed, 358 insertions(+), 949 deletions(-) delete mode 100644 util/byte_array_base.h delete mode 100644 util/kitten.h diff --git a/algorithm/coding.h b/algorithm/coding.h index 099fbca..a70a77f 100644 --- a/algorithm/coding.h +++ b/algorithm/coding.h @@ -33,8 +33,6 @@ extern void PutFixed64(std::string* dst, uint64_t value); extern void PutVarint32(std::string* dst, uint32_t value); extern void PutVarint64(std::string* dst, uint64_t value); -// Standard Get... routines parse a value from the beginning of a ByteArray -// and returns the number of bytes that were read from the ByteArray. extern int GetVarint32(char* input, uint64_t size, uint32_t* value); extern int GetVarint64(char* input, uint64_t size, uint64_t* value); diff --git a/cache/write_buffer.cc b/cache/write_buffer.cc index 97f3e55..1e477d0 100644 --- a/cache/write_buffer.cc +++ b/cache/write_buffer.cc @@ -20,7 +20,7 @@ void WriteBuffer::Flush() { log::trace("WriteBuffer::Flush()", "end"); } -Status WriteBuffer::Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) { +Status WriteBuffer::Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) { // TODO: need to fix the way the value is returned here: to create a new // memory space and then return. // TODO: make sure the live buffer doesn't need to be protected by a mutex in @@ -125,15 +125,15 @@ Status WriteBuffer::Get(ReadOptions& read_options, Kitten& key, Kitten* value_ou } -Status WriteBuffer::Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) { +Status WriteBuffer::Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) { //return Write(OrderType::Put, key, value); return Status::InvalidArgument("WriteBuffer::Put() is not implemented"); } Status WriteBuffer::PutChunk(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value, uint64_t size_value_compressed, @@ -150,16 +150,16 @@ Status WriteBuffer::PutChunk(WriteOptions& write_options, } -Status WriteBuffer::Delete(WriteOptions& write_options, Kitten& key) { - auto empty = Kitten::NewEmptyKitten(); +Status WriteBuffer::Delete(WriteOptions& write_options, ByteArray& key) { + auto empty = ByteArray::NewEmptyByteArray(); return WriteChunk(write_options, OrderType::Delete, key, empty, 0, 0, 0, 0); } Status WriteBuffer::WriteChunk(const WriteOptions& write_options, const OrderType& op, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value, uint64_t size_value_compressed, diff --git a/cache/write_buffer.h b/cache/write_buffer.h index 5031c69..71ed8e3 100644 --- a/cache/write_buffer.h +++ b/cache/write_buffer.h @@ -49,16 +49,16 @@ class WriteBuffer { } ~WriteBuffer() { Close(); } - Status Get(ReadOptions& read_options, Kitten& key, Kitten* value_out); - Status Put(WriteOptions& write_options, Kitten& key, Kitten& chunk); + Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out); + Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk); Status PutChunk(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value, uint64_t size_value_compressed, uint32_t crc32); - Status Delete(WriteOptions& write_options, Kitten& key); + Status Delete(WriteOptions& write_options, ByteArray& key); void Flush(); void Close () { @@ -91,8 +91,8 @@ class WriteBuffer { private: Status WriteChunk(const WriteOptions& write_options, const OrderType& op, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value, uint64_t size_value_compressed, diff --git a/interface/interface.h b/interface/interface.h index 197d0f0..c962ccf 100644 --- a/interface/interface.h +++ b/interface/interface.h @@ -9,7 +9,6 @@ #include "util/status.h" #include "util/order.h" #include "util/byte_array.h" -#include "util/kitten.h" namespace kdb { @@ -23,8 +22,8 @@ class Iterator { virtual void Begin() = 0; virtual bool IsValid() = 0; virtual bool Next() = 0; - virtual Kitten GetKey() = 0; - virtual Kitten GetValue() = 0; + virtual ByteArray GetKey() = 0; + virtual ByteArray GetValue() = 0; virtual MultipartReader GetMultipartValue() = 0; }; */ @@ -33,55 +32,55 @@ class Iterator { class Interface { public: virtual ~Interface() {} - virtual Status Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) = 0; + virtual Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) = 0; - virtual Status Get(ReadOptions& read_options, Kitten& key, std::string* value_out) { - Kitten value; + virtual Status Get(ReadOptions& read_options, ByteArray& key, std::string* value_out) { + ByteArray value; Status s = Get(read_options, key, &value); if (!s.IsOK()) return s; *value_out = value.ToString(); return s; } - virtual Status Get(ReadOptions& read_options, std::string& key, Kitten* value_out) { - Kitten kitten_key = Kitten::NewPointerKitten(key.c_str(), key.size()); - Status s = Get(read_options, kitten_key, value_out); + virtual Status Get(ReadOptions& read_options, std::string& key, ByteArray* value_out) { + ByteArray byte_array_key = ByteArray::NewPointerByteArray(key.c_str(), key.size()); + Status s = Get(read_options, byte_array_key, value_out); return s; } virtual Status Get(ReadOptions& read_options, std::string& key, std::string* value_out) { - Kitten kitten_key = Kitten::NewPointerKitten(key.c_str(), key.size()); - Kitten value; + ByteArray byte_array_key = ByteArray::NewPointerByteArray(key.c_str(), key.size()); + ByteArray value; Status s = Get(read_options, key, &value); if (!s.IsOK()) return s; *value_out = value.ToString(); return s; } - virtual Status Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) = 0; + virtual Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) = 0; - virtual Status Put(WriteOptions& write_options, Kitten& key, std::string& chunk) { - Kitten kitten_chunk = Kitten::NewDeepCopyKitten(chunk.c_str(), chunk.size()); - return Put(write_options, key, kitten_chunk); + virtual Status Put(WriteOptions& write_options, ByteArray& key, std::string& chunk) { + ByteArray byte_array_chunk = ByteArray::NewDeepCopyByteArray(chunk.c_str(), chunk.size()); + return Put(write_options, key, byte_array_chunk); } - virtual Status Put(WriteOptions& write_options, std::string& key, Kitten& chunk) { - Kitten kitten_key = Kitten::NewDeepCopyKitten(key.c_str(), key.size()); - return Put(write_options, kitten_key, chunk); + virtual Status Put(WriteOptions& write_options, std::string& key, ByteArray& chunk) { + ByteArray byte_array_key = ByteArray::NewDeepCopyByteArray(key.c_str(), key.size()); + return Put(write_options, byte_array_key, chunk); } virtual Status Put(WriteOptions& write_options, std::string& key, std::string& chunk) { - Kitten kitten_key = Kitten::NewDeepCopyKitten(key.c_str(), key.size()); - Kitten kitten_chunk = Kitten::NewDeepCopyKitten(chunk.c_str(), chunk.size()); - return Put(write_options, kitten_key, kitten_chunk); + ByteArray byte_array_key = ByteArray::NewDeepCopyByteArray(key.c_str(), key.size()); + ByteArray byte_array_chunk = ByteArray::NewDeepCopyByteArray(chunk.c_str(), chunk.size()); + return Put(write_options, byte_array_key, byte_array_chunk); } virtual Status PutChunk(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value) = 0; - virtual Status Delete(WriteOptions& write_options, Kitten& key) = 0; + virtual Status Delete(WriteOptions& write_options, ByteArray& key) = 0; virtual Iterator NewIterator(ReadOptions& read_options) = 0; virtual Status Open() = 0; virtual void Close() = 0; diff --git a/interface/iterator.h b/interface/iterator.h index 7cbe266..61cb085 100644 --- a/interface/iterator.h +++ b/interface/iterator.h @@ -10,8 +10,8 @@ #include "util/status.h" #include "util/order.h" #include "util/byte_array.h" -#include "util/kitten.h" #include "util/options.h" +#include "util/file.h" #include "interface/interface.h" #include "interface/multipart.h" #include "storage/storage_engine.h" @@ -149,7 +149,7 @@ class Iterator { } // Get entry at the location - Kitten key, value; + ByteArray key, value; uint64_t location_current = locations_current_[index_location_]; Status s = se_readonly_->GetEntry(read_options_, location_current, &key, &value); if (!s.IsOK()) { @@ -161,7 +161,7 @@ class Iterator { // Get entry for the key found at the location, and continue if the // locations mismatch -- i.e. the current entry has been overwritten // by a later entry. - Kitten value_alt; + ByteArray value_alt; uint64_t location_out; s = se_readonly_->Get(read_options_, key, &value_alt, &location_out); if (!s.IsOK()) { @@ -191,17 +191,17 @@ class Iterator { return false; } - Kitten GetKey() { + ByteArray GetKey() { std::unique_lock lock(mutex_); return key_; } - Kitten GetValue() { + ByteArray GetValue() { std::unique_lock lock(mutex_); if (!value_.is_compressed()) return value_; if (value_.size() > se_readonly_->db_options_.internal__size_multipart_required) { - return Kitten(); + return ByteArray(); } // TODO-36: Uncompression should have to go through a MultipartReader. See @@ -210,7 +210,7 @@ class Iterator { uint64_t offset = 0; MultipartReader mp_reader(read_options_, value_); for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) { - Kitten part; + ByteArray part; mp_reader.GetPart(&part); log::trace("KingDB Get()", "Multipart loop size:%d [%s]", part.size(), part.ToString().c_str()); memcpy(buffer + offset, part.data(), part.size()); @@ -218,7 +218,7 @@ class Iterator { } status_ = mp_reader.GetStatus(); if (!status_.IsOK()) log::trace("KingDB Get()", "Error in GetValue(): %s\n", status_.ToString().c_str()); - return Kitten::NewShallowCopyKitten(buffer, value_.size()); + return ByteArray::NewShallowCopyByteArray(buffer, value_.size()); } MultipartReader GetMultipartValue() { @@ -246,8 +246,8 @@ class Iterator { Status status_; bool is_closed_; - Kitten key_; - Kitten value_; + ByteArray key_; + ByteArray value_; }; } // end namespace kdb diff --git a/interface/kingdb.cc b/interface/kingdb.cc index a2448df..2a86ef1 100644 --- a/interface/kingdb.cc +++ b/interface/kingdb.cc @@ -7,8 +7,8 @@ namespace kdb { Status KingDB::Get(ReadOptions& read_options, - Kitten& key, - Kitten* value_out, + ByteArray& key, + ByteArray* value_out, bool want_raw_data) { if (is_closed_) return Status::IOError("The database is not open"); log::trace("KingDB Get()", "[%s]", key.ToString().c_str()); @@ -47,31 +47,31 @@ Status KingDB::Get(ReadOptions& read_options, uint64_t offset = 0; MultipartReader mp_reader(read_options, *value_out); for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) { - Kitten part; + ByteArray part; mp_reader.GetPart(&part); log::trace("KingDB Get()", "Multipart loop size:%d [%s]", part.size(), part.ToString().c_str()); memcpy(buffer + offset, part.data(), part.size()); offset += part.size(); } - *value_out = Kitten::NewShallowCopyKitten(buffer, value_out->size()); + *value_out = ByteArray::NewShallowCopyByteArray(buffer, value_out->size()); } return s; } -Status KingDB::Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) { +Status KingDB::Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) { return Get(read_options, key, value_out, false); } -Status KingDB::Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) { +Status KingDB::Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) { return PutChunk(write_options, key, chunk, 0, chunk.size()); } Status KingDB::PutChunk(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value) { if (is_closed_) return Status::IOError("The database is not open"); @@ -84,7 +84,7 @@ Status KingDB::PutChunk(WriteOptions& write_options, uint64_t size_chunk = chunk.size(); Status s; for (uint64_t offset = 0; offset < size_chunk; offset += db_options_.storage__maximum_chunk_size) { - Kitten key_new, chunk_new; + ByteArray key_new, chunk_new; if (offset + db_options_.storage__maximum_chunk_size < chunk.size()) { chunk_new = chunk; chunk_new.set_offset(offset); @@ -106,8 +106,8 @@ Status KingDB::PutChunk(WriteOptions& write_options, Status KingDB::PutChunkValidSize(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value) { if (is_closed_) return Status::IOError("The database is not open"); @@ -123,7 +123,7 @@ Status KingDB::PutChunkValidSize(WriteOptions& write_options, bool do_compression = true; uint64_t size_value_compressed = 0; uint64_t offset_chunk_compressed = offset_chunk; - Kitten chunk_final; + ByteArray chunk_final; bool is_first_chunk = (offset_chunk == 0); bool is_last_chunk = (chunk.size() + offset_chunk == size_value); @@ -185,7 +185,7 @@ Status KingDB::PutChunkValidSize(WriteOptions& write_options, } if (!s.IsOK()) return s; - Kitten chunk_compressed = Kitten::NewShallowCopyKitten(compressed, size_compressed); + ByteArray chunk_compressed = ByteArray::NewShallowCopyByteArray(compressed, size_compressed); log::trace("KingDB::PutChunkValidSize()", "[%s] (%" PRIu64 ") compressed size %" PRIu64 " - offset_chunk_compressed %" PRIu64, @@ -240,7 +240,7 @@ Status KingDB::PutChunkValidSize(WriteOptions& write_options, Status KingDB::Delete(WriteOptions& write_options, - Kitten& key) { + ByteArray& key) { if (is_closed_) return Status::IOError("The database is not open"); log::trace("KingDB::Delete()", "[%s]", key.ToString().c_str()); Status s = se_->FileSystemStatus(); diff --git a/interface/kingdb.h b/interface/kingdb.h index d82c132..b608451 100644 --- a/interface/kingdb.h +++ b/interface/kingdb.h @@ -23,7 +23,6 @@ #include "util/status.h" #include "util/order.h" #include "util/byte_array.h" -#include "util/kitten.h" #include "util/options.h" #include "util/file.h" #include "interface/iterator.h" @@ -157,13 +156,13 @@ class KingDB: public Interface { // be allocated, a proper error message is returned -- same for the Iterator // and Snapshot - virtual Status Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) override; + virtual Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) override; - virtual Status Get(ReadOptions& read_options, Kitten& key, std::string* value_out) { + virtual Status Get(ReadOptions& read_options, ByteArray& key, std::string* value_out) { return Interface::Get(read_options, key, value_out); } - virtual Status Get(ReadOptions& read_options, std::string& key, Kitten* value_out) { + virtual Status Get(ReadOptions& read_options, std::string& key, ByteArray* value_out) { return Interface::Get(read_options, key, value_out); } @@ -171,13 +170,13 @@ class KingDB: public Interface { return Interface::Get(read_options, key, value_out); } - virtual Status Put(WriteOptions& write_options, Kitten& key, Kitten& value) override; + virtual Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& value) override; - virtual Status Put(WriteOptions& write_options, Kitten& key, std::string& chunk) { + virtual Status Put(WriteOptions& write_options, ByteArray& key, std::string& chunk) { return Interface::Put(write_options, key, chunk); } - virtual Status Put(WriteOptions& write_options, std::string& key, Kitten& chunk) { + virtual Status Put(WriteOptions& write_options, std::string& key, ByteArray& chunk) { return Interface::Put(write_options, key, chunk); } @@ -186,16 +185,16 @@ class KingDB: public Interface { } virtual Status PutChunk(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, // TODO: could the offset be handled by the method itself? uint64_t size_value) override; - virtual Status Delete(WriteOptions& write_options, Kitten& key) override; + virtual Status Delete(WriteOptions& write_options, ByteArray& key) override; virtual Snapshot NewSnapshot(); virtual Iterator NewIterator(ReadOptions& read_options) override; - MultipartReader NewMultipartReader(ReadOptions& read_options, Kitten& key) { - Kitten value; + MultipartReader NewMultipartReader(ReadOptions& read_options, ByteArray& key) { + ByteArray value; Status s = Get(read_options, key, &value, true); if (!s.IsOK()) { return MultipartReader(s); @@ -205,7 +204,7 @@ class KingDB: public Interface { } - MultipartWriter NewMultipartWriter(WriteOptions& write_options, Kitten& key, uint64_t size_value_total) { + MultipartWriter NewMultipartWriter(WriteOptions& write_options, ByteArray& key, uint64_t size_value_total) { return MultipartWriter(this, write_options, key, size_value_total); } @@ -214,13 +213,13 @@ class KingDB: public Interface { private: Interface* NewSnapshotPointer(); Status Get(ReadOptions& read_options, - Kitten& key, - Kitten* value_out, + ByteArray& key, + ByteArray* value_out, bool want_raw_data); Status PutChunkValidSize(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value); diff --git a/interface/multipart.h b/interface/multipart.h index 82d6021..9761f11 100644 --- a/interface/multipart.h +++ b/interface/multipart.h @@ -19,7 +19,7 @@ #include "util/logger.h" #include "util/options.h" -#include "util/kitten.h" +#include "util/byte_array.h" #include "algorithm/compressor.h" #include "algorithm/crc32c.h" #include "interface/interface.h" @@ -97,7 +97,7 @@ class MultipartReader { &frame, &size_frame); offset_output_ += size_frame; - chunk_ = Kitten::NewShallowCopyKitten(data_out, size_out); + chunk_ = ByteArray::NewShallowCopyByteArray(data_out, size_out); if (s.IsDone()) { is_valid_stream_ = false; @@ -145,7 +145,7 @@ class MultipartReader { return true; } - virtual Status GetPart(Kitten* part) { + virtual Status GetPart(ByteArray* part) { *part = chunk_; return status_; } @@ -156,7 +156,7 @@ class MultipartReader { bool is_compression_disabled_; Status status_; - Kitten chunk_; + ByteArray chunk_; bool is_valid_stream_; bool is_compressed() { @@ -177,14 +177,14 @@ class MultipartReader { MultipartReader(Status s) : status_(s) { } - MultipartReader(ReadOptions& read_options, Kitten& value) + MultipartReader(ReadOptions& read_options, ByteArray& value) : read_options_(read_options), value_(value), status_(Status::OK()) { } ReadOptions read_options_; - Kitten value_; + ByteArray value_; }; @@ -194,13 +194,13 @@ class MultipartWriter { public: ~MultipartWriter() {} - Status PutPart(Kitten& part) { + Status PutPart(ByteArray& part) { Status s = db_->PutChunk(write_options_, key_, part, offset_, size_value_total_); if (s.IsOK()) offset_ += part.size(); return s; } private: - MultipartWriter(Interface* db, WriteOptions& write_options, Kitten& key, uint64_t size_value_total) + MultipartWriter(Interface* db, WriteOptions& write_options, ByteArray& key, uint64_t size_value_total) : db_(db), write_options_(write_options), key_(key), @@ -210,7 +210,7 @@ class MultipartWriter { Interface* db_; WriteOptions write_options_; - Kitten key_; + ByteArray key_; uint64_t size_value_total_; uint64_t offset_; }; diff --git a/interface/snapshot.h b/interface/snapshot.h index 3416589..274046d 100644 --- a/interface/snapshot.h +++ b/interface/snapshot.h @@ -12,7 +12,6 @@ #include "interface/interface.h" #include "util/order.h" #include "util/byte_array.h" -#include "util/kitten.h" #include "util/options.h" namespace kdb { @@ -75,7 +74,7 @@ class Snapshot: public Interface { log::trace("Snapshot::Close()", "end"); } - virtual Status Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) override { + virtual Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) override { Status s = se_readonly_->Get(read_options, key, value_out); if (s.IsNotFound()) { log::trace("Snapshot::Get()", "not found in storage engine"); @@ -91,19 +90,19 @@ class Snapshot: public Interface { return s; } - virtual Status Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) override { + virtual Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) override { return Status::IOError("Not supported"); } virtual Status PutChunk(WriteOptions& write_options, - Kitten& key, - Kitten& chunk, + ByteArray& key, + ByteArray& chunk, uint64_t offset_chunk, uint64_t size_value) override { return Status::IOError("Not supported"); } - virtual Status Delete(WriteOptions& write_options, Kitten& key) override { + virtual Status Delete(WriteOptions& write_options, ByteArray& key) override { return Status::IOError("Not supported"); } diff --git a/network/server.cc b/network/server.cc index 29226e1..e16d70d 100644 --- a/network/server.cc +++ b/network/server.cc @@ -24,8 +24,8 @@ void NetworkTask::Run(std::thread::id tid, uint64_t id) { bool is_command_put = false; bool is_command_delete = false; char *buffer_send = new char[server_options_.size_buffer_send]; - Kitten buffer; - Kitten key; + ByteArray buffer; + ByteArray key; int size_key = 0; log::trace("NetworkTask", "ENTER"); // TODO-7: replace the memory allocation performed for 'key' and 'buffer' by a @@ -52,7 +52,7 @@ void NetworkTask::Run(std::thread::id tid, uint64_t id) { if (is_new_buffer) { log::trace("NetworkTask", "is_new_buffer"); bytes_received_buffer = 0; - buffer = Kitten::NewAllocatedMemoryKitten(server_options_.size_buffer_recv); + buffer = ByteArray::NewAllocatedMemoryByteArray(server_options_.size_buffer_recv); log::trace("NetworkTask", "allocated"); } @@ -169,7 +169,7 @@ void NetworkTask::Run(std::thread::id tid, uint64_t id) { } for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) { - kdb::Kitten part; + kdb::ByteArray part; kdb::Status s = mp_reader.GetPart(&part); if (!s.IsOK()) { log::trace("NetworkTask", "Error: MultipartReader - %s", s.ToString().c_str()); @@ -230,7 +230,7 @@ void NetworkTask::Run(std::thread::id tid, uint64_t id) { } } else if (is_command_put) { uint64_t offset_chunk; - Kitten chunk = buffer; + ByteArray chunk = buffer; if(bytes_received_total == bytes_received_buffer) { // chunk is a first chunk, need to skip all the characters before the value data diff --git a/network/server.h b/network/server.h index 611ff02..ccf6640 100644 --- a/network/server.h +++ b/network/server.h @@ -33,7 +33,6 @@ #include "thread/threadpool.h" #include "interface/kingdb.h" #include "interface/multipart.h" -#include "util/kitten.h" #include "util/byte_array.h" #include "util/options.h" #include "util/logger.h" diff --git a/storage/hstable_manager.h b/storage/hstable_manager.h index 432dd42..dbeb906 100644 --- a/storage/hstable_manager.h +++ b/storage/hstable_manager.h @@ -26,14 +26,13 @@ #include "kingdb/kdb.h" #include "util/options.h" -#include "algorithm/hash.h" #include "util/order.h" -#include "util/kitten.h" #include "util/byte_array.h" -#include "algorithm/crc32c.h" #include "util/file.h" -#include "storage/resource_manager.h" +#include "algorithm/crc32c.h" +#include "algorithm/hash.h" #include "storage/format.h" +#include "storage/resource_manager.h" namespace kdb { diff --git a/storage/storage_engine.h b/storage/storage_engine.h index e441fb0..c20a655 100644 --- a/storage/storage_engine.h +++ b/storage/storage_engine.h @@ -27,12 +27,11 @@ #include "kingdb/kdb.h" #include "util/options.h" -#include "algorithm/hash.h" #include "util/order.h" -#include "util/kitten.h" #include "util/byte_array.h" -#include "algorithm/crc32c.h" #include "util/file.h" +#include "algorithm/crc32c.h" +#include "algorithm/hash.h" #include "storage/format.h" #include "storage/resource_manager.h" #include "storage/hstable_manager.h" @@ -318,8 +317,8 @@ class StorageEngine { } Status Get(ReadOptions& read_options, - Kitten& key, - Kitten* value_out, + ByteArray& key, + ByteArray* value_out, uint64_t *location_out=nullptr) { mutex_write_.lock(); mutex_read_.lock(); @@ -357,8 +356,8 @@ class StorageEngine { // IMPORTANT: value_out must be deleled by the caller Status GetWithIndex(ReadOptions& read_options, std::multimap& index, - Kitten& key, - Kitten* value_out, + ByteArray& key, + ByteArray* value_out, uint64_t *location_out=nullptr) { //std::unique_lock lock(mutex_index_); // TODO-26: should not be locking here, instead, should store the hashed key @@ -374,7 +373,7 @@ class StorageEngine { auto rbegin = --range.second; auto rend = --range.first; for (auto it = rbegin; it != rend; --it) { - Kitten key_temp; + ByteArray key_temp; Status s = GetEntry(read_options, it->second, &key_temp, value_out); //log::trace("StorageEngine::GetWithIndex()", "key:[%s] key_temp:[%s] hashed_key:[0x%" PRIx64 "] hashed_key_temp:[0x%" PRIx64 "] size_key:[%" PRIu64 "] size_key_temp:[%" PRIu64 "]", key->ToString().c_str(), key_temp->ToString().c_str(), hashed_key, it->first, key->size(), key_temp->size()); //std::string temp(key_temp->data(), key_temp->size()); @@ -396,8 +395,8 @@ class StorageEngine { // IMPORTANT: key_out and value_out must be deleted by the caller Status GetEntry(ReadOptions& read_options, uint64_t location, - Kitten* key_out, - Kitten* value_out) { + ByteArray* key_out, + ByteArray* value_out) { log::trace("StorageEngine::GetEntry()", "start"); Status s = Status::OK(); // TODO: check that the offset falls into the @@ -414,8 +413,8 @@ class StorageEngine { log::trace("StorageEngine::GetEntry()", "location:%" PRIu64 " fileid:%u offset_file:%u filesize:%" PRIu64, location, fileid, offset_file, filesize); std::string filepath = hstable_manager_.GetFilepath(fileid); // TODO: optimize here - Kitten key_temp = Kitten::NewMmappedKitten(filepath, filesize); - Kitten value_temp = key_temp; + ByteArray key_temp = ByteArray::NewMmappedByteArray(filepath, filesize); + ByteArray value_temp = key_temp; // NOTE: verify that value_temp.size() is indeed filesize -- verified and // the size was 0: should the size of an mmapped byte array be the size of // the file by default? @@ -592,7 +591,7 @@ class StorageEngine { std::reverse(index_compaction_se.begin(), index_compaction_se.end()); ReadOptions read_options; for (auto &p: index_compaction_se) { - Kitten key, value; + ByteArray key, value; uint64_t& location = p.second; uint32_t fileid = (location & 0xFFFFFFFF00000000) >> 32; if (fileid > fileid_end_actual) { @@ -801,8 +800,8 @@ class StorageEngine { Status s = EntryHeader::DecodeFrom(db_options_, mmap->datafile() + offset, mmap->filesize() - offset, &entry_header, &size_header); log::trace("Compaction()", "order list loop - create byte arrays"); - Kitten key = Kitten::NewPointerKitten(mmap_location->datafile() + offset_file + size_header, entry_header.size_key); - Kitten chunk = Kitten::NewPointerKitten(mmap_location->datafile() + offset_file + size_header + entry_header.size_key, entry_header.size_value_used()); + ByteArray key = ByteArray::NewPointerByteArray(mmap_location->datafile() + offset_file + size_header, entry_header.size_key); + ByteArray chunk = ByteArray::NewPointerByteArray(mmap_location->datafile() + offset_file + size_header + entry_header.size_key, entry_header.size_value_used()); log::trace("Compaction()", "order list loop - push_back() orders"); // NOTE: Need to recompute the crc32 of the key and value, as entry_header.crc32 diff --git a/unit-tests/client_embedded.cc b/unit-tests/client_embedded.cc index a7ef628..e7184e7 100644 --- a/unit-tests/client_embedded.cc +++ b/unit-tests/client_embedded.cc @@ -23,7 +23,6 @@ #include "util/status.h" #include "util/order.h" #include "util/byte_array.h" -#include "util/kitten.h" #include "interface/snapshot.h" #include "interface/iterator.h" @@ -80,8 +79,8 @@ int main() { std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); for (auto i = 0; i < num_items; i++) { - kdb::Kitten key = kdb::Kitten::NewDeepCopyKitten(items[i].c_str(), items[i].size()); - kdb::Kitten value = kdb::Kitten::NewDeepCopyKitten(buffer_large, 100); + kdb::ByteArray key = kdb::ByteArray::NewDeepCopyByteArray(items[i].c_str(), items[i].size()); + kdb::ByteArray value = kdb::ByteArray::NewDeepCopyByteArray(buffer_large, 100); kdb::Status s = db.PutChunk(write_options, key, value, @@ -98,8 +97,8 @@ int main() { auto count_items = 0; for (iterator.Begin(); iterator.IsValid(); iterator.Next()) { - kdb::Kitten key = iterator.GetKey(); - kdb::Kitten value = iterator.GetValue(); + kdb::ByteArray key = iterator.GetKey(); + kdb::ByteArray value = iterator.GetValue(); count_items += 1; } diff --git a/unit-tests/test_db.cc b/unit-tests/test_db.cc index e2c5315..84cc1f6 100644 --- a/unit-tests/test_db.cc +++ b/unit-tests/test_db.cc @@ -24,7 +24,6 @@ #include "util/status.h" #include "util/order.h" #include "util/byte_array.h" -#include "util/kitten.h" #include "util/file.h" #include "interface/snapshot.h" @@ -276,8 +275,8 @@ TEST(DBTest, KeysWithNullBytes) { int num_count_valid = 0; - //kdb::Kitten kitten = kdb::Kitten::NewDeepCopyKitten("blahblah", 8); - //fprintf(stderr, "kitten: %s\n", kitten.ToString().c_str()); + //kdb::ByteArray byte_array = kdb::ByteArray::NewDeepCopyByteArray("blahblah", 8); + //fprintf(stderr, "byte_array: %s\n", byte_array.ToString().c_str()); std::string key1("000000000000key1"); std::string key2("000000000000key2"); @@ -335,10 +334,10 @@ TEST(DBTest, SingleThreadSmallEntries) { for (auto i = 0; i < num_items; i++) { std::string key_str = kg->GetKey(0, i, 16); - kdb::Kitten key = kdb::Kitten::NewDeepCopyKitten(key_str.c_str(), key_str.size()); + kdb::ByteArray key = kdb::ByteArray::NewDeepCopyByteArray(key_str.c_str(), key_str.size()); data_generator_->GenerateData(buffer_large, 100); - kdb::Kitten value = kdb::Kitten::NewDeepCopyKitten(buffer_large, 100); + kdb::ByteArray value = kdb::ByteArray::NewDeepCopyByteArray(buffer_large, 100); kdb::Status s = db_->Put(write_options_, key, value); if (!s.IsOK()) { @@ -359,7 +358,7 @@ TEST(DBTest, SingleThreadSmallEntries) { kdb::MultipartReader mp_reader = iterator.GetMultipartValue(); for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) { - kdb::Kitten part; + kdb::ByteArray part; kdb::Status s = mp_reader.GetPart(&part); } @@ -398,10 +397,10 @@ TEST(DBTest, SingleThreadSnapshot) { for (auto i = 0; i < num_items; i++) { std::string key_str = kg->GetKey(0, i, 16); - kdb::Kitten key = kdb::Kitten::NewDeepCopyKitten(key_str.c_str(), key_str.size()); + kdb::ByteArray key = kdb::ByteArray::NewDeepCopyByteArray(key_str.c_str(), key_str.size()); data_generator_->GenerateData(buffer_large, 100); - kdb::Kitten value = kdb::Kitten::NewDeepCopyKitten(buffer_large, 100); + kdb::ByteArray value = kdb::ByteArray::NewDeepCopyByteArray(buffer_large, 100); kdb::Status s = db_->Put(write_options_, key, value); if (!s.IsOK()) { @@ -423,7 +422,7 @@ TEST(DBTest, SingleThreadSnapshot) { kdb::MultipartReader mp_reader = iterator.GetMultipartValue(); for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) { - kdb::Kitten part; + kdb::ByteArray part; kdb::Status s = mp_reader.GetPart(&part); } @@ -471,7 +470,7 @@ TEST(DBTest, SingleThreadSingleLargeEntry) { //char buffer_full[total_size]; //usleep(10 * 1000000); - kdb::Kitten key = kdb::Kitten::NewDeepCopyKitten(key_str.c_str(), key_str.size()); + kdb::ByteArray key = kdb::ByteArray::NewDeepCopyByteArray(key_str.c_str(), key_str.size()); kdb::MultipartWriter mp_writer = db_->NewMultipartWriter(write_options_, key, total_size); int fd = open("/tmp/kingdb-input", O_WRONLY|O_CREAT|O_TRUNC, 0644); @@ -488,7 +487,7 @@ TEST(DBTest, SingleThreadSingleLargeEntry) { size_current = total_size - i; } - kdb::Kitten value = kdb::Kitten::NewDeepCopyKitten(buffer, size_current); + kdb::ByteArray value = kdb::ByteArray::NewDeepCopyByteArray(buffer, size_current); s = mp_writer.PutPart(value); write(fd, buffer, size_current); @@ -503,13 +502,13 @@ TEST(DBTest, SingleThreadSingleLargeEntry) { usleep(4 * 1000000); kdb::MultipartReader mp_reader = db_->NewMultipartReader(read_options_, key); - Kitten value_out; + ByteArray value_out; uint64_t bytes_read = 0; int fd_output = open("/tmp/kingdb-output", O_WRONLY|O_CREAT|O_TRUNC, 0644); for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) { - Kitten part; + ByteArray part; Status s = mp_reader.GetPart(&part); if (write(fd_output, part.data(), part.size()) < 0) { fprintf(stderr, "ClientEmbedded - Couldn't write to output file: [%s]\n", strerror(errno)); diff --git a/util/byte_array.h b/util/byte_array.h index a523127..c50161e 100644 --- a/util/byte_array.h +++ b/util/byte_array.h @@ -17,516 +17,242 @@ #include #include +#include "util/file.h" #include "util/logger.h" +#include "util/logger.h" +#include "util/options.h" #include "algorithm/compressor.h" #include "algorithm/crc32c.h" -#include "util/byte_array_base.h" namespace kdb { -class Stream { +class ByteArrayResource { public: - Stream(ByteArray* byte_array) - : byte_array_(byte_array), - chunk_current_(nullptr) { - - } - - ~Stream() { - if (chunk_current_ != nullptr) { - delete chunk_current_; - chunk_current_ = nullptr; - } - } - - private: - ByteArray* byte_array_; - ByteArray* chunk_current_; + ByteArrayResource() {} + virtual ~ByteArrayResource() {} + virtual char* data() = 0; + virtual const char* data_const() = 0; + virtual uint64_t size() = 0; + virtual const uint64_t size_const() = 0; + virtual uint64_t size_compressed() = 0; + virtual uint64_t const size_compressed_const() = 0; }; - -class ByteArrayCommon: public ByteArray { +class MmappedByteArrayResource: public ByteArrayResource { + friend class ByteArray; public: - ByteArrayCommon() - : data_(nullptr), - chunk_(nullptr), - size_(0), - size_compressed_(0), - off_(0), - crc32_value_(0) - { - } - virtual ~ByteArrayCommon() { - if (chunk_ != nullptr) delete chunk_; - chunk_ = nullptr; - } - virtual char* data() { return data_ + off_; } - virtual char* data_const() const { return data_ + off_; } - virtual uint64_t size() { return size_ - off_; } - virtual uint64_t size_const() const { return size_ - off_; } - virtual uint64_t size_compressed() { return size_compressed_ - off_; } - virtual uint64_t size_compressed_const() const { return size_compressed_ - off_; } - virtual uint32_t checksum() const { return crc32_value_; } - virtual uint64_t offset() const { return off_; } - virtual bool is_compressed() { return size_compressed_ > 0; } - virtual void SetSizes(uint64_t size, uint64_t size_compressed) { - size_ = size; - size_compressed_ = size_compressed; - } - - virtual bool StartsWith(const char *substr, int n) { - return (n <= size_ && strncmp(data_, substr, n) == 0); - } - - virtual void set_offset(int off) { - off_ = off; - } - - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) = 0; - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) = 0; - - /* - char& operator[](std::size_t index) { - return data_[index]; - }; - - char operator[](std::size_t index) const { - return data_[index]; - }; - */ - - virtual std::string ToString() { - return std::string(data(), size()); - } - - virtual void SetSizeCompressed(uint64_t s) { size_compressed_ = s; } - virtual void SetCRC32(uint64_t c) { crc32_value_ = c; } - - bool IsStreamingRequired() { return false; } - - Stream* GetNewStream(uint64_t advised_chunk_size) { - return nullptr; - } - - // Streaming API - START - - void EnableChecksumVerification() { - is_enabled_checksum_verification_ = true; - } - - void SetInitialCRC32(uint32_t c32) { - log::debug("SetInitialCRC32()", "Initial CRC32 0x%08" PRIx64 "\n", c32); - if (is_enabled_checksum_verification_) { - initial_crc32_ = c32; - crc32_.put(c32); - } - } - - // TODO: I think there is a bug here -- some subclass may later try to - // call delete on data_, which will fail if it was shifted. - void SetOffset(uint64_t offset, uint64_t size) { - offset_ = offset; - data_ = data_ + offset; - size_ = size; - } - - - Status status_; - ByteArray* chunk_; - bool is_valid_stream_; - - virtual void Begin() { - if (is_enabled_checksum_verification_) { - crc32_.ResetThreadLocalStorage(); - crc32_.put(initial_crc32_); - } - if (chunk_ != nullptr) delete chunk_; - chunk_ = nullptr; - is_valid_stream_ = true; - is_compression_disabled_ = false; - offset_output_ = 0; - compressor_.ResetThreadLocalStorage(); - Next(); - status_ = Status::IOError("Steam is unfinished"); - } - - virtual bool IsValid() { - return is_valid_stream_; - } - - virtual Status GetStatus() { - return status_; + virtual ~MmappedByteArrayResource() { + //fprintf(stderr, "MmappedByteArrayResource::dtor()\n"); } - // Careful here: if the call to Next() is the first one, i.e. the one in - // Begin(), then is_valid_stream_ must not be set to false yet, otherwise - // the for-loops of type for(Begin(); IsValid(); Next()) would never run, - // as IsValid() would prevent the first iteration to start. - virtual bool Next() { - if (is_compressed() && !is_compression_disabled_) { - - if (compressor_.IsUncompressionDone(size_compressed_)) { - is_valid_stream_ = false; - if ( !is_enabled_checksum_verification_ - || crc32_.get() == crc32_value_) { - log::debug("SharedMmappedByteArray::Next()", "Good CRC32 - stored:0x%08" PRIx64 " computed:0x%08" PRIx64 "\n", crc32_value_, crc32_.get()); - status_ = Status::OK(); - } else { - log::debug("SharedMmappedByteArray::Next()", "Bad CRC32 - stored:0x%08" PRIx64 " computed:0x%08" PRIx64 "\n", crc32_value_, crc32_.get()); - status_ = Status::IOError("Invalid checksum."); - } - return false; - } - - if (compressor_.HasFrameHeaderDisabledCompression(data_ + offset_output_)) { - log::debug("SharedMmappedByteArray::Next()", "Finds that compression is disabled\n"); - is_compression_disabled_ = true; - if (is_enabled_checksum_verification_) { - crc32_.stream(data_ + offset_output_, compressor_.size_frame_header()); - } - offset_output_ += compressor_.size_frame_header(); - } - - if (!is_compression_disabled_) { - char *frame; - uint64_t size_frame; - - char *data_out; - uint64_t size_out; - - log::trace("SharedMmappedByteArray::Next()", "before uncompress"); - Status s = compressor_.Uncompress(data_, - size_compressed_, - &data_out, - &size_out, - &frame, - &size_frame); - offset_output_ += size_frame; - - if (chunk_ != nullptr) delete chunk_; - //chunk_ = new SharedAllocatedByteArray(data_out, size_out); - chunk_ = NewByteArrayChunk(data_out, size_out); - - if (s.IsDone()) { - is_valid_stream_ = false; - status_ = Status::OK(); - } else if (s.IsOK()) { - if (is_enabled_checksum_verification_) { - crc32_.stream(frame, size_frame); - } - } else { - is_valid_stream_ = false; - status_ = s; - } - } - } + virtual char* data() { return data_; } + virtual const char* data_const() { return data_; } + virtual uint64_t size() { return size_; } + virtual const uint64_t size_const() { return size_; } + virtual uint64_t size_compressed() { return size_compressed_; } + virtual const uint64_t size_compressed_const() { return size_compressed_; } - if (!is_compressed() || is_compression_disabled_) { - uint64_t size_left; - if (is_compressed() && is_compression_disabled_) { - size_left = size_compressed_; - } else { - size_left = size_; - } - - if (offset_output_ == size_left) { - is_valid_stream_ = false; - status_ = Status::OK(); - return false; - } - - char* data_left = data_ + offset_output_; - - size_t step = 1024*1024; - size_t size_current = offset_output_ + step < size_left ? step : size_left - offset_output_; - if (is_enabled_checksum_verification_) { - crc32_.stream(data_left, size_current); - } - - auto chunk = NewByteArrayClone(offset_output_, size_current); - - if (chunk_ != nullptr) delete chunk_; - chunk_ = chunk; - offset_output_ += size_current; - status_ = Status::Done(); + private: + MmappedByteArrayResource(std::string& filepath, uint64_t filesize) + : data_(nullptr), + size_(0), + size_compressed_(0), + mmap_(filepath, filesize) { + if (mmap_.is_valid()) { + data_ = mmap_.datafile(); + size_ = mmap_.filesize(); } - return true; - } - // Streaming API - END - - virtual ByteArray* GetChunk() { - return chunk_; + //fprintf(stderr, "MmappedByteArrayResource::ctor()\n"); } - CompressorLZ4 compressor_; - uint32_t initial_crc32_; - CRC32 crc32_; - uint64_t offset_; - uint64_t offset_output_; - bool is_compression_disabled_; - bool is_enabled_checksum_verification_; - - - - - + Mmap mmap_; char *data_; uint64_t size_; uint64_t size_compressed_; - uint64_t off_; - uint32_t crc32_value_; }; - -class SimpleByteArray: public ByteArrayCommon { +class AllocatedByteArrayResource: public ByteArrayResource { + friend class ByteArray; public: - SimpleByteArray(const char* data_in, uint64_t size_in) { - data_ = const_cast(data_in); - size_ = size_in; + virtual ~AllocatedByteArrayResource() { + //fprintf(stderr, "AllocatedByteArrayResource::dtor()\n"); + delete[] data_; } - void AddOffset(int offset) { - data_ += offset; - size_ -= offset; - } + virtual char* data() { return data_; } + virtual const char* data_const() { return data_; } + virtual uint64_t size() { return size_; } + virtual const uint64_t size_const() { return size_; } + virtual uint64_t size_compressed() { return size_compressed_; } + virtual const uint64_t size_compressed_const() { return size_compressed_; } - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) { - return new SimpleByteArray(data_out, size_out); + private: + AllocatedByteArrayResource(char *data, uint64_t size, bool deep_copy) + : data_(nullptr), + size_(0), + size_compressed_(0) { + if (deep_copy) { + size_ = size; + data_ = new char[size_]; + memcpy(data_, data, size_); + } else { + size_ = size; + data_ = data; + } + //fprintf(stderr, "AllocatedByteArrayResource::ctor()\n"); } - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) { - return new SimpleByteArray(data_ + offset, size); + AllocatedByteArrayResource(uint64_t size) + : data_(nullptr), + size_(0), + size_compressed_(0) { + size_ = size; + data_ = new char[size_]; } - virtual ~SimpleByteArray() { - //log::trace("SimpleByteArray::dtor()", ""); - } + char *data_; + uint64_t size_; + uint64_t size_compressed_; }; -// Like a smart pointer but for Byte Arrays -class SmartByteArray: public ByteArrayCommon { +class PointerByteArrayResource: public ByteArrayResource { + friend class ByteArray; public: - SmartByteArray(ByteArray* ba, const char* data_in, uint64_t size_in) { - ba_ = ba; - data_ = const_cast(data_in); - size_ = size_in; - } - - virtual ~SmartByteArray() { - //log::trace("SmartByteArray::dtor()", ""); - delete ba_; - } - - void AddOffset(int offset) { - data_ += offset; - size_ -= offset; - } - - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) { - return ba_->NewByteArrayChunk(data_out, size_out); - } - - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) { - return ba_->NewByteArrayClone(offset, size); - } + virtual ~PointerByteArrayResource() {} + virtual char* data() { return const_cast(data_); } + virtual const char* data_const() { return data_; } + virtual uint64_t size() { return size_; } + virtual const uint64_t size_const() { return size_; } + virtual uint64_t size_compressed() { return size_compressed_; } + virtual const uint64_t size_compressed_const() { return size_compressed_; } private: - ByteArray* ba_; - -}; - - -class AllocatedByteArray: public ByteArrayCommon { - public: - AllocatedByteArray(const char* data_in, uint64_t size_in) { - size_ = size_in; - data_ = new char[size_]; - memcpy(data_, data_in, size_); + PointerByteArrayResource(const char *data, uint64_t size) + : size_(size), + size_compressed_(0), + data_(data) { } - AllocatedByteArray(uint64_t size_in) { - size_ = size_in; - data_ = new char[size_+1]; - } + const char *data_; + uint64_t size_; + uint64_t size_compressed_; +}; - virtual ~AllocatedByteArray() { - delete[] data_; - } - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) { - return new AllocatedByteArray(data_out, size_out); - } - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) { - uint64_t size_max = std::max(size_, size_compressed_); - AllocatedByteArray* ba = new AllocatedByteArray(data_, size_max); - ba->size_ = size_; - ba->SetOffset(offset, size); - ba->off_ = this->offset(); - ba->crc32_value_ = checksum(); - ba->size_compressed_ = size_compressed(); - return ba; - } -}; -class SharedAllocatedByteArray: public ByteArrayCommon { +class ByteArray { + // TODO: what is happenning when a ByteArray is assigned to another ByteArray? + friend class MultipartReader; + friend class StorageEngine; + friend class KingDB; + friend class WriteBuffer; + friend class Iterator; + friend class NetworkTask; public: - SharedAllocatedByteArray() {} - - SharedAllocatedByteArray(char *data, uint64_t size_in) { - data_allocated_ = std::shared_ptr(data, [](char *p) { delete[] p; }); - data_ = data_allocated_.get(); - size_ = size_in; + ByteArray() + : size_(0), + size_compressed_(0), + offset_(0), + checksum_(0), + checksum_initial_(0) { } - SharedAllocatedByteArray(uint64_t size_in) { - data_allocated_ = std::shared_ptr(new char[size_in], [](char *p) { delete[] p; }); - data_ = data_allocated_.get(); - size_ = size_in; + virtual ~ByteArray() { } - virtual ~SharedAllocatedByteArray() { - //log::trace("SharedAllocatedByteArray::dtor()", ""); - } + virtual char* data() { return resource_->data() + offset_; } + virtual const char* data_const() const { return resource_->data_const() + offset_; } + virtual uint64_t size() { return size_; } + virtual const uint64_t size_const() const { return size_; } - void AddSize(int add) { - size_ += add; - } - - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) { - return new AllocatedByteArray(data_out, size_out); + virtual std::string ToString() { + return std::string(data(), size()); } - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) { - SharedAllocatedByteArray* ba = new SharedAllocatedByteArray(); - *ba = *this; - ba->SetOffset(offset, size); - ba->chunk_ = nullptr; - return ba; + static ByteArray NewShallowCopyByteArray(char* data, uint64_t size) { + ByteArray byte_array; + byte_array.resource_ = std::shared_ptr(new AllocatedByteArrayResource(data, size, false)); + byte_array.size_ = size; + return byte_array; } - private: - std::shared_ptr data_allocated_; - uint64_t offset_; - -}; - - - - - - - -// TODO: move to file.h -class Mmap { - public: - Mmap(std::string filepath, int64_t filesize) - : filepath_(filepath), - filesize_(filesize), - is_valid_(false) { - if ((fd_ = open(filepath.c_str(), O_RDONLY)) < 0) { - log::emerg("Mmap()::ctor()", "Could not open file [%s]: %s", filepath.c_str(), strerror(errno)); - return; - } - - log::trace("Mmap::ctor()", "open file: ok"); - - datafile_ = static_cast(mmap(0, - filesize, - PROT_READ, - MAP_SHARED, - fd_, - 0)); - if (datafile_ == MAP_FAILED) { - log::emerg("Could not mmap() file [%s]: %s", filepath.c_str(), strerror(errno)); - return; - } - - is_valid_ = true; + static ByteArray NewDeepCopyByteArray(const char* data, uint64_t size) { + char* data_non_const = const_cast(data); + ByteArray byte_array; + byte_array.resource_ = std::shared_ptr(new AllocatedByteArrayResource(data_non_const, size, true)); + byte_array.size_ = size; + return byte_array; } - virtual ~Mmap() { - Close(); + static ByteArray NewDeepCopyByteArray(std::string& str) { + return NewDeepCopyByteArray(str.c_str(), str.size()); } - void Close() { - if (datafile_ != nullptr) { - munmap(datafile_, filesize_); - close(fd_); - datafile_ = nullptr; - log::debug("Mmap::~Mmap()", "released mmap on file: [%s]", filepath_.c_str()); - } + static ByteArray NewAllocatedMemoryByteArray(uint64_t size) { + ByteArray byte_array; + byte_array.resource_ = std::shared_ptr(new AllocatedByteArrayResource(size)); + byte_array.size_ = size; + return byte_array; } - char* datafile() { return datafile_; } - int64_t filesize() { return filesize_; } - bool is_valid_; - bool is_valid() { return is_valid_; } - - int fd_; - int64_t filesize_; - char *datafile_; - - // For debugging - const char* filepath() const { return filepath_.c_str(); } - std::string filepath_; -}; - - -class SharedMmappedByteArray: public ByteArrayCommon { - // TODO: Does the checksum and compressor here really need to be store in a - // thread local storage? It would be way simpler to have this state held within - // the ByteArray object. - public: - SharedMmappedByteArray() {} - SharedMmappedByteArray(std::string filepath, int64_t filesize) { - mmap_ = std::shared_ptr(new Mmap(filepath, filesize)); - data_ = mmap_->datafile(); - size_ = 0; - is_compression_disabled_ = false; - is_enabled_checksum_verification_ = false; - offset_output_ = 0; - compressor_.ResetThreadLocalStorage(); - crc32_.ResetThreadLocalStorage(); + static ByteArray NewMmappedByteArray(std::string& filepath, uint64_t filesize) { + ByteArray byte_array; + byte_array.resource_ = std::shared_ptr(new MmappedByteArrayResource(filepath, filesize)); + byte_array.size_ = filesize; + return byte_array; } - SharedMmappedByteArray(char *data, uint64_t size) { - data_ = data; - size_ = size; - compressor_.ResetThreadLocalStorage(); - crc32_.ResetThreadLocalStorage(); + static ByteArray NewReferenceByteArray(ByteArray& byte_array_in) { + // TODO: make this the =operator() + ByteArray byte_array = byte_array_in; + return byte_array; } - virtual ~SharedMmappedByteArray() {} - - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) { - return new AllocatedByteArray(data_out, size_out); + static ByteArray NewPointerByteArray(const char* data, uint64_t size) { + ByteArray byte_array; + byte_array.resource_ = std::shared_ptr(new PointerByteArrayResource(data, size)); + byte_array.size_ = size; + return byte_array; } - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) { - SharedMmappedByteArray* ba = new SharedMmappedByteArray(); - *ba = *this; - ba->chunk_ = nullptr; - ba->SetOffset(offset, size); - return ba; + static ByteArray NewEmptyByteArray() { + return ByteArray(); } - void AddSize(int add) { - size_ += add; + bool operator ==(const ByteArray &right) const { + return ( size_const() == right.size_const() + && memcmp(data_const(), right.data_const(), size_const()) == 0); } - char* datafile() { return mmap_->datafile(); }; - std::shared_ptr mmap_; + private: + virtual uint64_t size_compressed() { return size_compressed_; } + virtual uint64_t size_compressed_const() const { return size_compressed_; } + virtual void set_size(uint64_t s) { size_ = s; } + virtual void set_size_compressed(uint64_t s) { size_compressed_ = s; } + virtual uint64_t is_compressed() { return (size_compressed_ != 0); } + virtual void set_offset(uint64_t o) { offset_ = o; } + virtual void increment_offset(uint64_t inc) { offset_ += inc; } + + virtual uint32_t checksum() { return checksum_; } + virtual uint32_t checksum_initial() { return checksum_initial_; } + virtual void set_checksum(uint32_t c) { checksum_ = c; } + virtual void set_checksum_initial(uint32_t c) { checksum_initial_ = c; } + + std::shared_ptr resource_; + uint64_t size_; + uint64_t size_compressed_; + uint64_t offset_; + uint32_t checksum_; // checksum for value_; + uint32_t checksum_initial_; // initial checksum for value_ }; - - -} +} // namespace kdb #endif // KINGDB_BYTE_ARRAY_H_ diff --git a/util/byte_array_base.h b/util/byte_array_base.h deleted file mode 100644 index c6b1c18..0000000 --- a/util/byte_array_base.h +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2014, Emmanuel Goossaert. All rights reserved. -// Use of this source code is governed by the BSD 3-Clause License, -// that can be found in the LICENSE file. - -#ifndef KINGDB_BYTE_ARRAY_BASE_H_ -#define KINGDB_BYTE_ARRAY_BASE_H_ - -#include "util/debug.h" -#include "util/status.h" - -namespace kdb { - -class ByteArray { - public: - ByteArray() {} - virtual ~ByteArray() {} - virtual std::string ToString() = 0; - virtual char* data() = 0; - virtual char* data_const() const = 0; - virtual uint64_t size() = 0; - virtual uint64_t size_const() const = 0; - virtual uint64_t size_compressed() = 0; - virtual uint64_t size_compressed_const() const = 0; - virtual uint32_t checksum() const = 0; - virtual uint64_t offset() const = 0; - virtual void set_offset(int off) = 0; - virtual bool is_compressed() = 0; - virtual void SetOffset(uint64_t offset, uint64_t size) = 0; - virtual bool StartsWith(const char *substr, int n) = 0; - virtual void SetSizes(uint64_t size, uint64_t size_compressed) = 0; - - // ByteArray management - virtual ByteArray* NewByteArrayChunk(char* data_out, uint64_t size_out) = 0; - virtual ByteArray* NewByteArrayClone(uint64_t offset, uint64_t size) = 0; - - // Streaming API - virtual void Begin() = 0; - virtual bool IsValid() = 0; - virtual bool Next() = 0; - virtual ByteArray* GetChunk() = 0; - virtual Status GetStatus() = 0; - - bool operator ==(const ByteArray &right) const { - //fprintf(stderr, "ByteArray operator==() -- left: %p %" PRIu64 " [%s] right: %p %" PRIu64 " [%s]\n", data_, size_, std::string(data_, size_).c_str(), right.data_const(), right.size_const(), std::string(right.data_const(), right.size_const()).c_str()); - return ( size_const() == right.size_const() - && memcmp(data_const(), right.data_const(), size_const()) == 0); - } - -}; - -}; - -#endif // KINGDB_BYTE_ARRAY_BASE_H_ - diff --git a/util/file.h b/util/file.h index 8f5114a..970416e 100644 --- a/util/file.h +++ b/util/file.h @@ -9,8 +9,13 @@ #include #include +#include +#include +#include #include #include +#include +#include #include "util/status.h" @@ -143,6 +148,64 @@ class FileUtil { } }; + +class Mmap { + public: + Mmap(std::string filepath, int64_t filesize) + : filepath_(filepath), + filesize_(filesize), + is_valid_(false) { + if ((fd_ = open(filepath.c_str(), O_RDONLY)) < 0) { + log::emerg("Mmap()::ctor()", "Could not open file [%s]: %s", filepath.c_str(), strerror(errno)); + return; + } + + log::trace("Mmap::ctor()", "open file: ok"); + + datafile_ = static_cast(mmap(0, + filesize, + PROT_READ, + MAP_SHARED, + fd_, + 0)); + if (datafile_ == MAP_FAILED) { + log::emerg("Could not mmap() file [%s]: %s", filepath.c_str(), strerror(errno)); + return; + } + + is_valid_ = true; + } + + virtual ~Mmap() { + Close(); + } + + void Close() { + if (datafile_ != nullptr) { + munmap(datafile_, filesize_); + close(fd_); + datafile_ = nullptr; + log::debug("Mmap::~Mmap()", "released mmap on file: [%s]", filepath_.c_str()); + } + } + + char* datafile() { return datafile_; } + int64_t filesize() { return filesize_; } + bool is_valid_; + bool is_valid() { return is_valid_; } + + int fd_; + int64_t filesize_; + char *datafile_; + + // For debugging + const char* filepath() const { return filepath_.c_str(); } + std::string filepath_; +}; + + + + } // namespace kdb #endif // KINGDB_FILE_H_ diff --git a/util/kitten.h b/util/kitten.h deleted file mode 100644 index 2e1a9d5..0000000 --- a/util/kitten.h +++ /dev/null @@ -1,314 +0,0 @@ -// Copyright (c) 2014, Emmanuel Goossaert. All rights reserved. -// Use of this source code is governed by the BSD 3-Clause License, -// that can be found in the LICENSE file. - -#ifndef KINGDB_KITTEN_H_ -#define KINGDB_KITTEN_H_ - -#include "util/debug.h" -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "util/logger.h" -#include "util/options.h" -#include "algorithm/compressor.h" -#include "algorithm/crc32c.h" - - -namespace kdb { - -// TODO: move to file.h -class KittenMmap { - public: - KittenMmap(std::string filepath, int64_t filesize) - : filepath_(filepath), - filesize_(filesize), - is_valid_(false) { - if ((fd_ = open(filepath.c_str(), O_RDONLY)) < 0) { - log::emerg("KittenMmap()::ctor()", "Could not open file [%s]: %s", filepath.c_str(), strerror(errno)); - return; - } - - log::trace("KittenMmap::ctor()", "open file: ok"); - - datafile_ = static_cast(mmap(0, - filesize, - PROT_READ, - MAP_SHARED, - fd_, - 0)); - if (datafile_ == MAP_FAILED) { - log::emerg("Could not mmap() file [%s]: %s", filepath.c_str(), strerror(errno)); - return; - } - - is_valid_ = true; - } - - virtual ~KittenMmap() { - Close(); - } - - void Close() { - if (datafile_ != nullptr) { - munmap(datafile_, filesize_); - close(fd_); - datafile_ = nullptr; - log::debug("KittenMmap::~KittenMmap()", "released mmap on file: [%s]", filepath_.c_str()); - } - } - - char* datafile() { return datafile_; } - int64_t filesize() { return filesize_; } - bool is_valid_; - bool is_valid() { return is_valid_; } - - int fd_; - int64_t filesize_; - char *datafile_; - - // For debugging - const char* filepath() const { return filepath_.c_str(); } - std::string filepath_; -}; - - - - -class KittenResource { - public: - KittenResource() {} - virtual ~KittenResource() {} - virtual char* data() = 0; - virtual const char* data_const() = 0; - virtual uint64_t size() = 0; - virtual const uint64_t size_const() = 0; - virtual uint64_t size_compressed() = 0; - virtual uint64_t const size_compressed_const() = 0; -}; - -class MmappedKittenResource: public KittenResource { - friend class Kitten; - public: - virtual ~MmappedKittenResource() { - //fprintf(stderr, "MmappedKittenResource::dtor()\n"); - } - - virtual char* data() { return data_; } - virtual const char* data_const() { return data_; } - virtual uint64_t size() { return size_; } - virtual const uint64_t size_const() { return size_; } - virtual uint64_t size_compressed() { return size_compressed_; } - virtual const uint64_t size_compressed_const() { return size_compressed_; } - - private: - MmappedKittenResource(std::string& filepath, uint64_t filesize) - : data_(nullptr), - size_(0), - size_compressed_(0), - mmap_(filepath, filesize) { - if (mmap_.is_valid()) { - data_ = mmap_.datafile(); - size_ = mmap_.filesize(); - } - //fprintf(stderr, "MmappedKittenResource::ctor()\n"); - } - - KittenMmap mmap_; - char *data_; - uint64_t size_; - uint64_t size_compressed_; -}; - - -class AllocatedKittenResource: public KittenResource { - friend class Kitten; - public: - virtual ~AllocatedKittenResource() { - //fprintf(stderr, "AllocatedKittenResource::dtor()\n"); - delete[] data_; - } - - virtual char* data() { return data_; } - virtual const char* data_const() { return data_; } - virtual uint64_t size() { return size_; } - virtual const uint64_t size_const() { return size_; } - virtual uint64_t size_compressed() { return size_compressed_; } - virtual const uint64_t size_compressed_const() { return size_compressed_; } - - private: - AllocatedKittenResource(char *data, uint64_t size, bool deep_copy) - : data_(nullptr), - size_(0), - size_compressed_(0) { - if (deep_copy) { - size_ = size; - data_ = new char[size_]; - memcpy(data_, data, size_); - } else { - size_ = size; - data_ = data; - } - //fprintf(stderr, "AllocatedKittenResource::ctor()\n"); - } - - AllocatedKittenResource(uint64_t size) - : data_(nullptr), - size_(0), - size_compressed_(0) { - size_ = size; - data_ = new char[size_]; - } - - char *data_; - uint64_t size_; - uint64_t size_compressed_; -}; - - -class PointerKittenResource: public KittenResource { - friend class Kitten; - public: - virtual ~PointerKittenResource() {} - - virtual char* data() { return const_cast(data_); } - virtual const char* data_const() { return data_; } - virtual uint64_t size() { return size_; } - virtual const uint64_t size_const() { return size_; } - virtual uint64_t size_compressed() { return size_compressed_; } - virtual const uint64_t size_compressed_const() { return size_compressed_; } - - private: - PointerKittenResource(const char *data, uint64_t size) - : size_(size), - size_compressed_(0), - data_(data) { - } - - const char *data_; - uint64_t size_; - uint64_t size_compressed_; -}; - - - - - -class Kitten { - // TODO: what is happenning when a Kitten is assigned to another Kitten? - friend class MultipartReader; - friend class StorageEngine; - friend class KingDB; - friend class WriteBuffer; - friend class Iterator; - friend class NetworkTask; - public: - Kitten() - : size_(0), - size_compressed_(0), - offset_(0), - checksum_(0), - checksum_initial_(0) { - } - - virtual ~Kitten() { - } - - virtual char* data() { return resource_->data() + offset_; } - virtual const char* data_const() const { return resource_->data_const() + offset_; } - virtual uint64_t size() { return size_; } - virtual const uint64_t size_const() const { return size_; } - - virtual std::string ToString() { - return std::string(data(), size()); - } - - static Kitten NewShallowCopyKitten(char* data, uint64_t size) { - Kitten kitten; - kitten.resource_ = std::shared_ptr(new AllocatedKittenResource(data, size, false)); - kitten.size_ = size; - return kitten; - } - - static Kitten NewDeepCopyKitten(const char* data, uint64_t size) { - char* data_non_const = const_cast(data); - Kitten kitten; - kitten.resource_ = std::shared_ptr(new AllocatedKittenResource(data_non_const, size, true)); - kitten.size_ = size; - return kitten; - } - - static Kitten NewDeepCopyKitten(std::string& str) { - return NewDeepCopyKitten(str.c_str(), str.size()); - } - - static Kitten NewAllocatedMemoryKitten(uint64_t size) { - Kitten kitten; - kitten.resource_ = std::shared_ptr(new AllocatedKittenResource(size)); - kitten.size_ = size; - return kitten; - } - - static Kitten NewMmappedKitten(std::string& filepath, uint64_t filesize) { - Kitten kitten; - kitten.resource_ = std::shared_ptr(new MmappedKittenResource(filepath, filesize)); - kitten.size_ = filesize; - return kitten; - } - - static Kitten NewReferenceKitten(Kitten& kitten_in) { - // TODO: make this the =operator() - Kitten kitten = kitten_in; - return kitten; - } - - static Kitten NewPointerKitten(const char* data, uint64_t size) { - Kitten kitten; - kitten.resource_ = std::shared_ptr(new PointerKittenResource(data, size)); - kitten.size_ = size; - return kitten; - } - - static Kitten NewEmptyKitten() { - return Kitten(); - } - - bool operator ==(const Kitten &right) const { - return ( size_const() == right.size_const() - && memcmp(data_const(), right.data_const(), size_const()) == 0); - } - - private: - virtual uint64_t size_compressed() { return size_compressed_; } - virtual uint64_t size_compressed_const() const { return size_compressed_; } - virtual void set_size(uint64_t s) { size_ = s; } - virtual void set_size_compressed(uint64_t s) { size_compressed_ = s; } - virtual uint64_t is_compressed() { return (size_compressed_ != 0); } - virtual void set_offset(uint64_t o) { offset_ = o; } - virtual void increment_offset(uint64_t inc) { offset_ += inc; } - - virtual uint32_t checksum() { return checksum_; } - virtual uint32_t checksum_initial() { return checksum_initial_; } - virtual void set_checksum(uint32_t c) { checksum_ = c; } - virtual void set_checksum_initial(uint32_t c) { checksum_initial_ = c; } - - std::shared_ptr resource_; - uint64_t size_; - uint64_t size_compressed_; - uint64_t offset_; - - uint32_t checksum_; // checksum for value_; - uint32_t checksum_initial_; // initial checksum for value_ -}; - -} // namespace kdb - -#endif // KINGDB_BYTE_ARRAY_H_ diff --git a/util/order.h b/util/order.h index eee3533..a0c7c9e 100644 --- a/util/order.h +++ b/util/order.h @@ -22,9 +22,7 @@ #include "util/status.h" #include "algorithm/coding.h" #include "algorithm/crc32c.h" -#include "util/byte_array_base.h" #include "util/byte_array.h" -#include "util/kitten.h" #include "util/options.h" namespace kdb { @@ -35,8 +33,8 @@ struct Order { std::thread::id tid; WriteOptions write_options; OrderType type; - Kitten key; - Kitten chunk; + ByteArray key; + ByteArray chunk; uint64_t offset_chunk; uint64_t size_value; uint64_t size_value_compressed;