Skip to content

Commit

Permalink
Add checksum for ssd cache (facebookincubator#9822)
Browse files Browse the repository at this point in the history
Summary:
This PR introduces an optional checksum feature for the SSD cache. When
enabled, a CRC-based checksum is calculated for each cache entry and
stored in the checkpoint file. Additionally, if read verification is
activated, the checksum is recalculated and verified against the stored
value when cache data is recovered from the checkpoint or loaded from
the SSD.
A new counter, `ssd_cache_read_corruptions`, is added to track the
number of corruptions detected due to checksum mismatches.

Pull Request resolved: facebookincubator#9822

Reviewed By: xiaoxmeng

Differential Revision: D57394558

Pulled By: zacw7

fbshipit-source-id: 5a1ee22c11e63fe4fe4a3f7c67775ffcd942068e
  • Loading branch information
zacw7 authored and Joe-Abraham committed Jun 7, 2024
1 parent d6d90d6 commit c6e2e90
Show file tree
Hide file tree
Showing 13 changed files with 476 additions and 125 deletions.
3 changes: 3 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ void registerVeloxMetrics() {
// Total number of errors while reading from SSD cache files.
DEFINE_METRIC(kMetricSsdCacheReadSsdErrors, facebook::velox::StatType::SUM);

// Total number of corrupted SSD cache reads detected by checksum verification.
DEFINE_METRIC(kMetricSsdCacheReadCorruptions, facebook::velox::StatType::SUM);

// Total number of errors while reading from SSD checkpoint files.
DEFINE_METRIC(
kMetricSsdCacheReadCheckpointErrors, facebook::velox::StatType::SUM);
Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ constexpr folly::StringPiece kMetricSsdCacheWriteSsdDropped{
constexpr folly::StringPiece kMetricSsdCacheWriteCheckpointErrors{
"velox.ssd_cache_write_checkpoint_errors"};

constexpr folly::StringPiece kMetricSsdCacheReadCorruptions{
"velox.ssd_cache_read_corruptions"};

constexpr folly::StringPiece kMetricSsdCacheReadSsdErrors{
"velox.ssd_cache_read_ssd_errors"};

Expand Down
2 changes: 2 additions & 0 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ void PeriodicStatsReporter::reportCacheStats() {
deltaSsdStats.writeCheckpointErrors);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheReadSsdErrors, deltaSsdStats.readSsdErrors);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheReadCorruptions, deltaSsdStats.readSsdCorruptions);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheReadCheckpointErrors,
deltaSsdStats.readCheckpointErrors);
Expand Down
5 changes: 4 additions & 1 deletion velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteCheckpointErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadSsdErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadCorruptions.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadCheckpointErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheCheckpointsRead.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheCheckpointsWritten.str()), 0);
Expand Down Expand Up @@ -503,6 +504,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
newSsdStats->writeSsdDropped = 10;
newSsdStats->writeCheckpointErrors = 10;
newSsdStats->readSsdErrors = 10;
newSsdStats->readSsdCorruptions = 10;
newSsdStats->readCheckpointErrors = 10;
cache.updateStats(
{.numHit = 10,
Expand Down Expand Up @@ -548,13 +550,14 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteCheckpointErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadSsdErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadCorruptions.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheReadCheckpointErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheCheckpointsRead.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheCheckpointsWritten.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheRegionsEvicted.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutEntries.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheAgedOutRegions.str()), 1);
ASSERT_EQ(counterMap.size(), 50);
ASSERT_EQ(counterMap.size(), 51);
}
}

Expand Down
4 changes: 4 additions & 0 deletions velox/common/caching/CacheTTLController.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class CacheTTLController {
return instance_.get();
}

/// Test-only helper (per the 'testing' prefix): releases whatever
/// 'instance_' currently holds and leaves it null.
static void testingClear() {
  instance_.reset();
}

/// Add file opening info for a file identified by fileNum. Return true if a
/// new file entry is inserted, or if the existing file entry is updated
/// while cache deletion for the file is in progress. Return false otherwise
Expand Down
12 changes: 11 additions & 1 deletion velox/common/caching/SsdCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ SsdCache::SsdCache(
int32_t numShards,
folly::Executor* executor,
int64_t checkpointIntervalBytes,
bool disableFileCow)
bool disableFileCow,
bool checksumEnabled,
bool checksumReadVerificationEnabled)
: filePrefix_(filePrefix),
numShards_(numShards),
groupStats_(std::make_unique<FileGroupStats>()),
Expand All @@ -46,6 +48,12 @@ SsdCache::SsdCache(
filePrefix_.find("/") == 0,
"Ssd path '{}' does not start with '/' that points to local file system.",
filePrefix_);

if (checksumReadVerificationEnabled && !checksumEnabled) {
VELOX_SSD_CACHE_LOG(WARNING)
<< "Checksum read has been disabled as checksum is not enabled.";
checksumReadVerificationEnabled = false;
}
filesystems::getFileSystem(filePrefix_, nullptr)
->mkdir(std::filesystem::path(filePrefix).parent_path().string());

Expand All @@ -62,6 +70,8 @@ SsdCache::SsdCache(
fileMaxRegions,
checkpointIntervalBytes / numShards,
disableFileCow,
checksumEnabled,
checksumReadVerificationEnabled,
executor_));
}
}
Expand Down
4 changes: 3 additions & 1 deletion velox/common/caching/SsdCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class SsdCache {
int32_t numShards,
folly::Executor* executor,
int64_t checkpointIntervalBytes = 0,
bool disableFileCow = false);
bool disableFileCow = false,
bool checksumWriteEnabled = false,
bool checksumReadVerificationEnabled = false);

/// Returns the shard corresponding to 'fileId'. 'fileId' is a file id from
/// e.g. FileCacheKey.
Expand Down
108 changes: 89 additions & 19 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <folly/Executor.h>
#include <folly/portability/SysUio.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/Crc.h"
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
Expand Down Expand Up @@ -124,10 +125,15 @@ SsdFile::SsdFile(
int32_t maxRegions,
int64_t checkpointIntervalBytes,
bool disableFileCow,
bool checksumEnabled,
bool checksumReadVerificationEnabled,
folly::Executor* executor)
: fileName_(filename),
maxRegions_(maxRegions),
disableFileCow_(disableFileCow),
checksumEnabled_(checksumEnabled),
checksumReadVerificationEnabled_(
checksumEnabled_ && checksumReadVerificationEnabled),
shardId_(shardId),
checkpointIntervalBytes_(checkpointIntervalBytes),
executor_(executor) {
Expand Down Expand Up @@ -222,7 +228,7 @@ CoalesceIoStats SsdFile::load(
if (pins.empty()) {
return CoalesceIoStats();
}
int payloadTotal = 0;
size_t totalPayloadBytes = 0;
for (auto i = 0; i < pins.size(); ++i) {
const auto runSize = ssdPins[i].run().size();
auto* entry = pins[i].checkedEntry();
Expand All @@ -233,7 +239,7 @@ CoalesceIoStats SsdFile::load(
succinctBytes(runSize),
succinctBytes(entry->size()));
}
payloadTotal += entry->size();
totalPayloadBytes += entry->size();
regionRead(regionIndex(ssdPins[i].run().offset()), runSize);
++stats_.entriesRead;
stats_.bytesRead += entry->size();
Expand All @@ -244,7 +250,7 @@ CoalesceIoStats SsdFile::load(
// gap. For longer payloads this is ~50-100K.
auto stats = readPins(
pins,
payloadTotal / pins.size() < 10000 ? 25000 : 50000,
totalPayloadBytes / pins.size() < 10000 ? 25000 : 50000,
// Max ranges in one preadv call. Longest gap + longest cache entry are
// under 12 ranges. If a system has a limit of 1K ranges, coalesce limit
// of 1000 is safe.
Expand All @@ -260,6 +266,9 @@ CoalesceIoStats SsdFile::load(

for (auto i = 0; i < ssdPins.size(); ++i) {
pins[i].checkedEntry()->setSsdFile(this, ssdPins[i].run().offset());
auto* entry = pins[i].checkedEntry();
auto ssdRun = ssdPins[i].run();
maybeVerifyChecksum(*entry, ssdRun);
}
return stats;
}
Expand Down Expand Up @@ -420,9 +429,13 @@ void SsdFile::write(std::vector<CachePin>& pins) {
const auto size = entry->size();
FileCacheKey key = {
entry->key().fileNum, static_cast<uint64_t>(entry->offset())};
entries_[std::move(key)] = SsdRun(offset, size);
uint32_t checksum = 0;
if (checksumEnabled_) {
checksum = checksumEntry(*entry);
}
entries_[std::move(key)] = SsdRun(offset, size, checksum);
if (FLAGS_ssd_verify_write) {
verifyWrite(*entry, SsdRun(offset, size));
verifyWrite(*entry, SsdRun(offset, size, checksum));
}
offset += size;
++stats_.entriesWritten;
Expand Down Expand Up @@ -709,15 +722,15 @@ void SsdFile::checkpoint(bool force) {
state.exceptions(std::ofstream::failbit);
state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc);
// The checkpoint state file contains:
// int32_t The 4 bytes of kCheckpointMagic,
// int32_t The 4 bytes of checkpoint version,
// int32_t maxRegions,
// int32_t numRegions,
// regionScores from the 'tracker_',
// {fileId, fileName} pairs,
// kMapMarker,
// {fileId, offset, SsdRun} triples, each followed by the entry's checksum
// when checksum is enabled,
// kEndMarker.
state.write(kCheckpointMagic, sizeof(int32_t));
state.write(checkpointVersion().data(), sizeof(int32_t));
state.write(asChar(&maxRegions_), sizeof(maxRegions_));
state.write(asChar(&numRegions_), sizeof(numRegions_));

Expand All @@ -739,11 +752,15 @@ void SsdFile::checkpoint(bool force) {
const auto mapMarker = kCheckpointMapMarker;
state.write(asChar(&mapMarker), sizeof(mapMarker));
for (auto& pair : entries_) {
auto id = pair.first.fileNum.id();
const auto id = pair.first.fileNum.id();
state.write(asChar(&id), sizeof(id));
state.write(asChar(&pair.first.offset), sizeof(pair.first.offset));
auto offsetAndSize = pair.second.bits();
const auto offsetAndSize = pair.second.fileBits();
state.write(asChar(&offsetAndSize), sizeof(offsetAndSize));
if (checksumEnabled_) {
const auto checksum = pair.second.checksum();
state.write(asChar(&checksum), sizeof(checksum));
}
}
} catch (const std::exception& e) {
fileSync->close();
Expand Down Expand Up @@ -804,8 +821,11 @@ void SsdFile::initializeCheckpoint() {
if (!state.is_open()) {
hasCheckpoint = false;
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(INFO)
<< "Starting shard " << shardId_ << " without checkpoint";
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Starting shard {} without checkpoint, with checksum write {}, read verification {}.",
shardId_,
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled");
}
const auto logPath = getEvictLogFilePath();
evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
Expand Down Expand Up @@ -839,6 +859,41 @@ void SsdFile::initializeCheckpoint() {
}
}

// Computes the CRC32 checksum over the first 'entry.size()' payload bytes of
// 'entry'. If the entry has tiny data, those bytes are hashed directly;
// otherwise the bytes are taken from the runs of 'entry.data()' in order.
uint32_t SsdFile::checksumEntry(const AsyncDataCacheEntry& entry) const {
  bits::Crc32 crc;

  const auto tiny = entry.tinyData();
  if (tiny) {
    crc.process_bytes(tiny, entry.size());
    return crc.checksum();
  }

  const auto& allocation = entry.data();
  int64_t remainingBytes = entry.size();
  int runIndex = 0;
  while (runIndex < allocation.numRuns() && remainingBytes > 0) {
    const auto run = allocation.runAt(runIndex);
    // The last run consumed may be larger than what is left of the entry, so
    // only hash the bytes that belong to the entry.
    const auto runBytes = std::min<size_t>(remainingBytes, run.numBytes());
    crc.process_bytes(run.data<char>(), runBytes);
    remainingBytes -= runBytes;
    ++runIndex;
  }
  // All runs were visited; every byte of the entry must have been covered.
  VELOX_CHECK_EQ(remainingBytes, 0);
  return crc.checksum();
}

// When checksum read verification is enabled, recomputes the checksum of
// 'entry' and compares it with the checksum recorded in 'ssdRun'. On a
// mismatch, bumps 'stats_.readSsdCorruptions' and fails with an error that
// names the file, offset and size of the run. Does nothing when read
// verification is disabled.
void SsdFile::maybeVerifyChecksum(AsyncDataCacheEntry& entry, SsdRun& ssdRun) {
  if (checksumReadVerificationEnabled_) {
    // Recompute over the bytes just read from SSD.
    const auto actualChecksum = checksumEntry(entry);
    if (actualChecksum == ssdRun.checksum()) {
      return;
    }

    ++stats_.readSsdCorruptions;
    VELOX_FAIL(
        "IOERR: Corrupt SSD cache entry - File: {}, Offset: {}, Size: {}",
        fileName_,
        ssdRun.offset(),
        ssdRun.size());
  }
}

bool SsdFile::testingIsCowDisabled() const {
#ifdef linux
int attr{0};
Expand Down Expand Up @@ -866,9 +921,17 @@ T readNumber(std::ifstream& stream) {
} // namespace

void SsdFile::readCheckpoint(std::ifstream& state) {
char magic[4];
state.read(magic, sizeof(magic));
VELOX_CHECK_EQ(::strncmp(magic, kCheckpointMagic, 4), 0);
char versionMagic[4];
state.read(versionMagic, sizeof(versionMagic));
const auto checkpoinHasChecksum =
isChecksumEnabledOnCheckpointVersion(std::string(versionMagic, 4));
if (checksumEnabled_ && !checkpoinHasChecksum) {
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery.",
shardId_);
return;
}

const auto maxRegions = readNumber<int32_t>(state);
VELOX_CHECK_EQ(
maxRegions,
Expand Down Expand Up @@ -899,12 +962,17 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
evictedMap.insert(region);
}
for (;;) {
const uint64_t fileNum = readNumber<uint64_t>(state);
const auto fileNum = readNumber<uint64_t>(state);
if (fileNum == kCheckpointEndMarker) {
break;
}
const uint64_t offset = readNumber<uint64_t>(state);
const auto run = SsdRun(readNumber<uint64_t>(state));
const auto offset = readNumber<uint64_t>(state);
const auto fileBits = readNumber<uint64_t>(state);
uint32_t checksum = 0;
if (checkpoinHasChecksum) {
checksum = readNumber<uint32_t>(state);
}
const auto run = SsdRun(fileBits, checksum);
// Check that the recovered entry does not fall in an evicted region.
if (evictedMap.find(regionIndex(run.offset())) == evictedMap.end()) {
// The file may have a different id on restore.
Expand All @@ -925,11 +993,13 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
}
tracker_.setRegionScores(scores);
VELOX_SSD_CACHE_LOG(INFO) << fmt::format(
"Starting shard {} from checkpoint with {} entries, {} regions with {} free.",
"Starting shard {} from checkpoint with {} entries, {} regions with {} free, with checksum write {}, read verification {}.",
shardId_,
entries_.size(),
numRegions_,
writableRegions_.size());
writableRegions_.size(),
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled");
}

} // namespace facebook::velox::cache
Loading

0 comments on commit c6e2e90

Please sign in to comment.