Skip to content
Permalink
Browse files
Remove ByteSize for protobuf (#3816)
Updates to  remove warning: 'ByteSize' is deprecated.
  • Loading branch information
thinker0 committed Apr 12, 2022
1 parent 909f60a commit f07203a33b14726c6d964f235fae1e73ae42dd1b
Showing 7 changed files with 19 additions and 19 deletions.
@@ -47,8 +47,8 @@ void Client::SendRequest(std::unique_ptr<google::protobuf::Message> _request, vo
}

void Client::SendResponse(REQID _id, const google::protobuf::Message& _response) {
sp_int32 byte_size = _response.ByteSize();
sp_uint32 data_size = OutgoingPacket::SizeRequiredToPackString(_response.GetTypeName()) +
sp_int64 byte_size = _response.ByteSizeLong();
sp_uint64 data_size = OutgoingPacket::SizeRequiredToPackString(_response.GetTypeName()) +
REQID_size + OutgoingPacket::SizeRequiredToPackProtocolBuffer(byte_size);
auto opkt = new OutgoingPacket(data_size);
CHECK_EQ(opkt->PackString(_response.GetTypeName()), 0);
@@ -111,8 +111,8 @@ void Client::InternalSendRequest(std::unique_ptr<google::protobuf::Message> _req
context_map_[rid] = std::make_pair(_expected_response_type, _ctx);

// Make the outgoing packet
sp_int32 byte_size = _request->ByteSize();
sp_uint32 sop = OutgoingPacket::SizeRequiredToPackString(_request->GetTypeName()) + REQID_size +
sp_int64 byte_size = _request->ByteSizeLong();
sp_uint64 sop = OutgoingPacket::SizeRequiredToPackString(_request->GetTypeName()) + REQID_size +
OutgoingPacket::SizeRequiredToPackProtocolBuffer(byte_size);
auto opkt = new OutgoingPacket(sop);
CHECK_EQ(opkt->PackString(_request->GetTypeName()), 0);
@@ -144,8 +144,8 @@ void Client::InternalSendMessage(const google::protobuf::Message& _message) {
REQID rid = REQID_Generator::generate_zero_reqid();

// Make the outgoing packet
sp_int32 byte_size = _message.ByteSize();
sp_uint32 sop = OutgoingPacket::SizeRequiredToPackString(_message.GetTypeName()) + REQID_size +
sp_int64 byte_size = _message.ByteSizeLong();
sp_uint64 sop = OutgoingPacket::SizeRequiredToPackString(_message.GetTypeName()) + REQID_size +
OutgoingPacket::SizeRequiredToPackProtocolBuffer(byte_size);
auto opkt = new OutgoingPacket(sop);
CHECK_EQ(opkt->PackString(_message.GetTypeName()), 0);
@@ -41,8 +41,8 @@ sp_int32 Server::Stop() { return Stop_Base(); }

void Server::SendResponse(REQID _id, Connection* _connection,
const google::protobuf::Message& _response) {
sp_int32 byte_size = _response.ByteSize();
sp_uint32 data_size = OutgoingPacket::SizeRequiredToPackString(_response.GetTypeName()) +
sp_int64 byte_size = _response.ByteSizeLong();
sp_uint64 data_size = OutgoingPacket::SizeRequiredToPackString(_response.GetTypeName()) +
REQID_size + OutgoingPacket::SizeRequiredToPackProtocolBuffer(byte_size);
auto opkt = new OutgoingPacket(data_size);
CHECK_EQ(opkt->PackString(_response.GetTypeName()), 0);
@@ -208,8 +208,8 @@ void Server::InternalSendRequest(Connection* _conn, google::protobuf::Message* _
context_map_[rid] = std::make_pair(_response_placeholder, _ctx);

// Make the outgoing packet
sp_int32 byte_size = _request->ByteSize();
sp_uint32 sop = OutgoingPacket::SizeRequiredToPackString(_request->GetTypeName()) + REQID_size +
sp_int64 byte_size = _request->ByteSizeLong();
sp_uint64 sop = OutgoingPacket::SizeRequiredToPackString(_request->GetTypeName()) + REQID_size +
OutgoingPacket::SizeRequiredToPackProtocolBuffer(byte_size);
auto opkt = new OutgoingPacket(sop);
CHECK_EQ(opkt->PackString(_request->GetTypeName()), 0);
@@ -76,9 +76,9 @@ TEST(OutgoingPacketTest, test_protobuf) {

TestMessage tm;
tm.add_message("abcdefghijklmnopqrstuvwxyz");
op.PackProtocolBuffer(tm, tm.ByteSize());
op.PackProtocolBuffer(tm, tm.ByteSizeLong());

sp_uint32 explen = PacketHeader::header_size() + sizeof(sp_uint32) + tm.ByteSize();
sp_uint64 explen = PacketHeader::header_size() + sizeof(sp_uint32) + tm.ByteSizeLong();
EXPECT_EQ(explen, op.GetBytesFilled());
}

@@ -149,18 +149,18 @@ void StMgrClient::HandlePhysicalPlan(

void StMgrClient::HandleTupleMessage(pool_unique_ptr<proto::system::HeronTupleSet2> msg) {
gatewayMetrics_->updateReceivedPacketsCount(1);
gatewayMetrics_->updateReceivedPacketsSize(msg->ByteSize());
gatewayMetrics_->updateReceivedPacketsSize(msg->ByteSizeLong());
tupleWatcher_(std::move(msg));
}

void StMgrClient::SendTupleMessage(const proto::system::HeronTupleSet& msg) {
if (IsConnected()) {
gatewayMetrics_->updateSentPacketsCount(1);
gatewayMetrics_->updateSentPacketsSize(msg.ByteSize());
gatewayMetrics_->updateSentPacketsSize(msg.ByteSizeLong());
SendMessage(msg);
} else {
gatewayMetrics_->updateDroppedPacketsCount(1);
gatewayMetrics_->updateDroppedPacketsSize(msg.ByteSize());
gatewayMetrics_->updateDroppedPacketsSize(msg.ByteSizeLong());
if (++ndropped_messages_ % 100 == 0) {
LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
<< instanceProto_.stmgr_id() << " because it is not connected";
@@ -197,7 +197,7 @@ bool StMgrClient::SendTupleStreamMessage(proto::stmgr::TupleStreamMessage& _msg)
tuple_set->ParsePartialFromString(_msg.set());

if (!IsConnected() || (droptuples_upon_backpressure_ && HasCausedBackPressure())) {
stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS_LOST)->incr_by(_msg.ByteSize());
stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS_LOST)->incr_by(_msg.ByteSizeLong());
if (tuple_set->has_data()) {
stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS_LOST)
->incr_by(tuple_set->data().tuples_size());
@@ -220,7 +220,7 @@ bool StMgrClient::SendTupleStreamMessage(proto::stmgr::TupleStreamMessage& _msg)
}
retval = false;
} else {
stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS)->incr_by(_msg.ByteSize());
stmgr_client_metrics_->scope(METRIC_BYTES_TO_STMGRS)->incr_by(_msg.ByteSizeLong());
if (tuple_set->has_data()) {
stmgr_client_metrics_->scope(METRIC_DATA_TUPLES_TO_STMGRS)
->incr_by(tuple_set->data().tuples_size());
@@ -174,7 +174,7 @@ void StMgrServer::HandleTupleStreamMessage(Connection* _conn,
tuple_set = __global_protobuf_pool_acquire__(tuple_set);
tuple_set->ParsePartialFromString(_message->set());

bytes_from_stmgrs_metrics_->incr_by(_message->ByteSize());
bytes_from_stmgrs_metrics_->incr_by(_message->ByteSizeLong());
if (tuple_set->has_data()) {
tuples_from_stmgrs_metrics_->incr_by(tuple_set->data().tuples_size());
} else if (tuple_set->has_control()) {
@@ -777,7 +777,7 @@ void StMgr::ProcessAcksAndFails(sp_int32 _src_task_id, sp_int32 _task_id,
void StMgr::HandleInstanceData(const sp_int32 _src_task_id, bool _local_spout,
pool_unique_ptr<proto::system::HeronTupleSet> _message) {
instance_bytes_received_metrics_->scope(std::to_string(_src_task_id))
->incr_by(_message->ByteSize());
->incr_by(_message->ByteSizeLong());

if (stateful_restorer_ && stateful_restorer_->InProgress()) {
LOG(INFO) << "Dropping data received from instance " << _src_task_id

0 comments on commit f07203a

Please sign in to comment.