Skip to content

Commit

Permalink
Closed the refactoring of the ByteArray class.
Browse files Browse the repository at this point in the history
- Finished refactoring the ByteArray class by using RAII.
- Renamed all usages of the temporary class, 'Kitten', into 'ByteArray'.
- Replaced pointers for objects in all API calls, in particular for
  Iterator and Snapshot.
- Implemented new Multipart API.
- Move the Mmap class to 'util/file.h'.
- Fixed various bugs along the way.
  • Loading branch information
goossaert committed Feb 28, 2015
1 parent 3248124 commit 7737fa6
Show file tree
Hide file tree
Showing 20 changed files with 358 additions and 949 deletions.
2 changes: 0 additions & 2 deletions algorithm/coding.h
Expand Up @@ -33,8 +33,6 @@ extern void PutFixed64(std::string* dst, uint64_t value);
extern void PutVarint32(std::string* dst, uint32_t value);
extern void PutVarint64(std::string* dst, uint64_t value);

// Standard Get... routines parse a value from the beginning of a ByteArray
// and returns the number of bytes that were read from the ByteArray.
extern int GetVarint32(char* input, uint64_t size, uint32_t* value);
extern int GetVarint64(char* input, uint64_t size, uint64_t* value);

Expand Down
16 changes: 8 additions & 8 deletions cache/write_buffer.cc
Expand Up @@ -20,7 +20,7 @@ void WriteBuffer::Flush() {
log::trace("WriteBuffer::Flush()", "end");
}

Status WriteBuffer::Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) {
Status WriteBuffer::Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) {
// TODO: need to fix the way the value is returned here: to create a new
// memory space and then return.
// TODO: make sure the live buffer doesn't need to be protected by a mutex in
Expand Down Expand Up @@ -125,15 +125,15 @@ Status WriteBuffer::Get(ReadOptions& read_options, Kitten& key, Kitten* value_ou
}


Status WriteBuffer::Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) {
Status WriteBuffer::Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) {
//return Write(OrderType::Put, key, value);
return Status::InvalidArgument("WriteBuffer::Put() is not implemented");
}


Status WriteBuffer::PutChunk(WriteOptions& write_options,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value,
uint64_t size_value_compressed,
Expand All @@ -150,16 +150,16 @@ Status WriteBuffer::PutChunk(WriteOptions& write_options,
}


Status WriteBuffer::Delete(WriteOptions& write_options, Kitten& key) {
auto empty = Kitten::NewEmptyKitten();
Status WriteBuffer::Delete(WriteOptions& write_options, ByteArray& key) {
auto empty = ByteArray::NewEmptyByteArray();
return WriteChunk(write_options, OrderType::Delete, key, empty, 0, 0, 0, 0);
}


Status WriteBuffer::WriteChunk(const WriteOptions& write_options,
const OrderType& op,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value,
uint64_t size_value_compressed,
Expand Down
14 changes: 7 additions & 7 deletions cache/write_buffer.h
Expand Up @@ -49,16 +49,16 @@ class WriteBuffer {

}
~WriteBuffer() { Close(); }
Status Get(ReadOptions& read_options, Kitten& key, Kitten* value_out);
Status Put(WriteOptions& write_options, Kitten& key, Kitten& chunk);
Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out);
Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk);
Status PutChunk(WriteOptions& write_options,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value,
uint64_t size_value_compressed,
uint32_t crc32);
Status Delete(WriteOptions& write_options, Kitten& key);
Status Delete(WriteOptions& write_options, ByteArray& key);
void Flush();

void Close () {
Expand Down Expand Up @@ -91,8 +91,8 @@ class WriteBuffer {
private:
Status WriteChunk(const WriteOptions& write_options,
const OrderType& op,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value,
uint64_t size_value_compressed,
Expand Down
47 changes: 23 additions & 24 deletions interface/interface.h
Expand Up @@ -9,7 +9,6 @@
#include "util/status.h"
#include "util/order.h"
#include "util/byte_array.h"
#include "util/kitten.h"

namespace kdb {

Expand All @@ -23,8 +22,8 @@ class Iterator {
virtual void Begin() = 0;
virtual bool IsValid() = 0;
virtual bool Next() = 0;
virtual Kitten GetKey() = 0;
virtual Kitten GetValue() = 0;
virtual ByteArray GetKey() = 0;
virtual ByteArray GetValue() = 0;
virtual MultipartReader GetMultipartValue() = 0;
};
*/
Expand All @@ -33,55 +32,55 @@ class Iterator {
class Interface {
public:
virtual ~Interface() {}
virtual Status Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) = 0;
virtual Status Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) = 0;

virtual Status Get(ReadOptions& read_options, Kitten& key, std::string* value_out) {
Kitten value;
virtual Status Get(ReadOptions& read_options, ByteArray& key, std::string* value_out) {
ByteArray value;
Status s = Get(read_options, key, &value);
if (!s.IsOK()) return s;
*value_out = value.ToString();
return s;
}

virtual Status Get(ReadOptions& read_options, std::string& key, Kitten* value_out) {
Kitten kitten_key = Kitten::NewPointerKitten(key.c_str(), key.size());
Status s = Get(read_options, kitten_key, value_out);
virtual Status Get(ReadOptions& read_options, std::string& key, ByteArray* value_out) {
ByteArray byte_array_key = ByteArray::NewPointerByteArray(key.c_str(), key.size());
Status s = Get(read_options, byte_array_key, value_out);
return s;
}

virtual Status Get(ReadOptions& read_options, std::string& key, std::string* value_out) {
Kitten kitten_key = Kitten::NewPointerKitten(key.c_str(), key.size());
Kitten value;
ByteArray byte_array_key = ByteArray::NewPointerByteArray(key.c_str(), key.size());
ByteArray value;
Status s = Get(read_options, key, &value);
if (!s.IsOK()) return s;
*value_out = value.ToString();
return s;
}

virtual Status Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) = 0;
virtual Status Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) = 0;

virtual Status Put(WriteOptions& write_options, Kitten& key, std::string& chunk) {
Kitten kitten_chunk = Kitten::NewDeepCopyKitten(chunk.c_str(), chunk.size());
return Put(write_options, key, kitten_chunk);
virtual Status Put(WriteOptions& write_options, ByteArray& key, std::string& chunk) {
ByteArray byte_array_chunk = ByteArray::NewDeepCopyByteArray(chunk.c_str(), chunk.size());
return Put(write_options, key, byte_array_chunk);
}

virtual Status Put(WriteOptions& write_options, std::string& key, Kitten& chunk) {
Kitten kitten_key = Kitten::NewDeepCopyKitten(key.c_str(), key.size());
return Put(write_options, kitten_key, chunk);
virtual Status Put(WriteOptions& write_options, std::string& key, ByteArray& chunk) {
ByteArray byte_array_key = ByteArray::NewDeepCopyByteArray(key.c_str(), key.size());
return Put(write_options, byte_array_key, chunk);
}

virtual Status Put(WriteOptions& write_options, std::string& key, std::string& chunk) {
Kitten kitten_key = Kitten::NewDeepCopyKitten(key.c_str(), key.size());
Kitten kitten_chunk = Kitten::NewDeepCopyKitten(chunk.c_str(), chunk.size());
return Put(write_options, kitten_key, kitten_chunk);
ByteArray byte_array_key = ByteArray::NewDeepCopyByteArray(key.c_str(), key.size());
ByteArray byte_array_chunk = ByteArray::NewDeepCopyByteArray(chunk.c_str(), chunk.size());
return Put(write_options, byte_array_key, byte_array_chunk);
}

virtual Status PutChunk(WriteOptions& write_options,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value) = 0;
virtual Status Delete(WriteOptions& write_options, Kitten& key) = 0;
virtual Status Delete(WriteOptions& write_options, ByteArray& key) = 0;
virtual Iterator NewIterator(ReadOptions& read_options) = 0;
virtual Status Open() = 0;
virtual void Close() = 0;
Expand Down
20 changes: 10 additions & 10 deletions interface/iterator.h
Expand Up @@ -10,8 +10,8 @@
#include "util/status.h"
#include "util/order.h"
#include "util/byte_array.h"
#include "util/kitten.h"
#include "util/options.h"
#include "util/file.h"
#include "interface/interface.h"
#include "interface/multipart.h"
#include "storage/storage_engine.h"
Expand Down Expand Up @@ -149,7 +149,7 @@ class Iterator {
}

// Get entry at the location
Kitten key, value;
ByteArray key, value;
uint64_t location_current = locations_current_[index_location_];
Status s = se_readonly_->GetEntry(read_options_, location_current, &key, &value);
if (!s.IsOK()) {
Expand All @@ -161,7 +161,7 @@ class Iterator {
// Get entry for the key found at the location, and continue if the
// locations mismatch -- i.e. the current entry has been overwritten
// by a later entry.
Kitten value_alt;
ByteArray value_alt;
uint64_t location_out;
s = se_readonly_->Get(read_options_, key, &value_alt, &location_out);
if (!s.IsOK()) {
Expand Down Expand Up @@ -191,17 +191,17 @@ class Iterator {
return false;
}

Kitten GetKey() {
ByteArray GetKey() {
std::unique_lock<std::mutex> lock(mutex_);
return key_;
}

Kitten GetValue() {
ByteArray GetValue() {
std::unique_lock<std::mutex> lock(mutex_);
if (!value_.is_compressed()) return value_;

if (value_.size() > se_readonly_->db_options_.internal__size_multipart_required) {
return Kitten();
return ByteArray();
}

// TODO-36: Uncompression should have to go through a MultipartReader. See
Expand All @@ -210,15 +210,15 @@ class Iterator {
uint64_t offset = 0;
MultipartReader mp_reader(read_options_, value_);
for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) {
Kitten part;
ByteArray part;
mp_reader.GetPart(&part);
log::trace("KingDB Get()", "Multipart loop size:%d [%s]", part.size(), part.ToString().c_str());
memcpy(buffer + offset, part.data(), part.size());
offset += part.size();
}
status_ = mp_reader.GetStatus();
if (!status_.IsOK()) log::trace("KingDB Get()", "Error in GetValue(): %s\n", status_.ToString().c_str());
return Kitten::NewShallowCopyKitten(buffer, value_.size());
return ByteArray::NewShallowCopyByteArray(buffer, value_.size());
}

MultipartReader GetMultipartValue() {
Expand Down Expand Up @@ -246,8 +246,8 @@ class Iterator {
Status status_;
bool is_closed_;

Kitten key_;
Kitten value_;
ByteArray key_;
ByteArray value_;
};

} // end namespace kdb
Expand Down
28 changes: 14 additions & 14 deletions interface/kingdb.cc
Expand Up @@ -7,8 +7,8 @@
namespace kdb {

Status KingDB::Get(ReadOptions& read_options,
Kitten& key,
Kitten* value_out,
ByteArray& key,
ByteArray* value_out,
bool want_raw_data) {
if (is_closed_) return Status::IOError("The database is not open");
log::trace("KingDB Get()", "[%s]", key.ToString().c_str());
Expand Down Expand Up @@ -47,31 +47,31 @@ Status KingDB::Get(ReadOptions& read_options,
uint64_t offset = 0;
MultipartReader mp_reader(read_options, *value_out);
for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) {
Kitten part;
ByteArray part;
mp_reader.GetPart(&part);
log::trace("KingDB Get()", "Multipart loop size:%d [%s]", part.size(), part.ToString().c_str());
memcpy(buffer + offset, part.data(), part.size());
offset += part.size();
}
*value_out = Kitten::NewShallowCopyKitten(buffer, value_out->size());
*value_out = ByteArray::NewShallowCopyByteArray(buffer, value_out->size());
}

return s;
}

Status KingDB::Get(ReadOptions& read_options, Kitten& key, Kitten* value_out) {
Status KingDB::Get(ReadOptions& read_options, ByteArray& key, ByteArray* value_out) {
return Get(read_options, key, value_out, false);
}


Status KingDB::Put(WriteOptions& write_options, Kitten& key, Kitten& chunk) {
Status KingDB::Put(WriteOptions& write_options, ByteArray& key, ByteArray& chunk) {
return PutChunk(write_options, key, chunk, 0, chunk.size());
}


Status KingDB::PutChunk(WriteOptions& write_options,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value) {
if (is_closed_) return Status::IOError("The database is not open");
Expand All @@ -84,7 +84,7 @@ Status KingDB::PutChunk(WriteOptions& write_options,
uint64_t size_chunk = chunk.size();
Status s;
for (uint64_t offset = 0; offset < size_chunk; offset += db_options_.storage__maximum_chunk_size) {
Kitten key_new, chunk_new;
ByteArray key_new, chunk_new;
if (offset + db_options_.storage__maximum_chunk_size < chunk.size()) {
chunk_new = chunk;
chunk_new.set_offset(offset);
Expand All @@ -106,8 +106,8 @@ Status KingDB::PutChunk(WriteOptions& write_options,


Status KingDB::PutChunkValidSize(WriteOptions& write_options,
Kitten& key,
Kitten& chunk,
ByteArray& key,
ByteArray& chunk,
uint64_t offset_chunk,
uint64_t size_value) {
if (is_closed_) return Status::IOError("The database is not open");
Expand All @@ -123,7 +123,7 @@ Status KingDB::PutChunkValidSize(WriteOptions& write_options,
bool do_compression = true;
uint64_t size_value_compressed = 0;
uint64_t offset_chunk_compressed = offset_chunk;
Kitten chunk_final;
ByteArray chunk_final;

bool is_first_chunk = (offset_chunk == 0);
bool is_last_chunk = (chunk.size() + offset_chunk == size_value);
Expand Down Expand Up @@ -185,7 +185,7 @@ Status KingDB::PutChunkValidSize(WriteOptions& write_options,
}

if (!s.IsOK()) return s;
Kitten chunk_compressed = Kitten::NewShallowCopyKitten(compressed, size_compressed);
ByteArray chunk_compressed = ByteArray::NewShallowCopyByteArray(compressed, size_compressed);

log::trace("KingDB::PutChunkValidSize()",
"[%s] (%" PRIu64 ") compressed size %" PRIu64 " - offset_chunk_compressed %" PRIu64,
Expand Down Expand Up @@ -240,7 +240,7 @@ Status KingDB::PutChunkValidSize(WriteOptions& write_options,


Status KingDB::Delete(WriteOptions& write_options,
Kitten& key) {
ByteArray& key) {
if (is_closed_) return Status::IOError("The database is not open");
log::trace("KingDB::Delete()", "[%s]", key.ToString().c_str());
Status s = se_->FileSystemStatus();
Expand Down

0 comments on commit 7737fa6

Please sign in to comment.