Skip to content
Permalink
Browse files

Refactor share relevant codes

  • Loading branch information
de1acr0ix committed Dec 30, 2019
1 parent 786d541 commit c7114e242b9f2b38a739881fbf88812a680537cc
@@ -128,11 +128,11 @@ bool ShareLogWriterBase<SHARE>::flushToDisk() {
usedHandlers.insert(f);

string message;
uint32_t size = 0;
if (!share.SerializeToBuffer(message, size)) {
DLOG(INFO) << "base.SerializeToArray failed!" << std::endl;
if (!share.SerializeToString(&message)) {
DLOG(INFO) << "share.SerializeToString failed!" << std::endl;
continue;
}
uint32_t size = message.size();
f->write((char *)&size, sizeof(uint32_t));
f->write((char *)message.data(), size);
}
@@ -264,4 +264,33 @@ struct LocalJob {

bool operator==(uint64_t jobId) const { return jobId_ == jobId; }
};

namespace sharebase {

template <typename ShareMsg>
class Serializable : public ShareMsg {
public:
bool SerializeToStringWithVersion(string &data) const {
data.resize(sizeof(uint32_t));
*reinterpret_cast<uint32_t *>(&data[0]) = this->version();
return this->AppendToString(&data);
}
};

template <typename Derived, typename ShareMsg>
class Unserializable : public Serializable<ShareMsg> {
public:
bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {
if (nullptr == data || size <= sizeof(size)) {
return false;
}

auto version = *reinterpret_cast<const uint32_t *>(data);
return version == Derived::CURRENT_VERSION &&
this->ParseFromArray(data + sizeof(uint32_t), size - sizeof(uint32_t));
}
};

}

#endif
@@ -38,7 +38,8 @@
// If there is no forward compatibility, one of the versions of Share
// will be considered invalid, resulting in loss of users' hashrate.

class ShareBeam : public sharebase::BeamMsg {
class ShareBeam
: public sharebase::Unserializable<ShareBeam, sharebase::BeamMsg> {
public:
const static uint32_t CURRENT_VERSION =
0x0bea0001u; // first 0bea: BEAM, second 0001: version 1
@@ -108,75 +109,6 @@ class ShareBeam : public sharebase::BeamMsg {
status(),
StratumStatus::toString(status()));
}

bool SerializeToBuffer(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size);

if (!SerializeToArray((uint8_t *)data.data(), size)) {
DLOG(INFO) << "base.SerializeToArray failed!" << std::endl;
return false;
}

return true;
}

bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {
if (nullptr == data || size <= 0) {
return false;
}

const uint8_t *payload = data;
uint32_t version = *((uint32_t *)payload);

if (version == CURRENT_VERSION) {
if (!ParseFromArray(
(const uint8_t *)(payload + sizeof(uint32_t)),
size - sizeof(uint32_t))) {
DLOG(INFO) << "share ParseFromArray failed!";
return false;
}
} else {
DLOG(INFO) << "unknow share received! data size: " << size;
return false;
}

return true;
}

bool SerializeToArrayWithLength(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

*((uint32_t *)data.data()) = size;
uint8_t *payload = (uint8_t *)data.data();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "base.SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

bool SerializeToArrayWithVersion(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

uint8_t *payload = (uint8_t *)data.data();
*((uint32_t *)payload) = version();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

size_t getsharelength() { return IsInitialized() ? ByteSize() : 0; }
};

class StratumJobBeam : public StratumJob {
@@ -179,11 +179,11 @@ void StratumMinerBeam::handleRequest_Submit(
DLOG(INFO) << share.toString();

std::string message;
uint32_t size = 0;
if (!share.SerializeToArrayWithVersion(message, size)) {
LOG(ERROR) << "share SerializeToBuffer failed!" << share.toString();
if (!share.SerializeToStringWithVersion(message)) {
LOG(ERROR) << "share SerializeToStringWithVersion failed!"
<< share.toString();
return;
}

server.sendShare2Kafka(localJob->chainId_, message.data(), size);
server.sendShare2Kafka(localJob->chainId_, message.data(), message.size());
}
@@ -163,7 +163,7 @@ struct ShareBitcoinBytesV2 {
}
};

class ShareBitcoin : public sharebase::BitcoinMsg {
class ShareBitcoin : public sharebase::Serializable<sharebase::BitcoinMsg> {
public:
ShareBitcoin() {
set_version(CURRENT_VERSION);
@@ -250,16 +250,6 @@ class ShareBitcoin : public sharebase::BitcoinMsg {
StratumStatus::toString(status()));
}

bool SerializeToBuffer(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size);
if (!SerializeToArray((uint8_t *)data.data(), size)) {
DLOG(INFO) << "share SerializeToArray failed!";
return false;
}
return true;
}

bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {

if (nullptr == data || size <= 0) {
@@ -339,40 +329,6 @@ class ShareBitcoin : public sharebase::BitcoinMsg {
return true;
}

bool SerializeToArrayWithVersion(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

uint8_t *payload = (uint8_t *)data.data();
*((uint32_t *)payload) = version();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

bool SerializeToArrayWithLength(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

*((uint32_t *)data.data()) = size;
uint8_t *payload = (uint8_t *)data.data();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

size_t getsharelength() { return IsInitialized() ? ByteSize() : 0; }

public:
const static uint32_t BYTES_VERSION = 0x00010003u;
const static uint32_t CURRENT_VERSION = 0x00010004u;
@@ -357,13 +357,12 @@ void StratumMinerBitcoin::handleRequest_Submit(
chainId, (char *)&sharev1, sizeof(sharev1));
} else {
std::string message;
uint32_t size = 0;
if (!share.SerializeToArrayWithVersion(message, size)) {
LOG(ERROR) << "share SerializeToBuffer failed!"
if (!share.SerializeToStringWithVersion(message)) {
LOG(ERROR) << "share SerializeToStringWithVersion failed!"
<< share.toString();
return;
}
server.sendShare2Kafka(chainId, message.data(), size);
server.sendShare2Kafka(chainId, message.data(), message.size());
}
}
}
@@ -77,7 +77,7 @@ class ShareBytomBytesVersion {
}
};

class ShareBytom : public sharebase::BytomMsg {
class ShareBytom : public sharebase::Serializable<sharebase::BytomMsg> {
public:
const static uint32_t BYTES_VERSION =
0x00030001u; // first 0003: bytom, second 0001: version 1.
@@ -145,32 +145,6 @@ class ShareBytom : public sharebase::BytomMsg {
StratumStatus::toString(status()));
}

bool SerializeToBuffer(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size);
if (!SerializeToArray((uint8_t *)data.data(), size)) {
DLOG(INFO) << "share SerializeToArray failed!" << std::endl;
return false;
}
return true;
}

bool SerializeToArrayWithLength(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

*((uint32_t *)data.data()) = size;
uint8_t *payload = (uint8_t *)data.data();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "share SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

bool UnserializeWithVersion(const uint8_t *data, uint32_t size) {

if (nullptr == data || size <= 0) {
@@ -219,24 +193,6 @@ class ShareBytom : public sharebase::BytomMsg {

return true;
}

bool SerializeToArrayWithVersion(string &data, uint32_t &size) const {
size = ByteSize();
data.resize(size + sizeof(uint32_t));

uint8_t *payload = (uint8_t *)data.data();
*((uint32_t *)payload) = version();

if (!SerializeToArray(payload + sizeof(uint32_t), size)) {
DLOG(INFO) << "SerializeToArray failed!";
return false;
}

size += sizeof(uint32_t);
return true;
}

uint32_t getsharelength() { return IsInitialized() ? ByteSize() : 0; }
};

struct BlockHeaderBytom {
@@ -287,12 +287,12 @@ void StratumMinerBytom::handleRequest_Submit(
if (isSendShareToKafka) {

std::string message;
uint32_t size = 0;
if (!share.SerializeToArrayWithVersion(message, size)) {
LOG(ERROR) << "share SerializeToBuffer failed!" << share.toString();
if (!share.SerializeToStringWithVersion(message)) {
LOG(ERROR) << "share SerializeToStringWithVersion failed!"
<< share.toString();
return;
}
server.sendShare2Kafka(localJob->chainId_, message.data(), size);
server.sendShare2Kafka(localJob->chainId_, message.data(), message.size());

// string shareInHex;
// Bin2Hex((uint8_t *) &share, sizeof(ShareBytom), shareInHex);

0 comments on commit c7114e2

Please sign in to comment.
You can’t perform that action at this time.