diff --git a/include/fqc/algo/global_analyzer.h b/include/fqc/algo/global_analyzer.h index ca88167..9863c43 100644 --- a/include/fqc/algo/global_analyzer.h +++ b/include/fqc/algo/global_analyzer.h @@ -73,7 +73,7 @@ struct Minimizer { std::uint64_t hash; /// @brief Position in the read where minimizer starts - std::uint16_t position; + std::uint32_t position; /// @brief Whether the minimizer is from reverse complement bool isReverseComplement; @@ -82,7 +82,7 @@ struct Minimizer { constexpr Minimizer() noexcept : hash(0), position(0), isReverseComplement(false) {} /// @brief Construct with values - constexpr Minimizer(std::uint64_t h, std::uint16_t pos, bool rc) noexcept + constexpr Minimizer(std::uint64_t h, std::uint32_t pos, bool rc) noexcept : hash(h), position(pos), isReverseComplement(rc) {} /// @brief Equality comparison diff --git a/include/fqc/algo/pe_optimizer.h b/include/fqc/algo/pe_optimizer.h index 9151915..c0168f0 100644 --- a/include/fqc/algo/pe_optimizer.h +++ b/include/fqc/algo/pe_optimizer.h @@ -56,7 +56,7 @@ struct PEEncodedPair { bool useComplementarity = false; /// @brief If complementarity: positions where R2 differs from R1-RC. - std::vector diffPositions; + std::vector diffPositions; /// @brief If complementarity: bases at diff positions. std::vector diffBases; @@ -188,7 +188,7 @@ class PEOptimizer { [[nodiscard]] static std::string reverseComplement(std::string_view seq); /// @brief Compute diff between two sequences. - [[nodiscard]] static std::pair, std::vector> computeDiff( + [[nodiscard]] static std::pair, std::vector> computeDiff( std::string_view seq1, std::string_view seq2); }; diff --git a/src/algo/global_analyzer.cpp b/src/algo/global_analyzer.cpp index da57312..9c8eddd 100644 --- a/src/algo/global_analyzer.cpp +++ b/src/algo/global_analyzer.cpp @@ -75,7 +75,7 @@ constexpr char kComplement[256] = { struct BucketEntry { std::uint64_t readId; ///< Original read ID std::uint64_t minimizer; ///< Minimizer hash - std::uint16_t position; ///< Position in read + std::uint32_t position; ///< Position in read bool isRC; ///< Is reverse complement }; @@ -338,7 +338,7 @@ std::vector extractMinimizers(std::string_view sequence, std::size_t } bool isRC = (minHash != fwdHash); - minimizers.emplace_back(minHash, static_cast(minPos), isRC); + minimizers.emplace_back(minHash, static_cast(minPos), isRC); prevMinPos = minPos; } } diff --git a/src/algo/pe_optimizer.cpp b/src/algo/pe_optimizer.cpp index c28431f..8f30c4c 100644 --- a/src/algo/pe_optimizer.cpp +++ b/src/algo/pe_optimizer.cpp @@ -98,22 +98,24 @@ std::string PEOptimizer::reverseComplement(std::string_view seq) { return result; } -std::pair, std::vector> PEOptimizer::computeDiff( +std::pair, std::vector> PEOptimizer::computeDiff( std::string_view seq1, std::string_view seq2) { - std::vector positions; + std::vector positions; std::vector bases; + positions.reserve(seq2.length()); + bases.reserve(seq2.length()); std::size_t len = std::min(seq1.length(), seq2.length()); for (std::size_t i = 0; i < len; ++i) { if (seq1[i] != seq2[i]) { - positions.push_back(static_cast(i)); + positions.push_back(static_cast(i)); bases.push_back(seq2[i]); } } // Handle length differences for (std::size_t i = len; i < seq2.length(); ++i) { - positions.push_back(static_cast(i)); + positions.push_back(static_cast(i)); bases.push_back(seq2[i]); } @@ -182,7 +184,7 @@ PEEncodedPair PEOptimizer::encodePair(const io::PairedEndRecord& pair) const { // Compute quality deltas std::string r1QualRev(pair.read1.quality.rbegin(), pair.read1.quality.rend()); for (std::size_t i = 0; i < encoded.diffPositions.size(); ++i) { - std::uint16_t pos = encoded.diffPositions[i]; + std::uint32_t pos = encoded.diffPositions[i]; if (pos < r1QualRev.length() && pos < pair.read2.quality.length()) { std::int8_t delta = static_cast(pair.read2.quality[pos] - r1QualRev[pos]); @@ -194,7 +196,13 @@ PEEncodedPair PEOptimizer::encodePair(const io::PairedEndRecord& pair) const { // Update stats ++stats_.complementarityUsed; - stats_.bytesSaved += pair.read2.sequence.length() - encoded.diffPositions.size() * 3; + const std::size_t encodedBytes = + encoded.diffPositions.size() * (sizeof(std::uint32_t) + sizeof(char)) + + encoded.qualDelta.size() * sizeof(std::int8_t); + const std::size_t rawBytes = pair.read2.sequence.size() + pair.read2.quality.size(); + if (rawBytes > encodedBytes) { + stats_.bytesSaved += rawBytes - encodedBytes; + } } else { encoded.useComplementarity = false; encoded.seq2 = pair.read2.sequence; @@ -307,7 +315,7 @@ std::string generateR2Id(std::string_view r1Id) { // Check for existing /1 suffix if (id.length() >= 2 && id[id.length() - 1] == '1' && - (id[id.length() - 2] == '/' || id[id.length() - 2] == '.')) { + (id[id.length() - 2] == '/' || id[id.length() - 2] == '.' || id[id.length() - 2] == '_')) { id[id.length() - 1] = '2'; return id; } diff --git a/src/algo/quality_compressor.cpp b/src/algo/quality_compressor.cpp index 47e9250..f1c1163 100644 --- a/src/algo/quality_compressor.cpp +++ b/src/algo/quality_compressor.cpp @@ -580,9 +580,13 @@ Result> QualityCompressorImpl::compressSCM( Result> QualityCompressorImpl::decompressSCM( std::span data, std::span lengths) { - if (data.empty() || lengths.empty()) { + if (lengths.empty()) { return std::vector(lengths.size()); } + if (data.empty()) { + return makeError>(ErrorCode::kFormatError, + "Compressed quality payload is empty"); + } // Decompress Zstd layer first std::size_t decompressedSize = ZSTD_getFrameContentSize(data.data(), data.size()); diff --git a/src/commands/compression_engine.cpp b/src/commands/compression_engine.cpp index 5ad5f95..6712822 100644 --- a/src/commands/compression_engine.cpp +++ b/src/commands/compression_engine.cpp @@ -174,8 +174,6 @@ auto executeSingleThreadCompression(const CompressionPlan& plan, compressorConfig.readLengthClass = analysisResult.lengthClass; algo::BlockCompressor blockCompressor(compressorConfig); - std::uint64_t totalCompressedBytes = 0; - for (const auto& blockBoundary : analysisResult.blockBoundaries) { std::vector blockReads; const auto startId = blockBoundary.archiveIdStart; @@ -227,8 +225,6 @@ auto executeSingleThreadCompression(const CompressionPlan& plan, payload.auxData = compressedBlock.auxStream; fqcWriter.writeBlock(blockHeader, payload); - - totalCompressedBytes += compressedBlock.totalCompressedSize(); stats.blocksWritten++; } @@ -246,7 +242,7 @@ auto executeSingleThreadCompression(const CompressionPlan& plan, } fqcWriter.finalize(); - stats.outputBytes = totalCompressedBytes; + stats.outputBytes = std::filesystem::file_size(options.outputPath); format::unregisterWriterForCleanup(&fqcWriter); } diff --git a/src/format/fqc_reader.cpp b/src/format/fqc_reader.cpp index 858efcc..8cc1801 100644 --- a/src/format/fqc_reader.cpp +++ b/src/format/fqc_reader.cpp @@ -172,7 +172,7 @@ BlockData FQCReader::readBlock(BlockId blockId, StreamSelection selection) { result.header = readBlockHeader(blockId); // Calculate payload start position - auto payloadStart = blockIndex_[blockId].offset + BlockHeader::kSize; + auto payloadStart = blockIndex_[blockId].offset + result.header.headerSize; // Read selected streams if (hasStream(selection, StreamSelection::kIds) && result.header.sizeIds > 0) { diff --git a/src/io/fastq_parser.cpp b/src/io/fastq_parser.cpp index 2db183d..8c04a80 100644 --- a/src/io/fastq_parser.cpp +++ b/src/io/fastq_parser.cpp @@ -197,9 +197,12 @@ ParserStats FastqParser::sampleRecords(std::size_t maxSamples) { } // Save current state + const auto savedPosition = stream_ ? stream_->tellg() : std::istream::pos_type(-1); auto savedLineNumber = lineNumber_; auto savedRecordNumber = recordNumber_; auto savedStats = stats_; + auto savedLastError = lastError_; + auto savedEof = eof_; // Reset for sampling reset(); @@ -218,10 +221,15 @@ ParserStats FastqParser::sampleRecords(std::size_t maxSamples) { } // Restore state - reset(); + stream_->clear(); + if (savedPosition != std::istream::pos_type(-1)) { + stream_->seekg(savedPosition); + } + eof_ = savedEof; lineNumber_ = savedLineNumber; recordNumber_ = savedRecordNumber; stats_ = savedStats; + lastError_ = std::move(savedLastError); return sampleStats; } @@ -388,16 +396,16 @@ bool FastqParser::validateQuality(std::string_view qual) const { void FastqParser::setError(std::string message, std::string_view lineContent) { lastError_ = ParseError{ .lineNumber = lineNumber_, - .recordNumber = recordNumber_, + .recordNumber = recordNumber_ + 1, .message = std::move(message), .lineContent = std::string(lineContent.substr(0, 100)) // Truncate long lines }; } void FastqParser::trimRight(std::string& str) { - auto it = - std::find_if(str.rbegin(), str.rend(), [](unsigned char c) { return !std::isspace(c); }); - str.erase(it.base(), str.end()); + while (!str.empty() && str.back() == '\r') { + str.pop_back(); + } } // ============================================================================= diff --git a/src/pipeline/pipeline_node.cpp b/src/pipeline/pipeline_node.cpp index 8bd4ae3..208cca1 100644 --- a/src/pipeline/pipeline_node.cpp +++ b/src/pipeline/pipeline_node.cpp @@ -99,7 +99,11 @@ std::size_t BackpressureController::maxInFlight() const noexcept { } void BackpressureController::reset() noexcept { - inFlight_.store(0); + { + std::lock_guard lock(mutex_); + inFlight_.store(0); + } + cv_.notify_all(); } } // namespace fqc::pipeline diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index dc00f4a..a8cc591 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -128,6 +128,11 @@ fqc_add_test(chunk_marshal_test SOURCES pipeline/chunk_marshal_test.cpp ) +# Backpressure controller regression tests +fqc_add_test(backpressure_controller_test + SOURCES pipeline/backpressure_controller_test.cpp +) + # Pipeline split-header compile test fqc_add_test(pipeline_node_headers_test SOURCES @@ -145,6 +150,11 @@ fqc_add_test(block_compressor_regression_test SOURCES algo/block_compressor_regression_test.cpp ) +# Algorithm regression tests +fqc_add_test(algo_regression_test + SOURCES algo/algo_regression_test.cpp +) + # Original-order command tests fqc_add_test(original_order_command_test SOURCES commands/original_order_command_test.cpp diff --git a/tests/algo/algo_regression_test.cpp b/tests/algo/algo_regression_test.cpp new file mode 100644 index 0000000..9bf5eee --- /dev/null +++ b/tests/algo/algo_regression_test.cpp @@ -0,0 +1,65 @@ +// ============================================================================= +// fq-compressor - Algorithm Regression Tests +// ============================================================================= + +#include "fqc/algo/global_analyzer.h" +#include "fqc/algo/pe_optimizer.h" +#include "fqc/algo/quality_compressor.h" + +#include +#include +#include + +#include + +namespace fqc::algo::test { + +TEST(AlgoRegressionTest, LosslessQualityDecompressionRejectsEmptyPayloadForNonEmptyLengths) { + QualityCompressorConfig config; + config.qualityMode = QualityMode::kLossless; + + QualityCompressor compressor(config); + const std::vector lengths = {4}; + + const auto result = compressor.decompress({}, lengths); + + EXPECT_FALSE(result.has_value()); +} + +TEST(AlgoRegressionTest, GenerateR2IdPreservesUnderscoreSuffixConvention) { + const std::string r1Id = "instrument:run:flowcell_1"; + const auto r2Id = generateR2Id(r1Id); + + EXPECT_EQ(r2Id, "instrument:run:flowcell_2"); + EXPECT_TRUE(io::arePairedIds(r1Id, r2Id)); +} + +TEST(AlgoRegressionTest, ExtractMinimizersPreservesPositionsBeyondUint16Range) { + std::string sequence(70000, 'A'); + + const auto minimizers = extractMinimizers(sequence, 1, 1); + + ASSERT_FALSE(minimizers.empty()); + EXPECT_EQ(minimizers.back().position, 69999u); +} + +TEST(AlgoRegressionTest, EncodeDecodePairHandlesDiffPositionsBeyondUint16Range) { + io::PairedEndRecord pair; + pair.read1.id = "long-read/1"; + pair.read1.sequence.assign(70000, 'A'); + pair.read1.quality.assign(70000, 'I'); + + pair.read2.id = "long-read/2"; + pair.read2.sequence.assign(70000, 'T'); + pair.read2.sequence[66000] = 'G'; + pair.read2.quality.assign(70000, 'I'); + + PEOptimizer optimizer; + const auto encoded = optimizer.encodePair(pair); + const auto decoded = optimizer.decodePair(encoded); + + EXPECT_EQ(decoded.read2.sequence, pair.read2.sequence); + EXPECT_EQ(decoded.read2.quality, pair.read2.quality); +} + +} // namespace fqc::algo::test diff --git a/tests/algo/pe_property_test.cpp b/tests/algo/pe_property_test.cpp index 71a38b5..e47f863 100644 --- a/tests/algo/pe_property_test.cpp +++ b/tests/algo/pe_property_test.cpp @@ -384,6 +384,14 @@ TEST(PairedEndTest, EmptyPairHandling) { EXPECT_FALSE(pair.isValid()); // Empty sequences are invalid } +TEST(PairedEndTest, GenerateR2IdPreservesUnderscoreConvention) { + const std::string r1Id = "instrument:run:flowcell_1"; + const std::string r2Id = generateR2Id(r1Id); + + EXPECT_EQ(r2Id, "instrument:run:flowcell_2"); + EXPECT_TRUE(io::arePairedIds(r1Id, r2Id)); +} + /// @brief Test single base pair. TEST(PairedEndTest, SingleBasePair) { io::PairedEndRecord pair; diff --git a/tests/commands/compression_engine_test.cpp b/tests/commands/compression_engine_test.cpp index a47bf2f..b1c9906 100644 --- a/tests/commands/compression_engine_test.cpp +++ b/tests/commands/compression_engine_test.cpp @@ -87,6 +87,7 @@ TEST(CompressionEngineTest, ExecutesSingleThreadArchivePlanAndReportsStats) { EXPECT_EQ(statsResult->totalBases, 8u); EXPECT_GT(statsResult->inputBytes, 0u); EXPECT_GT(statsResult->outputBytes, 0u); + EXPECT_EQ(statsResult->outputBytes, std::filesystem::file_size(outputPath)); EXPECT_EQ(statsResult->blocksWritten, 1u); EXPECT_TRUE(std::filesystem::exists(outputPath)); diff --git a/tests/format/fqc_writer_test.cpp b/tests/format/fqc_writer_test.cpp index 2b43594..379a97b 100644 --- a/tests/format/fqc_writer_test.cpp +++ b/tests/format/fqc_writer_test.cpp @@ -17,7 +17,9 @@ #include #include +#include #include +#include #include #include #include @@ -63,6 +65,33 @@ struct TempFile { } }; +static std::vector readFileBytes(const std::filesystem::path& path) { + std::ifstream file(path, std::ios::binary); + EXPECT_TRUE(file.is_open()); + return std::vector(std::istreambuf_iterator(file), + std::istreambuf_iterator()); +} + +static void writeFileBytes(const std::filesystem::path& path, std::span bytes) { + std::ofstream file(path, std::ios::binary | std::ios::trunc); + ASSERT_TRUE(file.is_open()); + file.write(reinterpret_cast(bytes.data()), + static_cast(bytes.size())); + ASSERT_TRUE(file.good()); +} + +static void writeLe32(std::vector& bytes, std::size_t offset, std::uint32_t value) { + for (std::size_t i = 0; i < sizeof(value); ++i) { + bytes[offset + i] = static_cast((value >> (8 * i)) & 0xFFu); + } +} + +static void writeLe64(std::vector& bytes, std::size_t offset, std::uint64_t value) { + for (std::size_t i = 0; i < sizeof(value); ++i) { + bytes[offset + i] = static_cast((value >> (8 * i)) & 0xFFu); + } +} + static GlobalHeader makeMinimalGlobalHeader(bool hasReorderMap = false) { GlobalHeader h; h.compressionAlgo = static_cast(CodecFamily::kAbcV1); @@ -269,4 +298,52 @@ TEST(FQCWriterTest, ReaderRejectsTruncatedBlockIndexRegion) { EXPECT_THROW(reader.open(), FormatError); } +TEST(FQCWriterTest, ReaderUsesDeclaredExtendedBlockHeaderSizeWhenReadingPayload) { + TempFile tmp; + + { + FQCWriter writer(tmp.path); + auto gh = makeMinimalGlobalHeader(false); + writer.writeGlobalHeader(gh, "test.fastq", 0); + + BlockPayload payload; + payload.seqData = {0xAA, 0xBB, 0xCC}; + writer.writeBlock(makeMinimalBlockHeader(), payload); + writer.finalize(); + } + + FQCReader baseline(tmp.path); + ASSERT_NO_THROW(baseline.open()); + ASSERT_EQ(baseline.blockCount(), 1u); + + const auto blockOffset = baseline.blockIndex().front().offset; + const auto originalIndexOffset = baseline.footer().indexOffset; + auto bytes = readFileBytes(tmp.path); + + constexpr std::uint32_t kExtendedHeaderSize = + static_cast(BlockHeader::kSize + 8); + const std::array extensionBytes = { + 0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE}; + + bytes.insert(bytes.begin() + static_cast(blockOffset + BlockHeader::kSize), + extensionBytes.begin(), + extensionBytes.end()); + + writeLe32(bytes, static_cast(blockOffset), kExtendedHeaderSize); + writeLe64(bytes, + static_cast(originalIndexOffset + extensionBytes.size() + + BlockIndex::kHeaderSize + 8), + static_cast(BlockHeader::kSize + extensionBytes.size() + 3)); + writeLe64(bytes, bytes.size() - FileFooter::kSize, originalIndexOffset + extensionBytes.size()); + + writeFileBytes(tmp.path, bytes); + + FQCReader reader(tmp.path); + ASSERT_NO_THROW(reader.open()); + + auto block = reader.readBlock(0, StreamSelection::kSequence); + EXPECT_EQ(block.header.headerSize, kExtendedHeaderSize); + EXPECT_EQ(block.seqData, (std::vector{0xAA, 0xBB, 0xCC})); +} + } // namespace fqc::format::test diff --git a/tests/io/fastq_parser_test.cpp b/tests/io/fastq_parser_test.cpp index 8f62185..025f85a 100644 --- a/tests/io/fastq_parser_test.cpp +++ b/tests/io/fastq_parser_test.cpp @@ -5,6 +5,7 @@ #include "fqc/io/fastq_parser.h" #include "fqc/common/types.h" +#include "fqc/io/stream_factory.h" #include @@ -16,4 +17,48 @@ TEST(FastqParserTest, FastqRecordUsesSharedReadRecordType) { EXPECT_TRUE((std::is_same_v)); } +TEST(FastqParserTest, LastErrorUsesOneBasedRecordNumberForFailingRecord) { + auto factory = std::make_shared(); + factory->setFileContent("broken.fastq", "@read1\nACGT\n-\nIIII\n"); + + FastqParser parser("broken.fastq", factory); + parser.open(); + + EXPECT_THROW(parser.readRecord(), FormatError); + ASSERT_TRUE(parser.lastError().has_value()); + EXPECT_EQ(parser.lastError()->lineNumber, 3u); + EXPECT_EQ(parser.lastError()->recordNumber, 1u); +} + +TEST(FastqParserTest, SampleRecordsPreservesCurrentReadPosition) { + auto factory = std::make_shared(); + factory->setFileContent("reads.fastq", + "@read1\nACGT\n+\nFFFF\n" + "@read2\nTGCA\n+\nHHHH\n"); + + FastqParser parser("reads.fastq", factory); + parser.open(); + + auto first = parser.readRecord(); + ASSERT_TRUE(first.has_value()); + EXPECT_EQ(first->id, "read1"); + + const auto sample = parser.sampleRecords(1); + EXPECT_EQ(sample.totalRecords, 1u); + + auto next = parser.readRecord(); + ASSERT_TRUE(next.has_value()); + EXPECT_EQ(next->id, "read2"); +} + +TEST(FastqParserTest, TrailingSpacesInSequenceAndQualityRemainFormatErrors) { + auto factory = std::make_shared(); + factory->setFileContent("broken.fastq", "@read1\nACGT \n+\nFFFF \n"); + + FastqParser parser("broken.fastq", factory); + parser.open(); + + EXPECT_THROW(parser.readRecord(), FormatError); +} + } // namespace fqc::io::test diff --git a/tests/pipeline/backpressure_controller_test.cpp b/tests/pipeline/backpressure_controller_test.cpp new file mode 100644 index 0000000..6f0c424 --- /dev/null +++ b/tests/pipeline/backpressure_controller_test.cpp @@ -0,0 +1,41 @@ +// ============================================================================= +// fq-compressor - Backpressure Controller Regression Tests +// ============================================================================= + +#include "fqc/pipeline/node_common.h" + +#include +#include +#include + +#include + +namespace fqc::pipeline::test { + +TEST(BackpressureControllerTest, ResetWakesBlockedAcquireCallers) { + BackpressureController controller(1); + controller.acquire(); + + std::promise acquired; + auto acquiredFuture = acquired.get_future(); + + std::thread waiter([&controller, &acquired] { + controller.acquire(); + acquired.set_value(); + }); + + ASSERT_EQ(acquiredFuture.wait_for(std::chrono::milliseconds(50)), std::future_status::timeout); + + controller.reset(); + + const auto status = acquiredFuture.wait_for(std::chrono::milliseconds(200)); + if (status != std::future_status::ready) { + ASSERT_TRUE(controller.tryAcquire()); + controller.release(); + } + + EXPECT_EQ(status, std::future_status::ready); + waiter.join(); +} + +} // namespace fqc::pipeline::test