Skip to content

Commit

Permalink
add support file type
Browse files Browse the repository at this point in the history
  • Loading branch information
baotiao committed Nov 6, 2014
1 parent d860e8a commit bca4e91
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 49 deletions.
3 changes: 1 addition & 2 deletions example/demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ int main()
mario::Mario *m = new mario::Mario(1, fh, 2);

std::string item = "a";
s = m->Put(item);
int i = 0;
int i = 2000000;
while (i--) {
s = m->Put(item);
if (!s.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion include/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Consumer
#endif

#if defined(MARIO_MMAP)
Consumer(SequentialFile *queue, uint64_t initial_offset, Handler *h, Version *version, uint32_t filenum);
Consumer(SequentialFile *queue, Handler *h, Version *version, uint32_t filenum);
uint64_t last_record_offset () const { return last_record_offset_; }
uint32_t filenum() { return filenum_; }
#endif
Expand Down
2 changes: 2 additions & 0 deletions include/mario.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ class Mario
WritableFile *writefile() { return writefile_; }

Consumer *consumer() { return consumer_; }
Consumer::Handler *h() { return h_; }

private:

struct Writer;
Producer *producer_;
Consumer *consumer_;
uint32_t consumer_num_;
Consumer::Handler *h_;
uint64_t item_num_;
Env* env_;
SequentialFile *readfile_;
Expand Down
12 changes: 6 additions & 6 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ unsigned int Consumer::ReadMemoryRecord(Slice *result)
#endif

#if defined(MARIO_MMAP)
Consumer::Consumer(SequentialFile* const queue, uint64_t initial_offset,
Handler *h, Version* version, uint32_t filenum)
Consumer::Consumer(SequentialFile* const queue, Handler *h, Version* version,
uint32_t filenum)
: h_(h),
initial_offset_(0),
last_record_offset_(initial_offset % kBlockSize),
end_of_buffer_offset_(kBlockSize),
queue_(queue),
backing_store_(new char[kBlockSize]),
Expand All @@ -86,7 +85,8 @@ Consumer::Consumer(SequentialFile* const queue, uint64_t initial_offset,
filenum_(filenum)
{
// log_info("initial_offset %llu", initial_offset);
queue_->Skip(initial_offset);
last_record_offset_ = version_->con_offset() % kBlockSize;
queue_->Skip(version_->con_offset());
// log_info("status %s", s.ToString().c_str());
}

Expand All @@ -100,7 +100,7 @@ unsigned int Consumer::ReadPhysicalRecord(Slice *result)
Status s;
if (end_of_buffer_offset_ - last_record_offset_ <= kHeaderSize) {
queue_->Skip(end_of_buffer_offset_ - last_record_offset_);
version_->rise_offset(end_of_buffer_offset_ - last_record_offset_);
version_->rise_con_offset(end_of_buffer_offset_ - last_record_offset_);
version_->StableSave();
last_record_offset_ = 0;
}
Expand All @@ -126,7 +126,7 @@ unsigned int Consumer::ReadPhysicalRecord(Slice *result)
*result = Slice(buffer_.data(), buffer_.size());
last_record_offset_ += kHeaderSize + length;
if (s.ok()) {
version_->rise_offset(kHeaderSize + length);
version_->rise_con_offset(kHeaderSize + length);
version_->StableSave();
}
return type;
Expand Down
1 change: 1 addition & 0 deletions src/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ class PosixEnv : public Env
WritableFile** result, uint64_t write_len) {
Status s;
const int fd = open(fname.c_str(), O_RDWR, 0644);
// log_info("AppendWriteableFile write_len %lld", write_len);
if (fd < 0) {
*result = NULL;
s = IOError(fname, errno);
Expand Down
43 changes: 20 additions & 23 deletions src/mario.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct Mario::Writer {

Mario::Mario(uint32_t consumer_num, Consumer::Handler *h, int32_t retry)
: consumer_num_(consumer_num),
h_(h),
item_num_(0),
env_(Env::Default()),
readfile_(NULL),
Expand All @@ -44,8 +45,7 @@ Mario::Mario(uint32_t consumer_num, Consumer::Handler *h, int32_t retry)
#if defined(MARIO_MEMORY)
pool_ = (char *)malloc(sizeof(char) * kPoolSize);
if (pool_ == NULL) {
log_warn("malloc error");
exit(-1);
log_err("malloc error");
}
producer_ = new Producer(pool_, kPoolSize);
consumer_ = new Consumer(0, h, pool_, kPoolSize);
Expand All @@ -70,10 +70,6 @@ Mario::Mario(uint32_t consumer_num, Consumer::Handler *h, int32_t retry)
log_warn("new versionfile error");
}
version_ = new Version(versionfile_);
version_->set_item_num(0);
version_->set_offset(0);
version_->set_pronum(0);
version_->set_connum(0);
version_->StableSave();
} else {
log_info("Find the exist file ");
Expand All @@ -83,22 +79,21 @@ Mario::Mario(uint32_t consumer_num, Consumer::Handler *h, int32_t retry)
version_->InitSelf();
pronum_ = version_->pronum();
connum_ = version_->connum();
log_info("Current offset %" PRIu64 " itemnum %u pronum %u connum %u", version_->offset(), version_->item_num(), version_->pronum(), version_->connum());
version_->debug();
} else {
log_warn("new REFile error");
}
profile = NewFileName(filename_, pronum_);
confile = NewFileName(filename_, connum_);
log_info("profile %s confile %s", profile.c_str(), confile.c_str());
env_->AppendWritableFile(profile, &writefile_, version_->offset());
env_->AppendWritableFile(profile, &writefile_, version_->pro_offset());
uint64_t filesize = writefile_->Filesize();
log_info("filesize %" PRIu64 "", filesize);
env_->AppendSequentialFile(confile, &readfile_);
}

producer_ = new Producer(writefile_, version_);
log_info("offset %" PRIu64 "", version_->offset());
consumer_ = new Consumer(readfile_, version_->offset(), h, version_, connum_);
consumer_ = new Consumer(readfile_, h_, version_, connum_);
env_->StartThread(&Mario::SplitLogWork, this);

#endif
Expand All @@ -122,7 +117,6 @@ Mario::~Mario()
delete producer_;

#if defined(MARIO_MMAP)
// log_info("offset %llu itemnum %u ", version_->offset(), version_->item_num());
delete version_;
delete versionfile_;
#endif
Expand All @@ -149,7 +143,7 @@ void Mario::SplitLogCall()
std::string profile;
while (1) {
uint64_t filesize = writefile_->Filesize();
log_info("filesize %llu kMmapSize %llu", filesize, kMmapSize);
// log_info("filesize %llu kMmapSize %llu", filesize, kMmapSize);
if (filesize > kMmapSize) {
{

Expand All @@ -159,8 +153,10 @@ void Mario::SplitLogCall()
pronum_++;
profile = NewFileName(filename_, pronum_);
env_->NewWritableFile(profile, &writefile_);
version_->set_pro_offset(0);
version_->set_pronum(pronum_);
version_->StableSave();
version_->debug();
producer_ = new Producer(writefile_, version_);

}
Expand Down Expand Up @@ -202,29 +198,30 @@ void Mario::BackgroundCall()
#if defined(MARIO_MMAP)
s = consumer_->Consume(scratch);
while (!s.ok()) {
s = consumer_->Consume(scratch);
log_info("consumer_ consume %s", s.ToString().c_str());
log_info("connum_ %d", connum_);
// log_info("consumer_ consume %s", s.ToString().c_str());
// log_info("connum_ %d", connum_);
std::string confile = NewFileName(filename_, connum_ + 1);
log_info("confile %s connum_ %d", confile.c_str(), connum_);
log_info("isendfile %d fileexist %d item_num %d", s.IsEndFile(), env_->FileExists(confile), version_->item_num());
// log_info("confile %s connum_ %d", confile.c_str(), connum_);
// log_info("isendfile %d fileexist %d item_num %d", s.IsEndFile(), env_->FileExists(confile), version_->item_num());
if (s.IsEndFile() && env_->FileExists(confile)) {
// log_info("Rotate file ");
delete readfile_;
env_->AppendSequentialFile(confile, &readfile_);
Consumer::Handler *ho = consumer_->h();
connum_++;
delete consumer_;
version_->set_offset(0);
version_->set_con_offset(0);
version_->set_connum(connum_);
version_->StableSave();
consumer_ = new Consumer(readfile_, 0, ho, version_, connum_);
consumer_ = new Consumer(readfile_, h_, version_, connum_);
s = consumer_->Consume(scratch);
log_info("consumer_ consume %s", s.ToString().c_str());
// log_info("consumer_ consume %s", s.ToString().c_str());
break;
} else {
mutex_.Unlock();
sleep(1);
mutex_.Lock();
}
s = consumer_->Consume(scratch);
}
version_->minus_item_num();
version_->StableSave();
Expand All @@ -236,11 +233,11 @@ void Mario::BackgroundCall()
#endif
mutex_.Unlock();
if (retry_ == -1) {
while (!consumer_->h()->processMsg(scratch)) {
while (h_->processMsg(scratch)) {
}
} else {
int retry = retry_ - 1;
while (!consumer_->h()->processMsg(scratch) && retry--) {
while (!h_->processMsg(scratch) && retry--) {
}
if (retry <= 0) {
log_warn("message retry %d time still error %s", retry_, scratch.c_str());
Expand Down
4 changes: 3 additions & 1 deletion src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Status Producer::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n)
}
block_offset_ += kHeaderSize + n;
// log_info("block_offset %d", (kHeaderSize + n));
version_->rise_offset((uint64_t)(kHeaderSize + n));
version_->rise_pro_offset((uint64_t)(kHeaderSize + n));
version_->StableSave();
return s;
}
Expand Down Expand Up @@ -111,6 +111,8 @@ Status Producer::Produce(const Slice &item)
#endif
#if defined(MARIO_MMAP)
queue_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00", leftover));
version_->rise_pro_offset(leftover);
version_->StableSave();
#endif
}
block_offset_ = 0;
Expand Down
24 changes: 14 additions & 10 deletions src/version.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ namespace mario {
Version::Version(RWFile *save) :
save_(save)
{

save_->GetData();
pro_offset_ = 0;
con_offset_ = 0;
item_num_ = 0;
pronum_ = 0;
connum_ = 0;
assert(save_ != NULL);
}

Expand All @@ -20,10 +24,9 @@ Status Version::StableSave()
{

char *p = save_->GetData();
memcpy(p, &offset_, sizeof(uint64_t));
uint64_t n;
memcpy((char *)&n, p, sizeof(uint64_t));
// log_info("num n %llu", n);
memcpy(p, &pro_offset_, sizeof(uint64_t));
p += 8;
memcpy(p, &con_offset_, sizeof(uint64_t));
p += 8;
memcpy(p, &item_num_, sizeof(uint32_t));
p += 4;
Expand All @@ -38,11 +41,12 @@ Status Version::InitSelf()
{
Status s;
if (save_->GetData() != NULL) {
memcpy((char*)(&offset_), save_->GetData(), sizeof(uint64_t));
memcpy((char*)(&item_num_), save_->GetData() + 8, sizeof(uint32_t));
memcpy((char*)(&pronum_), save_->GetData() + 12, sizeof(uint32_t));
memcpy((char*)(&connum_), save_->GetData() + 16, sizeof(uint32_t));
// log_info("InitSelf offset %llu itemnum %u pronum %u connum %u", offset_, item_num_, pronum_, connum_);
memcpy((char*)(&pro_offset_), save_->GetData(), sizeof(uint64_t));
memcpy((char*)(&con_offset_), save_->GetData() + 8, sizeof(uint64_t));
memcpy((char*)(&item_num_), save_->GetData() + 16, sizeof(uint32_t));
memcpy((char*)(&pronum_), save_->GetData() + 20, sizeof(uint32_t));
memcpy((char*)(&connum_), save_->GetData() + 24, sizeof(uint32_t));
// log_info("InitSelf pro_offset %llu itemnum %u pronum %u connum %u", pro_offset_, item_num_, pronum_, connum_);
return Status::OK();
} else {
return Status::Corruption("version init error");
Expand Down
23 changes: 18 additions & 5 deletions src/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
#include <stdint.h>
#include "status.h"
#include "env.h"
#include "xdebug.h"
#include <stdint.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>


namespace mario {
Expand All @@ -20,10 +24,13 @@ class Version
Status StableSave();
Status InitSelf();

uint64_t offset() { return offset_; }
void set_offset(uint64_t offset) { offset_ = offset; }
void plus_offset() { offset_++; }
void rise_offset(uint64_t offset_r) { offset_ += offset_r; }
uint64_t pro_offset() { return pro_offset_; }
void set_pro_offset(uint64_t pro_offset) { pro_offset_ = pro_offset; }
void rise_pro_offset(uint64_t r) { pro_offset_ += r; }

uint64_t con_offset() { return con_offset_; }
void set_con_offset(uint64_t con_offset) { con_offset_ = con_offset; }
void rise_con_offset(uint64_t r) { con_offset_ += r; }

uint32_t item_num() { return item_num_; }
void set_item_num(uint32_t item_num) { item_num_ = item_num; }
Expand All @@ -36,8 +43,14 @@ class Version
uint32_t connum() { return connum_; }
void set_connum(uint32_t connum) { connum_ = connum; }

void debug() {
log_info("Current pro_offset %" PRIu64 " con_offset %" PRIu64 " itemnum %u pronum %u connum %u",
pro_offset_, con_offset_, item_num_, pronum_, connum_);
}

private:
uint64_t offset_;
uint64_t pro_offset_;
uint64_t con_offset_;
uint32_t item_num_;
uint32_t pronum_;
uint32_t connum_;
Expand Down
2 changes: 1 addition & 1 deletion tool/parse_mani.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ int main(int argc, char** argv)
Version *v = new Version(versionfile);
v->InitSelf();
if (s.ok()) {
log_info("offset %llu itemnum %u pronum %u connum %u", v->offset(), v->item_num(), v->pronum(), v->connum());
log_info("pro_offset %llu con_offset %llu itemnum %u pronum %u connum %u", v->pro_offset(), v->con_offset(), v->item_num(), v->pronum(), v->connum());
} else {
log_warn("init error");
}
Expand Down

0 comments on commit bca4e91

Please sign in to comment.