Skip to content

Commit

Permalink
Merge pull request #241 from btccom/share-statistics
Browse files Browse the repository at this point in the history
Share statistics improvements
  • Loading branch information
Hanjiang Yu committed Feb 11, 2019
2 parents 786b1f6 + 69408d8 commit abba7b5
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 130 deletions.
10 changes: 8 additions & 2 deletions src/ShareLogParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class ShareLogParserT : public ShareLogParser {
shared_ptr<DuplicateShareChecker<SHARE>>
dupShareChecker_; // Used to detect duplicate share attacks.

bool acceptStale_; // Whether stale shares are accepted

inline int32_t getHourIdx(uint32_t ts) {
// %H Hour in 24h format (00-23)
return atoi(date("%H", ts).c_str());
Expand Down Expand Up @@ -140,7 +142,8 @@ class ShareLogParserT : public ShareLogParser {
const string &dataDir,
time_t timestamp,
const MysqlConnectInfo &poolDBInfo,
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker);
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker,
bool acceptStale);
~ShareLogParserT();

bool init();
Expand Down Expand Up @@ -201,6 +204,8 @@ class ShareLogParserServerT : public ShareLogParserServer {
shared_ptr<DuplicateShareChecker<SHARE>>
dupShareChecker_; // Used to detect duplicate share attacks.

bool acceptStale_; // Whether stale shares are accepted

// httpd
struct event_base *base_;
string httpdHost_;
Expand Down Expand Up @@ -237,7 +242,8 @@ class ShareLogParserServerT : public ShareLogParserServer {
unsigned short httpdPort,
const MysqlConnectInfo &poolDBInfo,
const uint32_t kFlushDBInterval,
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker);
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker,
bool acceptStale);
~ShareLogParserServerT();

void stop();
Expand Down
23 changes: 16 additions & 7 deletions src/ShareLogParser.inl
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,16 @@ ShareLogParserT<SHARE>::ShareLogParserT(
const string &dataDir,
time_t timestamp,
const MysqlConnectInfo &poolDBInfo,
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker)
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker,
bool acceptStale)
: date_(timestamp)
, chainType_(chainType)
, f_(nullptr)
, buf_(nullptr)
, incompleteShareSize_(0)
, poolDB_(poolDBInfo)
, dupShareChecker_(dupShareChecker) {
, dupShareChecker_(dupShareChecker)
, acceptStale_(acceptStale) {
pthread_rwlock_init(&rwlock_, nullptr);

{
Expand Down Expand Up @@ -208,9 +210,9 @@ void ShareLogParserT<SHARE>::parseShare(SHARE &share) {
pthread_rwlock_unlock(&rwlock_);

const uint32_t hourIdx = getHourIdx(share.timestamp());
workersStats_[wkey]->processShare(hourIdx, share);
workersStats_[ukey]->processShare(hourIdx, share);
workersStats_[pkey]->processShare(hourIdx, share);
workersStats_[wkey]->processShare(hourIdx, share, acceptStale_);
workersStats_[ukey]->processShare(hourIdx, share, acceptStale_);
workersStats_[pkey]->processShare(hourIdx, share, acceptStale_);
}

template <class SHARE>
Expand Down Expand Up @@ -763,13 +765,15 @@ ShareLogParserServerT<SHARE>::ShareLogParserServerT(
unsigned short httpdPort,
const MysqlConnectInfo &poolDBInfo,
const uint32_t kFlushDBInterval,
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker)
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker,
bool acceptStale)
: running_(true)
, chainType_(chainType)
, dataDir_(dataDir)
, poolDBInfo_(poolDBInfo)
, kFlushDBInterval_(kFlushDBInterval)
, dupShareChecker_(dupShareChecker)
, acceptStale_(acceptStale)
, base_(nullptr)
, httpdHost_(httpdHost)
, httpdPort_(httpdPort)
Expand Down Expand Up @@ -815,7 +819,12 @@ bool ShareLogParserServerT<SHARE>::initShareLogParser(time_t datets) {
// set new obj
shared_ptr<ShareLogParserT<SHARE>> parser =
std::make_shared<ShareLogParserT<SHARE>>(
chainType_.c_str(), dataDir_, date_, poolDBInfo_, dupShareChecker_);
chainType_.c_str(),
dataDir_,
date_,
poolDBInfo_,
dupShareChecker_,
acceptStale_);

if (!parser->init()) {
LOG(ERROR) << "parser check failure, date: " << date("%F", date_);
Expand Down
3 changes: 2 additions & 1 deletion src/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ class ShareStatsDay {
ShareStatsDay(const ShareStatsDay &r) = default;
ShareStatsDay &operator=(const ShareStatsDay &r) = default;

void processShare(uint32_t hourIdx, const SHARE &share);
void processShare(uint32_t hourIdx, const SHARE &share, bool acceptStale);
double getShareReward(const SHARE &share);
void getShareStatsHour(uint32_t hourIdx, ShareStats *stats);
void getShareStatsDay(ShareStats *stats);
};
Expand Down
26 changes: 26 additions & 0 deletions src/Statistics.inl
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,32 @@ T StatsWindow<T>::sum(int64_t beginRingIdx) {
}

/////////////////////////////// ShareStatsDay ////////////////////////////////
template <class SHARE>
void ShareStatsDay<SHARE>::processShare(
uint32_t hourIdx, const SHARE &share, bool acceptStale) {
ScopeLock sl(lock_);

if (StratumStatus::isAccepted(share.status()) &&
(acceptStale || !StratumStatus::isStale(share.status()))) {
shareAccept1h_[hourIdx] += share.sharediff();
shareAccept1d_ += share.sharediff();

double score = share.score();
double reward = getShareReward(share);
double earn = score * reward;

score1h_[hourIdx] += score;
score1d_ += score;
earn1h_[hourIdx] += earn;
earn1d_ += earn;

} else {
shareReject1h_[hourIdx] += share.sharediff();
shareReject1d_ += share.sharediff();
}
modifyHoursFlag_ |= (0x01u << hourIdx);
}

template <class SHARE>
void ShareStatsDay<SHARE>::getShareStatsHour(
uint32_t hourIdx, ShareStats *stats) {
Expand Down
7 changes: 5 additions & 2 deletions src/StatsHttpd.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class WorkerShares {
// void serialize(...);
// bool unserialize(const ...);

void processShare(const SHARE &share);
void processShare(const SHARE &share, bool acceptStale);
WorkerStatus getWorkerStatus();
void getWorkerStatus(WorkerStatus &status);
bool isExpired();
Expand Down Expand Up @@ -199,6 +199,8 @@ class StatsServerT : public StatsServer {
shared_ptr<DuplicateShareChecker<SHARE>>
dupShareChecker_; // Used to detect duplicate share attacks.

bool acceptStale_; // Whether stale shares are accepted

// httpd
struct event_base *base_;
string httpdHost_;
Expand Down Expand Up @@ -285,7 +287,8 @@ class StatsServerT : public StatsServer {
const int redisIndexPolicy,
const time_t kFlushDBInterval,
const string &fileLastFlushTime,
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker);
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker,
bool acceptStale);
~StatsServerT();

bool init();
Expand Down
19 changes: 11 additions & 8 deletions src/StatsHttpd.inl
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ WorkerShares<SHARE>::WorkerShares(const int64_t workerId, const int32_t userId)
}

template <class SHARE>
void WorkerShares<SHARE>::processShare(const SHARE &share) {
void WorkerShares<SHARE>::processShare(const SHARE &share, bool acceptStale) {
ScopeLock sl(lock_);
const time_t now = time(nullptr);
if (now > share.timestamp() + STATS_SLIDING_WINDOW_SECONDS) {
return;
}

if (StratumStatus::isAccepted(share.status())) {
if (StratumStatus::isAccepted(share.status()) &&
(acceptStale || !StratumStatus::isStale(share.status()))) {
acceptCount_++;
acceptShareSec_.insert(share.timestamp(), share.sharediff());
} else {
Expand Down Expand Up @@ -126,7 +127,8 @@ StatsServerT<SHARE>::StatsServerT(
const int redisIndexPolicy,
const time_t kFlushDBInterval,
const string &fileLastFlushTime,
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker)
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker,
bool acceptStale)
: running_(true)
, totalWorkerCount_(0)
, totalUserCount_(0)
Expand All @@ -151,6 +153,7 @@ StatsServerT<SHARE>::StatsServerT(
, lastFlushTime_(0)
, fileLastFlushTime_(fileLastFlushTime)
, dupShareChecker_(dupShareChecker)
, acceptStale_(acceptStale)
, base_(nullptr)
, httpdHost_(httpdHost)
, httpdPort_(httpdPort)
Expand Down Expand Up @@ -301,7 +304,7 @@ void StatsServerT<SHARE>::processShare(const SHARE &share) {
if (now > share.timestamp() + STATS_SLIDING_WINDOW_SECONDS) {
return;
}
poolWorker_.processShare(share);
poolWorker_.processShare(share, acceptStale_);

WorkerKey key(share.userid(), share.workerhashid());
_processShare(key, share);
Expand All @@ -319,19 +322,19 @@ void StatsServerT<SHARE>::_processShare(WorkerKey &key, const SHARE &share) {
shared_ptr<WorkerShares<SHARE>> workerShare = nullptr, userShare = nullptr;

if (workerItr != workerSet_.end()) {
workerItr->second->processShare(share);
workerItr->second->processShare(share, acceptStale_);
} else {
workerShare =
make_shared<WorkerShares<SHARE>>(share.workerhashid(), share.userid());
workerShare->processShare(share);
workerShare->processShare(share, acceptStale_);
}

if (userItr != userSet_.end()) {
userItr->second->processShare(share);
userItr->second->processShare(share, acceptStale_);
} else {
userShare =
make_shared<WorkerShares<SHARE>>(share.workerhashid(), share.userid());
userShare->processShare(share);
userShare->processShare(share, acceptStale_);
}

if (workerShare != nullptr || userShare != nullptr) {
Expand Down
24 changes: 2 additions & 22 deletions src/bitcoin/StatisticsBitcoin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,8 @@
#include "BitcoinUtils.h"

template <>
void ShareStatsDay<ShareBitcoin>::processShare(
uint32_t hourIdx, const ShareBitcoin &share) {
ScopeLock sl(lock_);

if (StratumStatus::isAccepted(share.status())) {
shareAccept1h_[hourIdx] += share.sharediff();
shareAccept1d_ += share.sharediff();

double score = share.score();
double reward = GetBlockReward(share.height(), Params().GetConsensus());
double earn = score * reward;

score1h_[hourIdx] += score;
score1d_ += score;
earn1h_[hourIdx] += earn;
earn1d_ += earn;

} else {
shareReject1h_[hourIdx] += share.sharediff();
shareReject1d_ += share.sharediff();
}
modifyHoursFlag_ |= (0x01u << hourIdx);
double ShareStatsDay<ShareBitcoin>::getShareReward(const ShareBitcoin &share) {
return GetBlockReward(share.height(), Params().GetConsensus());
}

/////////////// template instantiation ///////////////
Expand Down
24 changes: 2 additions & 22 deletions src/bytom/StatisticsBytom.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,8 @@
#include "BytomUtils.h"

template <>
void ShareStatsDay<ShareBytom>::processShare(
uint32_t hourIdx, const ShareBytom &share) {
ScopeLock sl(lock_);

if (StratumStatus::isAccepted(share.status())) {
shareAccept1h_[hourIdx] += share.sharediff();
shareAccept1d_ += share.sharediff();

double score = share.score();
double reward = GetBlockRewardBytom(share.height());
double earn = score * reward;

score1h_[hourIdx] += score;
score1d_ += score;
earn1h_[hourIdx] += earn;
earn1d_ += earn;

} else {
shareReject1h_[hourIdx] += share.sharediff();
shareReject1d_ += share.sharediff();
}
modifyHoursFlag_ |= (0x01u << hourIdx);
double ShareStatsDay<ShareBytom>::getShareReward(const ShareBytom &share) {
return GetBlockRewardBytom(share.height());
}

/////////////// template instantiation ///////////////
Expand Down
30 changes: 5 additions & 25 deletions src/decred/StatisticsDecred.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,11 @@
#include "DecredUtils.h"

template <>
void ShareStatsDay<ShareDecred>::processShare(
uint32_t hourIdx, const ShareDecred &share) {
ScopeLock sl(lock_);

if (StratumStatus::isAccepted(share.status())) {
shareAccept1h_[hourIdx] += share.sharediff();
shareAccept1d_ += share.sharediff();

double score = share.score();
double reward = GetBlockRewardDecredWork(
share.height(),
share.voters(),
NetworkParamsDecred::get((NetworkDecred)share.network()));
double earn = score * reward;

score1h_[hourIdx] += score;
score1d_ += score;
earn1h_[hourIdx] += earn;
earn1d_ += earn;

} else {
shareReject1h_[hourIdx] += share.sharediff();
shareReject1d_ += share.sharediff();
}
modifyHoursFlag_ |= (0x01u << hourIdx);
double ShareStatsDay<ShareDecred>::getShareReward(const ShareDecred &share) {
return GetBlockRewardDecredWork(
share.height(),
share.voters(),
NetworkParamsDecred::get((NetworkDecred)share.network()));
}

/////////////// template instantiation ///////////////
Expand Down
25 changes: 2 additions & 23 deletions src/eth/StatisticsEth.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,8 @@
#include "StatisticsEth.h"

template <>
void ShareStatsDay<ShareEth>::processShare(
uint32_t hourIdx, const ShareEth &share) {
ScopeLock sl(lock_);

if (StratumStatus::isAccepted(share.status())) {
shareAccept1h_[hourIdx] += share.sharediff();
shareAccept1d_ += share.sharediff();

double score = share.score();
double reward =
EthConsensus::getStaticBlockReward(share.height(), share.getChain());
double earn = score * reward;

score1h_[hourIdx] += score;
score1d_ += score;
earn1h_[hourIdx] += earn;
earn1d_ += earn;

} else {
shareReject1h_[hourIdx] += share.sharediff();
shareReject1d_ += share.sharediff();
}
modifyHoursFlag_ |= (0x01u << hourIdx);
double ShareStatsDay<ShareEth>::getShareReward(const ShareEth &share) {
return EthConsensus::getStaticBlockReward(share.height(), share.getChain());
}

template class ShareStatsDay<ShareEth>;
Loading

0 comments on commit abba7b5

Please sign in to comment.