Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/precommit batcher #422

Merged
merged 19 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/miner/storage_fsm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ target_link_libraries(deal_info_manager
api
)

add_library(batcher
impl/precommit_batcher_impl.cpp
)
target_link_libraries(batcher
api
p2p::asio_scheduler
)

add_library(storage_fsm
impl/sealing_impl.cpp
impl/checks.cpp
Expand Down
150 changes: 150 additions & 0 deletions core/miner/storage_fsm/impl/precommit_batcher_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "miner/storage_fsm/impl/precommit_batcher_impl.hpp"

#include "vm/actor/actor.hpp"
#include "vm/actor/builtin/v5/miner/miner_actor.hpp"

namespace fc::mining {
using api::kPushNoSpec;
using libp2p::protocol::scheduler::toTicks;
using primitives::ChainEpoch;
using vm::actor::MethodParams;
using vm::actor::builtin::types::miner::kChainFinality;
Elestrias marked this conversation as resolved.
Show resolved Hide resolved
using vm::actor::builtin::v5::miner::PreCommitBatch;

PreCommitBatcherImpl::PreCommitEntry::PreCommitEntry(
const TokenAmount &number, const SectorPreCommitInfo &info)
: deposit(number), precommit_info(info){};

PreCommitBatcherImpl::PreCommitBatcherImpl(
const Ticks &max_time,
std::shared_ptr<FullNodeApi> api,
const Address &miner_address,
const std::shared_ptr<libp2p::protocol::Scheduler> &scheduler)
: max_delay_(max_time),
api_(std::move(api)),
miner_address_(miner_address),
closest_cutoff_(max_time) {
cutoff_start_ = std::chrono::system_clock::now();
logger_ = common::createLogger("batcher");
logger_->info("Batcher has been started");
handle_ = scheduler->schedule(max_delay_, [&]() {
std::unique_lock<std::mutex> locker(mutex_);
const auto maybe_result = sendBatch();
for (const auto &[key, cb] : callbacks_) {
cb(maybe_result);
}
callbacks_.clear();
cutoff_start_ = std::chrono::system_clock::now();
closest_cutoff_ = max_delay_;
handle_.reschedule(max_delay_);
});
}

outcome::result<CID> PreCommitBatcherImpl::sendBatch() {
// TODO(Elestrias): [FIL-398] goodFunds = mutualDeposit + MaxFee; - for
// checking payable
if (batch_storage_.size() != 0) {
logger_->info("Sending procedure started");

OUTCOME_TRY(head, api_->ChainHead());
OUTCOME_TRY(minfo, api_->StateMinerInfo(miner_address_, head->key));

PreCommitBatch::Params params;

for (const auto &data : batch_storage_) {
mutual_deposit_ += data.second.deposit;
params.sectors.push_back(data.second.precommit_info);
}

OUTCOME_TRY(encodedParams, codec::cbor::encode(params));

OUTCOME_TRY(
signed_message,
api_->MpoolPushMessage(
vm::message::UnsignedMessage(miner_address_,
minfo.worker, // TODO: handle worker
0,
mutual_deposit_,
{},
{},
PreCommitBatch::Number,
MethodParams{encodedParams}),
kPushNoSpec));

mutual_deposit_ = 0;
batch_storage_.clear();
logger_->info("Sending procedure completed");
cutoff_start_ = std::chrono::system_clock::now();
Elestrias marked this conversation as resolved.
Show resolved Hide resolved
return signed_message.getCid();
}
cutoff_start_ = std::chrono::system_clock::now();
return ERROR_TEXT("Empty Batcher");
}

void PreCommitBatcherImpl::forceSend() {
std::unique_lock<std::mutex> locker(mutex_);
forceSendWithoutLock();
}

void PreCommitBatcherImpl::forceSendWithoutLock(){
const auto maybe_result = sendBatch();
for (const auto &[key, cb] : callbacks_) {
cb(maybe_result);
}
callbacks_.clear();
cutoff_start_ = std::chrono::system_clock::now();
closest_cutoff_ = max_delay_;
handle_.reschedule(max_delay_);
}

void PreCommitBatcherImpl::setPreCommitCutoff(const ChainEpoch &current_epoch,
const SectorInfo &sector_info) {
ChainEpoch cutoff_epoch =
sector_info.ticket_epoch
+ static_cast<int64_t>(kEpochsInDay + kChainFinality);
ChainEpoch start_epoch{};
for (const auto &piece : sector_info.pieces) {
if (!piece.deal_info) {
continue;
}
start_epoch = piece.deal_info->deal_schedule.start_epoch;
if (start_epoch < cutoff_epoch) {
cutoff_epoch = start_epoch;
}
}
if (cutoff_epoch <= current_epoch) {
forceSendWithoutLock();
} else {
const Ticks temp_cutoff = toTicks(std::chrono::seconds(
(cutoff_epoch - current_epoch) * kEpochDurationSeconds));
if ((closest_cutoff_
- toTicks(std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - cutoff_start_))
> temp_cutoff)) {
cutoff_start_ = std::chrono::system_clock::now();
handle_.reschedule(temp_cutoff);
closest_cutoff_ = temp_cutoff;
}
}
}

outcome::result<void> PreCommitBatcherImpl::addPreCommit(
const SectorInfo &sector_info,
const TokenAmount &deposit,
const api::SectorPreCommitInfo &precommit_info,
const PrecommitCallback &callback) {
std::unique_lock<std::mutex> locker(mutex_, std::defer_lock);
OUTCOME_TRY(head, api_->ChainHead());

const auto &sn = sector_info.sector_number;
batch_storage_[sn] = PreCommitEntry(deposit, precommit_info);
callbacks_[sn] = callback; // TODO: batcher upper limit
setPreCommitCutoff(head->epoch(), sector_info);
return outcome::success();
}
} // namespace fc::mining
73 changes: 73 additions & 0 deletions core/miner/storage_fsm/impl/precommit_batcher_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "miner/storage_fsm/precommit_batcher.hpp"

#include <chrono>
#include <libp2p/protocol/common/scheduler.hpp>
#include <map>
#include <mutex>
#include "api/full_node/node_api.hpp"
#include "primitives/address/address.hpp"

namespace fc::mining {
using api::FullNodeApi;
using libp2p::protocol::Scheduler;
using libp2p::protocol::scheduler::Handle;
using libp2p::protocol::scheduler::Ticks;
using primitives::SectorNumber;
using primitives::address::Address;
Alexey-N-Chernyshov marked this conversation as resolved.
Show resolved Hide resolved

class PreCommitBatcherImpl : public PreCommitBatcher {
public:
PreCommitBatcherImpl(const Ticks &max_time,
std::shared_ptr<FullNodeApi> api,
const Address &miner_address,
const std::shared_ptr<Scheduler> &scheduler);

outcome::result<void> addPreCommit(
const SectorInfo &sector_info,
const TokenAmount &deposit,
const SectorPreCommitInfo &precommit_info,
const PrecommitCallback &callback) override;

void forceSend() override;

private:
struct PreCommitEntry {
PreCommitEntry() = default;

PreCommitEntry(const TokenAmount &number,
const SectorPreCommitInfo &info);

PreCommitEntry &operator=(const PreCommitEntry &other) = default;

TokenAmount deposit{};
SectorPreCommitInfo precommit_info;
};

std::mutex mutex_;
TokenAmount mutual_deposit_;
std::map<SectorNumber, PreCommitEntry> batch_storage_;
Ticks max_delay_;
std::shared_ptr<FullNodeApi> api_;
Address miner_address_;
Handle handle_;
Ticks closest_cutoff_;
std::chrono::system_clock::time_point cutoff_start_;
common::Logger logger_;
std::map<SectorNumber, PrecommitCallback> callbacks_;

void forceSendWithoutLock();

void setPreCommitCutoff(const ChainEpoch &current_epoch,
const SectorInfo &sector_info);

outcome::result<CID> sendBatch();
};

} // namespace fc::mining
29 changes: 29 additions & 0 deletions core/miner/storage_fsm/precommit_batcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once
Alexey-N-Chernyshov marked this conversation as resolved.
Show resolved Hide resolved

#include "miner/storage_fsm/types.hpp"

namespace fc::mining {
using primitives::TokenAmount;
using types::SectorInfo;
using vm::actor::builtin::types::miner::SectorPreCommitInfo;
using PrecommitCallback = std::function<void(const outcome::result<CID> &)>;

class PreCommitBatcher {
public:
virtual ~PreCommitBatcher() = default;

virtual outcome::result<void> addPreCommit(
const SectorInfo &sector_info,
const TokenAmount &deposit,
const SectorPreCommitInfo &precommit_info,
const PrecommitCallback &callback) = 0;

virtual void forceSend() = 0;
};

} // namespace fc::mining
1 change: 0 additions & 1 deletion core/miner/storage_fsm/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#pragma once

#include "miner/storage_fsm/sealing_states.hpp"
#include "miner/storage_fsm/types.hpp"
#include "primitives/piece/piece.hpp"
#include "primitives/sector/sector.hpp"
#include "primitives/tipset/tipset_key.hpp"
Expand Down
40 changes: 40 additions & 0 deletions core/vm/actor/builtin/v4/miner/miner_actor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "vm/actor/builtin/v3/miner/miner_actor.hpp"

namespace fc::vm::actor::builtin::v4::miner {

// TODO implement
using Construct = v3::miner::Construct;
using ControlAddresses = v3::miner::ControlAddresses;
using ChangeWorkerAddress = v3::miner::ChangeWorkerAddress;
using ChangePeerId = v3::miner::ChangePeerId;
using SubmitWindowedPoSt = v3::miner::SubmitWindowedPoSt;
using PreCommitSector = v3::miner::PreCommitSector;
using ProveCommitSector = v3::miner::ProveCommitSector;
using ExtendSectorExpiration = v3::miner::ExtendSectorExpiration;
using TerminateSectors = v3::miner::TerminateSectors;
using DeclareFaults = v3::miner::DeclareFaults;
using DeclareFaultsRecovered = v3::miner::DeclareFaultsRecovered;
using OnDeferredCronEvent = v3::miner::OnDeferredCronEvent;
using CheckSectorProven = v3::miner::CheckSectorProven;
using ApplyRewards = v3::miner::ApplyRewards;
using ReportConsensusFault = v3::miner::ReportConsensusFault;
using WithdrawBalance = v3::miner::WithdrawBalance;
using ConfirmSectorProofsValid = v3::miner::ConfirmSectorProofsValid;
using ChangeMultiaddresses = v3::miner::ChangeMultiaddresses;
using CompactPartitions = v3::miner::CompactPartitions;
using CompactSectorNumbers = v3::miner::CompactSectorNumbers;
using ConfirmUpdateWorkerKey = v3::miner::ConfirmUpdateWorkerKey;
using RepayDebt = v3::miner::RepayDebt;
using ChangeOwnerAddress = v3::miner::ChangeOwnerAddress;
using DisputeWindowedPoSt = v3::miner::DisputeWindowedPoSt;

// extern const ActorExports exports;

} // namespace fc::vm::actor::builtin::v4::miner
56 changes: 56 additions & 0 deletions core/vm/actor/builtin/v5/miner/miner_actor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "vm/actor/builtin/v4/miner/miner_actor.hpp"

#include "vm/actor/builtin/types/miner/sector_info.hpp"

namespace fc::vm::actor::builtin::v5::miner {
using types::miner::SectorPreCommitInfo;

// TODO implement
using Construct = v4::miner::Construct;
using ControlAddresses = v4::miner::ControlAddresses;
using ChangeWorkerAddress = v4::miner::ChangeWorkerAddress;
using ChangePeerId = v4::miner::ChangePeerId;
using SubmitWindowedPoSt = v4::miner::SubmitWindowedPoSt;
using PreCommitSector = v4::miner::PreCommitSector;
using ProveCommitSector = v4::miner::ProveCommitSector;
using ExtendSectorExpiration = v4::miner::ExtendSectorExpiration;
using TerminateSectors = v4::miner::TerminateSectors;
using DeclareFaults = v4::miner::DeclareFaults;
using DeclareFaultsRecovered = v4::miner::DeclareFaultsRecovered;
using OnDeferredCronEvent = v4::miner::OnDeferredCronEvent;
using CheckSectorProven = v4::miner::CheckSectorProven;
using ApplyRewards = v4::miner::ApplyRewards;
using ReportConsensusFault = v4::miner::ReportConsensusFault;
using WithdrawBalance = v4::miner::WithdrawBalance;
using ConfirmSectorProofsValid = v4::miner::ConfirmSectorProofsValid;
using ChangeMultiaddresses = v4::miner::ChangeMultiaddresses;
using CompactPartitions = v4::miner::CompactPartitions;
using CompactSectorNumbers = v4::miner::CompactSectorNumbers;
using ConfirmUpdateWorkerKey = v4::miner::ConfirmUpdateWorkerKey;
using RepayDebt = v4::miner::RepayDebt;
using ChangeOwnerAddress = v4::miner::ChangeOwnerAddress;
using DisputeWindowedPoSt = v4::miner::DisputeWindowedPoSt;

/**
* Collects and stores precommit messages to make a packaged sending of a
* several messages within one transaction which reduces the general amount of
* transactions in the network with reduction of a gas fee for transactions.
*/
struct PreCommitBatch : ActorMethodBase<25> {
struct Params {
std::vector<SectorPreCommitInfo> sectors;
};
ACTOR_METHOD_DECL();
};
CBOR_TUPLE(PreCommitBatch::Params, sectors);

// extern const ActorExports exports;

} // namespace fc::vm::actor::builtin::v5::miner
6 changes: 6 additions & 0 deletions test/core/miner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ addtest(events_test
target_link_libraries(events_test
events)

addtest(batcher_test
precommit_batcher_test.cpp
)
target_link_libraries(batcher_test
batcher)

addtest(precommit_policy_test
precommit_policy_test.cpp
)
Expand Down
Loading