Skip to content
Permalink
Browse files

tools/sharelog_to_parquet: No longer generate empty parquet files

  • Loading branch information...
SwimmingTiger committed Sep 25, 2019
1 parent 56b07ad commit 8485720650c7119d8c0588d482d6836354a95940
Showing with 21 additions and 39 deletions.
  1. +1 −2 tools/sharelog_to_parquet/ShareLogParser.h
  2. +20 −37 tools/sharelog_to_parquet/ShareLogParser.inl
@@ -55,7 +55,6 @@ class ShareLogParserT : public ShareLogParser {
string outputDir_;
const string chainType_;
ParquetWriterT<SHARE> parquetWriter_;
bool openFileFailed_ = false;

//
// for processGrowingShareLog()
@@ -78,14 +77,14 @@ class ShareLogParserT : public ShareLogParser {

void parseShareLog(const uint8_t *buf, size_t len);
void parseShare(SHARE &share);
void generateEmptyParquets();
bool openParquet();

public:
ShareLogParserT(const libconfig::Config &cfg, time_t timestamp);
virtual ~ShareLogParserT();

bool init();
void closeParquet();

// read unchanged share data bin file, for example yesterday's file. it will
// use mmap() to get high performance. call only once will process
@@ -31,7 +31,6 @@ template <class SHARE>
ShareLogParserT<SHARE>::ShareLogParserT(
const libconfig::Config &cfg, time_t timestamp)
: date_(timestamp)
, hour_((timestamp - timestamp % 86400) / 3600)
, outputDir_(cfg.lookup("parquet.data_dir").operator string())
, chainType_(cfg.lookup("sharelog.chain_type").operator string())
, f_(nullptr)
@@ -70,7 +69,6 @@ bool ShareLogParserT<SHARE>::openParquet() {
if (!stat.ok()) {
LOG(ERROR) << "cannot create parquet file " << parquetPath
<< ", message: " << stat.message();
openFileFailed_ = true;
return false;
}
LOG(INFO) << "writing parquet file " << parquetPath;
@@ -83,6 +81,11 @@ bool ShareLogParserT<SHARE>::init() {
return true;
}

// Closes the parquet writer held by this parser.
// Simply forwards to parquetWriter_.close(); no other state is touched here.
template <class SHARE>
void ShareLogParserT<SHARE>::closeParquet() {
  this->parquetWriter_.close();
}

template <class SHARE>
void ShareLogParserT<SHARE>::parseShareLog(const uint8_t *buf, size_t len) {
SHARE share;
@@ -106,10 +109,8 @@ void ShareLogParserT<SHARE>::parseShareLog(const uint8_t *buf, size_t len) {
template <class SHARE>
void ShareLogParserT<SHARE>::parseShare(SHARE &share) {
time_t shareHour = share.timestamp() / 3600;
// Use loop to generate one file per hour, some may be empty files (if there
// is no share in that hour).
while (shareHour > hour_) {
hour_++;
if (shareHour > hour_) {
hour_ = shareHour;
if (!openParquet()) {
LOG(FATAL) << "cannot open parquet file, writer aborted!";
}
@@ -118,23 +119,9 @@ void ShareLogParserT<SHARE>::parseShare(SHARE &share) {
parquetWriter_.addShare(share);
}

// Opens one parquet file per remaining hour up to the last hour of the day
// (date_ + 23h, expressed as an hour index), then closes the writer.
// These files cover the period in which no share was seen, so they are
// expected to be empty.
template <class SHARE>
void ShareLogParserT<SHARE>::generateEmptyParquets() {
  // Hour index (seconds / 3600) of the 23rd hour after date_.
  const time_t lastHour = (date_ + 82800 /*23h*/) / 3600;

  for (;;) {
    // Stop once a file could not be opened or the final hour was reached.
    if (openFileFailed_ || hour_ >= lastHour) {
      break;
    }

    ++hour_;
    // The return value is deliberately not inspected here; the loop
    // condition relies on openFileFailed_ instead.
    openParquet();
  }

  parquetWriter_.close();
}

template <class SHARE>
bool ShareLogParserT<SHARE>::processUnchangedShareLog() {
try {
openFileFailed_ = true;

// open file
LOG(INFO) << "open file: " << filePath_;
zstr::ifstream f(filePath_, std::ios::binary);
@@ -144,12 +131,6 @@ bool ShareLogParserT<SHARE>::processUnchangedShareLog() {
return false;
}

openFileFailed_ = false;

if (!openParquet()) {
return false;
}

// 2000000 * 48 = 96,000,000 Bytes
string buf;
buf.resize(96000000);
@@ -194,18 +175,22 @@ bool ShareLogParserT<SHARE>::processUnchangedShareLog() {
<< " bytes fragment before EOF" << std::endl;
}

generateEmptyParquets();
closeParquet();

return true;
} catch (const zstr::Exception &ex) {
LOG(ERROR) << "open file fail: " << filePath_ << ", exception: " << ex.what();
LOG(ERROR) << "open file fail: " << filePath_
<< ", exception: " << ex.what();
return false;
} catch (const strict_fstream::Exception &ex) {
LOG(ERROR) << "open file fail: " << filePath_
<< ", exception: " << ex.what();
return false;
}
}

template <class SHARE>
int64_t ShareLogParserT<SHARE>::processGrowingShareLog() {
openFileFailed_ = true;

if (f_ == nullptr) {
bool fileOpened = true;
try {
@@ -221,18 +206,12 @@ int64_t ShareLogParserT<SHARE>::processGrowingShareLog() {
fileOpened = false;
}

if (fileOpened) {
openFileFailed_ = false;
} else {
if (!fileOpened) {
delete f_;
f_ = nullptr;

return -1;
}

if (!openParquet()) {
return false;
}
}

uint32_t readNum = 0;
@@ -404,6 +383,10 @@ void ShareLogParserServerT<SHARE>::runThreadShareLogParser() {

} /* while */

if (shareLogParser_) {
shareLogParser_->closeParquet();
}

LOG(INFO) << "thread sharelog parser stop";
}

0 comments on commit 8485720

Please sign in to comment.
You can’t perform that action at this time.