Skip to content

Commit

Permalink
sserver: Fixed an issue that filling wrong userId to a share when the…
Browse files Browse the repository at this point in the history
… current chainId is different from the job's.
  • Loading branch information
YihaoPeng committed Jan 21, 2019
1 parent 58fdc1c commit b98e68f
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 84 deletions.
13 changes: 9 additions & 4 deletions src/Stratum.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ const char * StratumStatus::toString(int err) {
}

//////////////////////////////// StratumWorker ////////////////////////////////
StratumWorker::StratumWorker(): userId_(0), workerHashId_(0) {}
StratumWorker::StratumWorker(const size_t chainSize)
: chainId_(0)
, workerHashId_(0) {
userIds_.resize(chainSize, 0);
}

void StratumWorker::resetNames() {
workerHashId_ = 0;
Expand All @@ -116,16 +120,17 @@ void StratumWorker::resetNames() {
workerName_.clear();
}

string StratumWorker::getUserName(const string &fullName) const {
string StratumWorker::getUserName(const string &fullName) {
auto pos = fullName.find(".");
if (pos == fullName.npos) {
return fullName;
}
return fullName.substr(0, pos);
}

void StratumWorker::setUserID(const int32_t userId) {
userId_ = userId;
void StratumWorker::setChainIdAndUserId(const size_t chainId, const int32_t userId) {
userIds_[chainId] = userId;
chainId_ = chainId;
}

void StratumWorker::setNames(const string &fullName) {
Expand Down
24 changes: 20 additions & 4 deletions src/Stratum.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,22 @@ class StratumStatus
};

//////////////////////////////// StratumWorker ////////////////////////////////
class StratumWorker
class StratumWorkerPlain
{
public:
int32_t userId_;
int64_t workerHashId_;

string fullName_;
string userName_;
string workerName_;
};

class StratumWorker
{
public:
std::atomic<size_t> chainId_;
vector<int32_t> userIds_;
int64_t workerHashId_; // substr(0, 8, HASH(wokerName))

string fullName_; // fullName = username.workername
Expand All @@ -124,11 +136,15 @@ class StratumWorker
void resetNames();

public:
StratumWorker();
void setUserID(const int32_t userId);
StratumWorker(const size_t chainSize);

void setChainIdAndUserId(const size_t chainId, const int32_t userId);
void setNames(const string &fullName);
string getUserName(const string &fullName) const;

int32_t userId() const { return userIds_[chainId_]; }
int32_t userId(const size_t chainId) const { return userIds_[chainId]; }

static string getUserName(const string &fullName);
static int64_t calcWorkerId(const string &workerName);
};

Expand Down
22 changes: 11 additions & 11 deletions src/StratumSession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ static const string BtccomAgentPrefix = "btccom-agent/";
StratumSession::StratumSession(StratumServer &server, struct bufferevent *bev, struct sockaddr *saddr, uint32_t sessionId)
: server_(server), bev_(bev), sessionId_(sessionId)
, buffer_(evbuffer_new()), clientAgent_("unknown")
, isAgentClient_(false), isNiceHashClient_(false), chainId_(0)
, state_(CONNECTED), isDead_(false), isLongTimeout_(false)
, isAgentClient_(false), isNiceHashClient_(false)
, state_(CONNECTED), worker_(server.chains_.size())
, isDead_(false), isLongTimeout_(false)
{
assert(saddr->sa_family == AF_INET);
auto ipv4 = reinterpret_cast<struct sockaddr_in *>(saddr);
Expand Down Expand Up @@ -188,12 +189,12 @@ void StratumSession::handleLine(const std::string &line) {

void StratumSession::logAuthorizeResult(bool success) {
if (success) {
LOG(INFO) << "authorize success, userId: " << worker_.userId_
LOG(INFO) << "authorize success, userId: " << worker_.userId()
<< ", wokerHashId: " << worker_.workerHashId_
<< ", workerName: " << worker_.fullName_
<< ", clientAgent: " << clientAgent_
<< ", clientIp: " << clientIp_
<< ", chain: " << getServer().chainName(chainId_);
<< ", chain: " << getServer().chainName(worker_.chainId_);
}
else {
LOG(WARNING) << "authorize failed, workerName:" << worker_.fullName_
Expand All @@ -213,7 +214,7 @@ string StratumSession::getMinerInfoJson(const string &type) {
"}}",
date("%F %T").c_str(),
type.c_str(),
worker_.userId_, worker_.userName_.c_str(),
worker_.userId(), worker_.userName_.c_str(),
worker_.workerHashId_, worker_.workerName_.c_str(),
clientAgent_.c_str(), clientIp_.c_str(),
sessionId_);
Expand Down Expand Up @@ -266,9 +267,8 @@ bool StratumSession::switchChain(size_t chainId) {
return false;
}

chainId_ = chainId;
worker_.setUserID(userId);
server_.userInfo_->addWorker(chainId, worker_.userId_, worker_.workerHashId_, worker_.workerName_, clientAgent_);
worker_.setChainIdAndUserId(chainId, userId);
server_.userInfo_->addWorker(chainId, worker_.userId(), worker_.workerHashId_, worker_.workerName_, clientAgent_);

// sent events to kafka: miner_connect
server_.sendCommonEvents2Kafka(chainId, getMinerInfoJson("miner_connect"));
Expand Down Expand Up @@ -353,16 +353,16 @@ void StratumSession::addWorker(const std::string &clientAgent, const std::string
LOG(ERROR) << "curr stratum session has NOT auth yet";
return;
}
server_.userInfo_->addWorker(chainId_, worker_.userId_, workerId, workerName, clientAgent);
server_.userInfo_->addWorker(worker_.chainId_, worker_.userId(), workerId, workerName, clientAgent);
}

void StratumSession::markAsDead() {
// mark as dead
isDead_.store(true);

// sent event to kafka: miner_dead
if (worker_.userId_ > 0) {
server_.sendCommonEvents2Kafka(chainId_, getMinerInfoJson("miner_dead"));
if (worker_.userId() > 0) {
server_.sendCommonEvents2Kafka(worker_.chainId_, getMinerInfoJson("miner_dead"));
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/StratumSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class StratumSession : public IStratumSession {
bool isNiceHashClient_;
std::unique_ptr<StratumMessageDispatcher> dispatcher_;

std::atomic<size_t> chainId_;
State state_;
StratumWorker worker_;
std::atomic<bool> isDead_;
Expand Down Expand Up @@ -146,7 +145,7 @@ class StratumSession : public IStratumSession {
StratumMessageDispatcher &getDispatcher() override { return *dispatcher_; }
uint32_t getClientIp() const { return clientIpInt_; };
uint32_t getSessionId() const { return sessionId_; }
size_t getChainId() const { return chainId_; }
size_t getChainId() const { return worker_.chainId_; }
State getState() const { return state_; }
string getUserName() const { return worker_.userName_; }

Expand Down
8 changes: 3 additions & 5 deletions src/beam/StratumMinerBeam.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void StratumMinerBeam::handleRequest_Submit(const string &idStr, const JsonNode
share.set_version(ShareBeam::CURRENT_VERSION);
share.set_inputprefix(inputPrefix);
share.set_workerhashid(workerId_);
share.set_userid(worker.userId_);
share.set_userid(worker.userId(exjob->chainId_));
share.set_sharediff(jobDiff.currentJobDiff_);
share.set_blockbits(sjob->blockBits_);
share.set_timestamp((uint64_t) time(nullptr));
Expand Down Expand Up @@ -163,10 +163,8 @@ void StratumMinerBeam::handleRequest_Submit(const string &idStr, const JsonNode
int64_t invalidSharesNum = invalidSharesCounter_.sum(time(nullptr), INVALID_SHARE_SLIDING_WINDOWS_SIZE);
// too much invalid shares, don't send them to kafka
if (invalidSharesNum >= INVALID_SHARE_SLIDING_WINDOWS_MAX_LIMIT) {
LOG(WARNING) << "invalid share spamming, diff: "
<< share.sharediff() << ", uid: " << worker.userId_
<< ", uname: \"" << worker.userName_ << "\", ip: " << clientIp
<< "checkshare result: " << share.status();
LOG(WARNING) << "invalid share spamming, worker: " << worker.fullName_
<< ", " << share.toString();
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/beam/StratumServerBeam.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void ServerBeam::sendSolvedShare2Kafka(
"\"workerId\":%" PRId64 ",\"workerFullName\":\"%s\","
"\"blockHash\":\"%s\",\"chain\":\"%s\"}",
share.nonce(), input.c_str(), output.c_str(),
share.height(), share.blockbits(), worker.userId_,
share.height(), share.blockbits(), worker.userId(chainId),
worker.workerHashId_, filterWorkerName(worker.fullName_).c_str(),
blockHash.ToString().c_str(), "BEAM"
);
Expand Down
8 changes: 3 additions & 5 deletions src/bitcoin/StratumMinerBitcoin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void StratumMinerBitcoin::handleRequest_Submit(const string &idStr,
share.set_version(ShareBitcoin::CURRENT_VERSION);
share.set_jobid(localJob->jobId_);
share.set_workerhashid(workerId_);
share.set_userid(worker.userId_);
share.set_userid(worker.userId(exjob->chainId_));
share.set_sharediff(iter->second);
share.set_blkbits(localJob->blkBits_);
share.set_timestamp((uint64_t) time(nullptr));
Expand Down Expand Up @@ -276,10 +276,8 @@ void StratumMinerBitcoin::handleRequest_Submit(const string &idStr,
// too much invalid shares, don't send them to kafka
if (invalidSharesNum >= INVALID_SHARE_SLIDING_WINDOWS_MAX_LIMIT) {
isSendShareToKafka = false;

LOG(INFO) << "invalid share spamming, diff: "
<< share.sharediff() << ", worker: " << worker.fullName_ << ", agent: "
<< clientAgent_ << ", ip: " << session.getClientIp();
LOG(WARNING) << "invalid share spamming, worker: " << worker.fullName_
<< ", " << share.toString();
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/bitcoin/StratumSessionBitcoin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void StratumSessionBitcoin::sendMiningNotify(shared_ptr<StratumJobEx> exJobPtr,

#ifdef USER_DEFINED_COINBASE
// add the User's coinbaseInfo to the coinbase1's tail
string userCoinbaseInfo = GetServer()->userInfo_->getCoinbaseInfo(worker_.userId_);
string userCoinbaseInfo = GetServer()->userInfo_->getCoinbaseInfo(worker_.userId());
ljob.userCoinbaseInfo_ = userCoinbaseInfo;
#endif

Expand Down Expand Up @@ -362,13 +362,13 @@ void StratumSessionBitcoin::handleRequest_Authorize(const string &idStr,

void StratumSessionBitcoin::logAuthorizeResult(bool success) {
if (success) {
LOG(INFO) << "authorize success, userId: " << worker_.userId_
LOG(INFO) << "authorize success, userId: " << worker_.userId()
<< ", wokerHashId: " << worker_.workerHashId_
<< ", workerName: " << worker_.fullName_
<< ", versionMask: " << Strings::Format("%08x", versionMask_)
<< ", clientAgent: " << clientAgent_
<< ", clientIp: " << clientIp_
<< ", chain: " << getServer().chainName(chainId_);
<< ", chain: " << getServer().chainName(worker_.chainId_);
}
else {
LOG(WARNING) << "authorize failed, workerName:" << worker_.fullName_
Expand All @@ -389,7 +389,7 @@ string StratumSessionBitcoin::getMinerInfoJson(const string &type) {
"}}",
date("%F %T").c_str(),
type.c_str(),
worker_.userId_, worker_.userName_.c_str(),
worker_.userId(), worker_.userName_.c_str(),
worker_.workerHashId_, worker_.workerName_.c_str(),
clientAgent_.c_str(), clientIp_.c_str(),
sessionId_, versionMask_);
Expand Down
6 changes: 3 additions & 3 deletions src/bytom/BlockMakerBytom.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void BlockMakerBytom::processSolvedShare(rd_kafka_message_t *rkmessage)
submitBlockNonBlocking(request);

// NOTE: Database save is not implemented. Need to setup mysql in test environment
StratumWorker worker;
StratumWorkerPlain worker;
worker.userId_ = r["userId"].int32();
worker.workerHashId_ = r["workerId"].int64();
worker.fullName_ = r["workerFullName"].str();
Expand Down Expand Up @@ -95,13 +95,13 @@ void BlockMakerBytom::_submitBlockThread(const string &rpcAddress, const string
}

void BlockMakerBytom::saveBlockToDBNonBlocking(const string &header, const uint32_t height,
const uint64_t networkDiff, const StratumWorker &worker) {
const uint64_t networkDiff, const StratumWorkerPlain &worker) {
boost::thread t(boost::bind(&BlockMakerBytom::_saveBlockToDBThread, this,
header, height, networkDiff, worker));
}

void BlockMakerBytom::_saveBlockToDBThread(const string &header, const uint32_t height,
const uint64_t networkDiff, const StratumWorker &worker) {
const uint64_t networkDiff, const StratumWorkerPlain &worker) {
const string nowStr = date("%F %T");
string sql;
sql = Strings::Format("INSERT INTO `found_blocks` "
Expand Down
4 changes: 2 additions & 2 deletions src/bytom/BlockMakerBytom.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class BlockMakerBytom : public BlockMaker
private:
void submitBlockNonBlocking(const string &request);
void _submitBlockThread(const string &rpcAddress, const string &rpcUserpass, const string& request);
void saveBlockToDBNonBlocking(const string &header, const uint32_t height, const uint64_t networkDiff, const StratumWorker &worker);
void _saveBlockToDBThread(const string &header, const uint32_t height, const uint64_t networkDiff, const StratumWorker &worker);
void saveBlockToDBNonBlocking(const string &header, const uint32_t height, const uint64_t networkDiff, const StratumWorkerPlain &worker);
void _saveBlockToDBThread(const string &header, const uint32_t height, const uint64_t networkDiff, const StratumWorkerPlain &worker);

};

Expand Down
8 changes: 3 additions & 5 deletions src/bytom/StratumMinerBytom.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ void StratumMinerBytom::handleRequest_Submit(const string &idStr, const JsonNode
share.set_version(ShareBytom::CURRENT_VERSION);
// TODO: not set: share.checkSum_
share.set_workerhashid(workerId_);
share.set_userid(worker.userId_);
share.set_userid(worker.userId(exjob->chainId_));
share.set_status(StratumStatus::REJECT_NO_REASON);
share.set_timestamp((uint32_t) time(nullptr));
IpAddress ip;
Expand Down Expand Up @@ -259,10 +259,8 @@ void StratumMinerBytom::handleRequest_Submit(const string &idStr, const JsonNode
// too much invalid shares, don't send them to kafka
if (invalidSharesNum >= INVALID_SHARE_SLIDING_WINDOWS_MAX_LIMIT) {
isSendShareToKafka = false;
LOG(WARNING) << "invalid share spamming, diff: "
<< share.sharediff() << ", uid: " << worker.userId_
<< ", uname: \"" << worker.userName_ << "\", ip: " << clientIp
<< "checkshare result: " << share.status();
LOG(WARNING) << "invalid share spamming, worker: " << worker.fullName_
<< ", " << share.toString();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/bytom/StratumServerBytom.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void ServerBytom::sendSolvedShare2Kafka(
"\"height\":%lu,\"networkDiff\":%" PRIu64 ",\"userId\":%ld,"
"\"workerId\":%" PRId64 ",\"workerFullName\":\"%s\"}",
nonce, strHeader.c_str(),
height, networkDiff, worker.userId_,
height, networkDiff, worker.userId(chainId),
worker.workerHashId_, filterWorkerName(worker.fullName_).c_str());
ServerBase::sendSolvedShare2Kafka(chainId, msg.c_str(), msg.length());
}
7 changes: 3 additions & 4 deletions src/decred/StratumMinerDecred.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void StratumMinerDecred::handleRequest_Submit(const string &idStr, const JsonNod
}

ShareDecred share(workerId_,
worker.userId_,
worker.userId(exjob->chainId_),
clientIp,
localJob->jobId_,
iter->second,
Expand Down Expand Up @@ -159,9 +159,8 @@ void StratumMinerDecred::handleRequest_Submit(const string &idStr, const JsonNod
// too much invalid shares, don't send them to kafka
if (invalidSharesNum >= INVALID_SHARE_SLIDING_WINDOWS_MAX_LIMIT) {
isSendShareToKafka = false;

LOG(INFO) << "invalid share spamming, diff: " << share.sharediff() << ", worker: "
<< worker.fullName_ << ", agent: " << clientAgent_ << ", ip: " << clientIp;
LOG(WARNING) << "invalid share spamming, worker: " << worker.fullName_
<< ", " << share.toString();
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/eth/BlockMakerEth.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void BlockMakerEth::processSolvedShare(rd_kafka_message_t *rkmessage)
return;
}

StratumWorker worker;
StratumWorkerPlain worker;
worker.userId_ = r["userId"].int32();
worker.workerHashId_ = r["workerId"].int64();
worker.fullName_ = r["workerFullName"].str();
Expand Down Expand Up @@ -248,7 +248,7 @@ bool BlockMakerEth::checkRpcSubmitBlock() {
}

void BlockMakerEth::submitBlockNonBlocking(const string &nonce, const string &header, const string &mix, const vector<NodeDefinition> &nodes,
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorker &worker) {
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorkerPlain &worker) {
std::vector<std::shared_ptr<std::thread>> threadPool;
std::atomic<bool> syncSubmitSuccess(false);

Expand All @@ -269,7 +269,7 @@ void BlockMakerEth::submitBlockNonBlocking(const string &nonce, const string &he
}

void BlockMakerEth::_submitBlockThread(const string &nonce, const string &header, const string &mix, const NodeDefinition &node,
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorker &worker,
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorkerPlain &worker,
std::atomic<bool> *syncSubmitSuccess) {
string blockHash;

Expand Down Expand Up @@ -316,7 +316,7 @@ void BlockMakerEth::_submitBlockThread(const string &nonce, const string &header
}

void BlockMakerEth::saveBlockToDB(const string &nonce, const string &header, const string &blockHash, const uint32_t height,
const string &chain, const uint64_t networkDiff, const StratumWorker &worker) {
const string &chain, const uint64_t networkDiff, const StratumWorkerPlain &worker) {
const string nowStr = date("%F %T");
string sql;
sql = Strings::Format("INSERT INTO `found_blocks` "
Expand Down
6 changes: 3 additions & 3 deletions src/eth/BlockMakerEth.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class BlockMakerEth : public BlockMaker

private:
void submitBlockNonBlocking(const string &nonce, const string &header, const string &mix, const vector<NodeDefinition> &nodes,
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorker &worker);
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorkerPlain &worker);
void _submitBlockThread(const string &nonce, const string &header, const string &mix, const NodeDefinition &node,
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorker &worker,
const uint32_t height, const string &chain, const uint64_t networkDiff, const StratumWorkerPlain &worker,
std::atomic<bool> *syncSubmitSuccess);
void saveBlockToDB(const string &nonce, const string &header, const string &blockHash, const uint32_t height,
const string &chain, const uint64_t networkDiff, const StratumWorker &worker);
const string &chain, const uint64_t networkDiff, const StratumWorkerPlain &worker);

static bool submitBlock(const string &nonce, const string &header, const string &mix,
const string &rpcUrl, const string &rpcUserPass,
Expand Down
Loading

0 comments on commit b98e68f

Please sign in to comment.