forked from stellar/stellar-core
/
VerifyBucketWork.cpp
135 lines (122 loc) · 4.21 KB
/
VerifyBucketWork.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright 2015 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
#include "historywork/VerifyBucketWork.h"
#include "bucket/BucketManager.h"
#include "crypto/Hex.h"
#include "crypto/SHA.h"
#include "main/Application.h"
#include "main/ErrorMessages.h"
#include "util/Fs.h"
#include "util/Logging.h"
#include <fmt/format.h>
#include <Tracy.hpp>
#include <medida/meter.h>
#include <medida/metrics_registry.h>
#include <fstream>
namespace stellar
{
// Creates a work item that checks a bucket file against an expected hash.
//
// The underlying BasicWork is named "verify-bucket-hash-<bucketFile>" and is
// constructed with BasicWork::RETRY_NEVER.
//
// app        - application handed to BasicWork; its metrics registry supplies
//              the two meters initialized below.
// buckets    - map used to initialize mBuckets; adoptBucket() stores the
//              adopted bucket in mBuckets under the hex encoding of the hash.
//              NOTE(review): presumably mBuckets is a reference to the
//              caller's map -- confirm against the header.
// bucketFile - name of the file whose contents get hashed.
// hash       - hash the file contents are expected to produce.
VerifyBucketWork::VerifyBucketWork(
Application& app, std::map<std::string, std::shared_ptr<Bucket>>& buckets,
std::string const& bucketFile, uint256 const& hash)
: BasicWork(app, "verify-bucket-hash-" + bucketFile, BasicWork::RETRY_NEVER)
, mBuckets(buckets)
, mBucketFile(bucketFile)
, mHash(hash)
// Meter marked by onRun() when verification finished without an error.
, mVerifyBucketSuccess(app.getMetrics().NewMeter(
{"history", "verify-bucket", "success"}, "event"))
// Meter marked by onRun() when verification finished with an error code set.
, mVerifyBucketFailure(app.getMetrics().NewMeter(
{"history", "verify-bucket", "failure"}, "event"))
{
}
// Drives the verification state machine.
//
// While no verification result is available yet (mDone is false) a verifier
// is spawned and WORK_WAITING is returned. Once mDone is set, the outcome is
// decided by mEc: an error marks the failure meter and yields WORK_FAILURE;
// otherwise the bucket is adopted, the success meter is marked and
// WORK_SUCCESS is returned.
BasicWork::State
VerifyBucketWork::onRun()
{
    ZoneScoped;

    // No result yet: kick off hashing and wait to be woken up.
    if (!mDone)
    {
        spawnVerifier();
        return State::WORK_WAITING;
    }

    // Verification finished with an error.
    if (mEc)
    {
        mVerifyBucketFailure.Mark();
        return State::WORK_FAILURE;
    }

    // Verification finished cleanly: adopt first, then record the success.
    adoptBucket();
    mVerifyBucketSuccess.Mark();
    return State::WORK_SUCCESS;
}
// Hands the verified file to the bucket manager and records the resulting
// bucket in mBuckets under the hex encoding of mHash.
//
// Must only be called after verification completed (mDone) without an error
// (!mEc); both preconditions are asserted.
void
VerifyBucketWork::adoptBucket()
{
    ZoneScoped;
    assert(mDone);
    assert(!mEc);

    // Adopt before touching the map, so the map is only modified once the
    // bucket manager call has returned.
    auto adopted =
        mApp.getBucketManager().adoptFileAsBucket(mBucketFile, mHash,
                                                  /*objectsPut=*/0,
                                                  /*bytesPut=*/0);
    auto const hexKey = binToHex(mHash);
    mBuckets[hexKey] = adopted;
}
// Posts a background task that hashes the bucket file and compares the result
// with the expected hash, then posts the outcome back to the main thread.
//
// The background task works only on copies of the file name and hash plus a
// weak pointer to this work, so it never touches this object's members
// directly. The result (an error code, empty on success) is delivered by a
// main-thread callback that sets mEc and mDone and calls wakeUp(); if the work
// has been destroyed in the meantime the result is dropped.
//
// Failure cases, all reported through an io_error code:
//  - the file cannot be opened,
//  - reading the file hits an unrecoverable stream error (badbit),
//  - the computed hash differs from the expected one.
// Only the last case is logged as possibly corrupted history; the first two
// are local I/O problems and are logged as such.
void
VerifyBucketWork::spawnVerifier()
{
    std::string filename = mBucketFile;
    uint256 hash = mHash;
    // NOTE(review): app is captured by reference below, so it is assumed to
    // outlive the background task -- confirm against Application's shutdown
    // sequence.
    Application& app = this->mApp;
    std::weak_ptr<VerifyBucketWork> weak(
        std::static_pointer_cast<VerifyBucketWork>(shared_from_this()));
    app.postOnBackgroundThread(
        [&app, filename, weak, hash]() {
            SHA256 hasher;
            asio::error_code ec;
            {
                ZoneNamedN(verifyZone, "bucket verify", true);
                CLOG(INFO, "History")
                    << fmt::format("Verifying bucket {}", binToHex(hash));

                // ensure that the stream gets its own scope to avoid race with
                // main thread
                std::ifstream in(filename, std::ifstream::binary);
                if (!in)
                {
                    // Without this check a missing/unreadable file would be
                    // hashed as an empty stream and misreported as a hash
                    // mismatch.
                    CLOG(WARNING, "History")
                        << "Unable to open " << filename
                        << " for hash verification";
                    ec = std::make_error_code(std::errc::io_error);
                }
                else
                {
                    char buf[4096];
                    while (in)
                    {
                        in.read(buf, sizeof(buf));
                        hasher.add(ByteSlice(buf, in.gcount()));
                    }

                    if (in.bad())
                    {
                        // Reaching end-of-file sets eofbit/failbit only;
                        // badbit means the read itself failed, so the hash
                        // computed so far covers a truncated stream and must
                        // not be compared.
                        CLOG(WARNING, "History")
                            << "I/O error while reading " << filename
                            << " for hash verification";
                        ec = std::make_error_code(std::errc::io_error);
                    }
                    else
                    {
                        uint256 vHash = hasher.finish();
                        if (vHash == hash)
                        {
                            CLOG(DEBUG, "History")
                                << "Verified hash (" << hexAbbrev(hash)
                                << ") for " << filename;
                        }
                        else
                        {
                            CLOG(WARNING, "History")
                                << "FAILED verifying hash for " << filename;
                            CLOG(WARNING, "History")
                                << "expected hash: " << binToHex(hash);
                            CLOG(WARNING, "History")
                                << "computed hash: " << binToHex(vHash);
                            CLOG(WARNING, "History")
                                << POSSIBLY_CORRUPTED_HISTORY;
                            ec = std::make_error_code(std::errc::io_error);
                        }
                    }
                }
            }

            // Not ideal, but needed to prevent race conditions with
            // main thread, since BasicWork's state is not thread-safe. This is
            // a temporary workaround, as a cleaner solution is needed.
            app.postOnMainThread(
                [weak, ec]() {
                    auto self = weak.lock();
                    if (self)
                    {
                        self->mEc = ec;
                        self->mDone = true;
                        self->wakeUp();
                    }
                },
                "VerifyBucket: finish");
        },
        "VerifyBucket: start in background");
}
}