Skip to content
Permalink
Browse files

Merge pull request #410 from de1acr0ix/share-refactoring

Refactor share relevant codes
  • Loading branch information
YihaoPeng committed Jan 3, 2020
2 parents c8c64ab + c7114e2 commit 457fad8e699732f2aafdc92c4aab54ec337232b7
@@ -153,7 +153,7 @@ class ShareLogParserT : public ShareLogParser {
shared_ptr<DuplicateShareChecker<SHARE>> dupShareChecker);
virtual ~ShareLogParserT();

bool init();
bool init() override;

// flush data to DB
bool flushToDB(bool removeExpiredData = true) override;
@@ -165,7 +165,7 @@ class ShareLogParserT : public ShareLogParser {
// read unchanged share data bin file, for example yestoday's file. it will
// use mmap() to get high performance. call only once will process
// the whole bin file
bool processUnchangedShareLog();
bool processUnchangedShareLog() override;

// today's file is still growing, return processed shares number.
int64_t processGrowingShareLog();
@@ -128,11 +128,11 @@ bool ShareLogWriterBase<SHARE>::flushToDisk() {
usedHandlers.insert(f);

string message;
uint32_t size = 0;
if (!share.SerializeToBuffer(message, size)) {
DLOG(INFO) << "base.SerializeToArray failed!" << std::endl;
if (!share.SerializeToString(&message)) {
DLOG(INFO) << "share.SerializeToString failed!" << std::endl;
continue;
}
uint32_t size = message.size();
f->write((char *)&size, sizeof(uint32_t));
f->write((char *)message.data(), size);
}
@@ -104,7 +104,7 @@ class ShareStats {
};

/////////////////////////////// ShareStatsDay ////////////////////////////////
static uint64_t sumRejectShares(
inline uint64_t sumRejectShares(
const std::map<uint32_t /* reason */, uint64_t /* share */> &rejectShares) {
uint64_t sum = 0;
for (const auto &itr : rejectShares) {
@@ -113,7 +113,7 @@ static uint64_t sumRejectShares(
return sum;
}

static string generateRejectDetail(
inline string generateRejectDetail(
const std::map<uint32_t /* reason */, uint64_t /* share */> &rejectShares) {
string rejectDetail = "{";

@@ -264,4 +264,33 @@ struct LocalJob {

bool operator==(uint64_t jobId) const { return jobId_ == jobId; }
};

namespace sharebase {

template <typename ShareMsg>
class Serializable : public ShareMsg {
public:
bool SerializeToStringWithVersion(string &data) const {
data.resize(sizeof(uint32_t));
*reinterpret_cast<uint32_t *>(&data[0]) = this->version();
return this->AppendToString(&data);
}
};

template <typename Derived, typename ShareMsg>
class Unserializable : public Serializable<ShareMsg> {
public:
bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {
if (nullptr == data || size <= sizeof(size)) {
return false;
}

auto version = *reinterpret_cast<const uint32_t *>(data);
return version == Derived::CURRENT_VERSION &&
this->ParseFromArray(data + sizeof(uint32_t), size - sizeof(uint32_t));
}
};

}

#endif
@@ -38,7 +38,8 @@
// If there is no forward compatibility, one of the versions of Share
// will be considered invalid, resulting in loss of users' hashrate.

class ShareBeam : public sharebase::BeamMsg {
class ShareBeam
: public sharebase::Unserializable<ShareBeam, sharebase::BeamMsg> {
public:
const static uint32_t CURRENT_VERSION =
0x0bea0001u; // first 0bea: BEAM, second 0001: version 1
@@ -108,75 +109,6 @@ class ShareBeam : public sharebase::BeamMsg {
status(),
StratumStatus::toString(status()));
}

bool SerializeToBuffer(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size);

if (!SerializeToArray((uint8_t *)data.data(), size)) {
DLOG(INFO) << "base.SerializeToArray failed!" << std::endl;
return false;
}

return true;
}

bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {
if (nullptr == data || size <= 0) {
return false;
}

const uint8_t *payload = data;
uint32_t version = *((uint32_t *)payload);

if (version == CURRENT_VERSION) {
if (!ParseFromArray(
(const uint8_t *)(payload + sizeof(uint32_t)),
size - sizeof(uint32_t))) {
DLOG(INFO) << "share ParseFromArray failed!";
return false;
}
} else {
DLOG(INFO) << "unknow share received! data size: " << size;
return false;
}

return true;
}

bool SerializeToArrayWithLength(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

*((uint32_t *)data.data()) = size;
uint8_t *payload = (uint8_t *)data.data();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "base.SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

bool SerializeToArrayWithVersion(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

uint8_t *payload = (uint8_t *)data.data();
*((uint32_t *)payload) = version();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

size_t getsharelength() { return IsInitialized() ? ByteSize() : 0; }
};

class StratumJobBeam : public StratumJob {
@@ -179,11 +179,11 @@ void StratumMinerBeam::handleRequest_Submit(
DLOG(INFO) << share.toString();

std::string message;
uint32_t size = 0;
if (!share.SerializeToArrayWithVersion(message, size)) {
LOG(ERROR) << "share SerializeToBuffer failed!" << share.toString();
if (!share.SerializeToStringWithVersion(message)) {
LOG(ERROR) << "share SerializeToStringWithVersion failed!"
<< share.toString();
return;
}

server.sendShare2Kafka(localJob->chainId_, message.data(), size);
server.sendShare2Kafka(localJob->chainId_, message.data(), message.size());
}
@@ -563,7 +563,7 @@ string reverse16bit(string &&hash) {
*i = *j;
*j = tmp;
}
return hash;
return std::move(hash);
}

string reverse16bit(const string &hash) {
@@ -163,7 +163,7 @@ struct ShareBitcoinBytesV2 {
}
};

class ShareBitcoin : public sharebase::BitcoinMsg {
class ShareBitcoin : public sharebase::Serializable<sharebase::BitcoinMsg> {
public:
ShareBitcoin() {
set_version(CURRENT_VERSION);
@@ -250,16 +250,6 @@ class ShareBitcoin : public sharebase::BitcoinMsg {
StratumStatus::toString(status()));
}

bool SerializeToBuffer(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size);
if (!SerializeToArray((uint8_t *)data.data(), size)) {
DLOG(INFO) << "share SerializeToArray failed!";
return false;
}
return true;
}

bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {

if (nullptr == data || size <= 0) {
@@ -339,40 +329,6 @@ class ShareBitcoin : public sharebase::BitcoinMsg {
return true;
}

bool SerializeToArrayWithVersion(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

uint8_t *payload = (uint8_t *)data.data();
*((uint32_t *)payload) = version();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

bool SerializeToArrayWithLength(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

*((uint32_t *)data.data()) = size;
uint8_t *payload = (uint8_t *)data.data();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

size_t getsharelength() { return IsInitialized() ? ByteSize() : 0; }

public:
const static uint32_t BYTES_VERSION = 0x00010003u;
const static uint32_t CURRENT_VERSION = 0x00010004u;
@@ -357,13 +357,12 @@ void StratumMinerBitcoin::handleRequest_Submit(
chainId, (char *)&sharev1, sizeof(sharev1));
} else {
std::string message;
uint32_t size = 0;
if (!share.SerializeToArrayWithVersion(message, size)) {
LOG(ERROR) << "share SerializeToBuffer failed!"
if (!share.SerializeToStringWithVersion(message)) {
LOG(ERROR) << "share SerializeToStringWithVersion failed!"
<< share.toString();
return;
}
server.sendShare2Kafka(chainId, message.data(), size);
server.sendShare2Kafka(chainId, message.data(), message.size());
}
}
}
@@ -77,7 +77,7 @@ class ShareBytomBytesVersion {
}
};

class ShareBytom : public sharebase::BytomMsg {
class ShareBytom : public sharebase::Serializable<sharebase::BytomMsg> {
public:
const static uint32_t BYTES_VERSION =
0x00030001u; // first 0003: bytom, second 0001: version 1.
@@ -145,32 +145,6 @@ class ShareBytom : public sharebase::BytomMsg {
StratumStatus::toString(status()));
}

bool SerializeToBuffer(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size);
if (!SerializeToArray((uint8_t *)data.data(), size)) {
DLOG(INFO) << "share SerializeToArray failed!" << std::endl;
return false;
}
return true;
}

bool SerializeToArrayWithLength(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

*((uint32_t *)data.data()) = size;
uint8_t *payload = (uint8_t *)data.data();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "share SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {

if (nullptr == data || size <= 0) {
@@ -219,24 +193,6 @@ class ShareBytom : public sharebase::BytomMsg {

return true;
}

bool SerializeToArrayWithVersion(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

uint8_t *payload = (uint8_t *)data.data();
*((uint32_t *)payload) = version();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

uint32_t getsharelength() { return IsInitialized() ? ByteSize() : 0; }
};

struct BlockHeaderBytom {
@@ -287,12 +287,12 @@ void StratumMinerBytom::handleRequest_Submit(
if (isSendShareToKafka) {

std::string message;
uint32_t size = 0;
if (!share.SerializeToArrayWithVersion(message, size)) {
LOG(ERROR) << "share SerializeToBuffer failed!" << share.toString();
if (!share.SerializeToStringWithVersion(message)) {
LOG(ERROR) << "share SerializeToStringWithVersion failed!"
<< share.toString();
return;
}
server.sendShare2Kafka(localJob->chainId_, message.data(), size);
server.sendShare2Kafka(localJob->chainId_, message.data(), message.size());

// string shareInHex;
// Bin2Hex((uint8_t *) &share, sizeof(ShareBytom), shareInHex);

0 comments on commit 457fad8

Please sign in to comment.
You can’t perform that action at this time.