diff --git a/Utilities/JIT.cpp b/Utilities/JIT.cpp index b183f16f7d6e..e994546e361e 100644 --- a/Utilities/JIT.cpp +++ b/Utilities/JIT.cpp @@ -10,6 +10,7 @@ #include "util/v128.hpp" #include "util/simd.hpp" #include "Crypto/unzip.h" + #include #ifdef __linux__ @@ -1170,17 +1171,19 @@ class ObjectCache final : public llvm::ObjectCache //fs::file(name, fs::rewrite).write(obj.getBufferStart(), obj.getBufferSize()); name.append(".gz"); - const std::vector zbuf = zip(reinterpret_cast(obj.getBufferStart()), obj.getBufferSize()); + fs::file module_file(name, fs::rewrite); - if (zbuf.empty()) + if (!module_file) { - jit_log.error("LLVM: Failed to compress module: %s", _module->getName().data()); + jit_log.error("LLVM: Failed to create module file: %s (%s)", name, fs::g_tls_error); return; } - if (!fs::write_file(name, fs::rewrite, zbuf.data(), zbuf.size())) + if (!zip(obj.getBufferStart(), obj.getBufferSize(), module_file)) { - jit_log.error("LLVM: Failed to create module file: %s (%s)", name, fs::g_tls_error); + jit_log.error("LLVM: Failed to compress module: %s", _module->getName().data()); + module_file.close(); + fs::remove_file(name); return; } diff --git a/rpcs3/Crypto/unzip.cpp b/rpcs3/Crypto/unzip.cpp index 04687942c202..a2a0d94c5132 100644 --- a/rpcs3/Crypto/unzip.cpp +++ b/rpcs3/Crypto/unzip.cpp @@ -3,6 +3,8 @@ #include +#include "util/serialization_ext.hpp" + std::vector unzip(const void* src, usz size) { if (!src || !size) [[unlikely]] @@ -130,65 +132,35 @@ bool unzip(const void* src, usz size, fs::file& out) return is_valid; } -std::vector zip(const void* src, usz size) +bool zip(const void* src, usz size, fs::file& out, bool multi_thread_it) { - if (!src || !size) + if (!src || !size || !out) { - return {}; + return false; } - const uLong zsz = compressBound(::narrow(size)) + 256; - std::vector out(zsz); - - z_stream zs{}; -#ifndef _MSC_VER -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wold-style-cast" -#endif - int res = deflateInit2(&zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY); - if (res != Z_OK) - { - return {}; - } -#ifndef _MSC_VER -#pragma GCC diagnostic pop -#endif - zs.avail_in = static_cast(size); - zs.next_in = reinterpret_cast(src); - zs.avail_out = static_cast(out.size()); - zs.next_out = out.data(); + utils::serial compressor(!multi_thread_it || size < 0x40'0000); + compressor.m_file_handler = make_compressed_serialization_file_handler(out); - res = deflate(&zs, Z_FINISH); + std::string_view buffer_view{static_cast(src), size}; - switch (res) + while (!buffer_view.empty()) { - case Z_OK: - case Z_STREAM_END: - if (zs.avail_out) + if (!compressor.m_file_handler->is_valid()) { - out.resize(zsz - zs.avail_out); + return false; } - break; - default: - out.clear(); - break; - } - deflateEnd(&zs); + const std::string_view slice = buffer_view.substr(0, 0x50'0000); - return out; -} - -bool zip(const void* src, usz size, fs::file& out) -{ - if (!src || !size || !out) - { - return false; + compressor(slice); + compressor.breathe(); + buffer_view = buffer_view.substr(slice.size()); } - const std::vector zipped = zip(src, size); + compressor.m_file_handler->finalize(compressor); - if (zipped.empty() || out.write(zipped.data(), zipped.size()) != zipped.size()) + if (!compressor.m_file_handler->is_valid()) { return false; } diff --git a/rpcs3/Crypto/unzip.h b/rpcs3/Crypto/unzip.h index 80025ce65c9d..58de49ca0aee 100644 --- a/rpcs3/Crypto/unzip.h +++ b/rpcs3/Crypto/unzip.h @@ -16,15 +16,7 @@ inline bool unzip(const std::vector& src, fs::file& out) return unzip(src.data(), src.size(), out); } -std::vector zip(const void* src, usz size); - -template -inline std::vector zip(const T& src) -{ - return zip(src.data(), src.size()); -} - -bool zip(const void* src, usz size, fs::file& out); +bool zip(const void* src, usz size, fs::file& out, bool multi_thread_it = false); template inline bool zip(const T& src, fs::file& out) diff --git a/rpcs3/Emu/CMakeLists.txt b/rpcs3/Emu/CMakeLists.txt index e29c119c885e..a008e85a20f1 100644 --- a/rpcs3/Emu/CMakeLists.txt +++ b/rpcs3/Emu/CMakeLists.txt @@ -50,6 +50,7 @@ target_sources(rpcs3_emu PRIVATE ../util/dyn_lib.cpp ../util/sysinfo.cpp ../util/cpu_stats.cpp + ../util/serialization_ext.cpp ../../Utilities/bin_patch.cpp ../../Utilities/cheat_info.cpp ../../Utilities/cond.cpp diff --git a/rpcs3/Emu/Cell/PPUThread.cpp b/rpcs3/Emu/Cell/PPUThread.cpp index 67085ea10342..259c5fa11d38 100644 --- a/rpcs3/Emu/Cell/PPUThread.cpp +++ b/rpcs3/Emu/Cell/PPUThread.cpp @@ -3541,7 +3541,10 @@ namespace fs::stat_t get_stat() override { - return m_file.get_stat(); + fs::stat_t stat = m_file.get_stat(); + stat.size = std::min(utils::sub_saturate(stat.size, m_off), m_max_size); + stat.is_writable = false; + return stat; } bool trunc(u64) override @@ -3558,7 +3561,7 @@ namespace u64 read_at(u64 offset, void* buffer, u64 size) override { - return m_file.read_at(offset + m_off, buffer, std::min(size, utils::sub_saturate(m_max_size, m_pos))); + return m_file.read_at(offset + m_off, buffer, std::min(size, utils::sub_saturate(m_max_size, offset))); } u64 write(const void*, u64) override @@ -3585,7 +3588,7 @@ namespace u64 size() override { - return m_file.size(); + return std::min(utils::sub_saturate(m_file.size(), m_off), m_max_size); } }; } diff --git a/rpcs3/Emu/Memory/vm.cpp b/rpcs3/Emu/Memory/vm.cpp index 69e9d0b871a5..f76a68ae18cb 100644 --- a/rpcs3/Emu/Memory/vm.cpp +++ b/rpcs3/Emu/Memory/vm.cpp @@ -1617,14 +1617,14 @@ namespace vm const v128 _5 = _1 | _2; const v128 _6 = _3 | _4; const v128 _7 = _5 | _6; - return _7 == v128{}; + return gv_testz(_7); } static void serialize_memory_bytes(utils::serial& ar, u8* ptr, usz size) { ensure((size % 4096) == 0); - for (; size; ptr += 128 * 8) + for (usz iter_count = 0; size; iter_count++, ptr += 128 * 8) { const usz process_size = std::min(size, 128 * 8); size -= process_size; @@ -1633,12 +1633,20 @@ namespace vm if (ar.is_writing()) { - for (usz i = 0; i < process_size; i += 128) + for (usz i = 0; i < process_size; i += 128 * 2) { - if (!check_cache_line_zero(ptr + i)) + const u64 sample64_1 = read_from_ptr(ptr, i); + const u64 sample64_2 = read_from_ptr(ptr, i + 128); + + // Speed up testing in scenarios where it is likely non-zero data + if (sample64_1 && sample64_2) { - bitmap |= 1u << (i / 128); + bitmap |= 3u << (i / 128); + continue; } + + bitmap |= (check_cache_line_zero(ptr + i + 0) ? 0 : 1) << (i / 128); + bitmap |= (check_cache_line_zero(ptr + i + 128) ? 0 : 2) << (i / 128); } } @@ -1665,8 +1673,13 @@ namespace vm i += block_count * 128; } - ar.breathe(); + if (iter_count % 256 == 0) + { + ar.breathe(); + } } + + ar.breathe(); } void block_t::save(utils::serial& ar, std::map& shared) diff --git a/rpcs3/Emu/RSX/RSXThread.cpp b/rpcs3/Emu/RSX/RSXThread.cpp index 892ce75a3461..1febc6326084 100644 --- a/rpcs3/Emu/RSX/RSXThread.cpp +++ b/rpcs3/Emu/RSX/RSXThread.cpp @@ -4,7 +4,6 @@ #include "Emu/Cell/PPUCallback.h" #include "Emu/Cell/SPUThread.h" #include "Emu/Cell/timers.hpp" -#include "Emu/savestate_utils.hpp" #include "Capture/rsx_capture.h" #include "Common/BufferUtils.h" @@ -20,7 +19,7 @@ #include "Emu/Cell/lv2/sys_event.h" #include "Emu/Cell/lv2/sys_time.h" #include "Emu/Cell/Modules/cellGcmSys.h" -#include "Emu/savestate_utils.hpp" +#include "util/serialization_ext.hpp" #include "Overlays/overlay_perf_metrics.h" #include "Overlays/overlay_message.h" #include "Program/GLSLCommon.h" diff --git a/rpcs3/Emu/System.cpp b/rpcs3/Emu/System.cpp index 3130b6cefecf..1735ff7b960e 100644 --- a/rpcs3/Emu/System.cpp +++ b/rpcs3/Emu/System.cpp @@ -9,6 +9,7 @@ #include "Emu/perf_monitor.hpp" #include "Emu/vfs_config.h" #include "Emu/IPC_config.h" +#include "Emu/savestate_utils.hpp" #include "Emu/Cell/ErrorCodes.h" #include "Emu/Cell/PPUThread.h" @@ -40,8 +41,6 @@ #include "../Crypto/unself.h" #include "../Crypto/unzip.h" #include "util/logs.hpp" -#include "util/serialization.hpp" -#include "savestate_utils.hpp" #include #include @@ -2932,6 +2931,7 @@ void Emulator::Kill(bool allow_autoexit, bool savestate, savestate_stage* save_s // Save it first for maximum timing accuracy const u64 timestamp = get_timebased_time(); + const u64 start_time = get_system_time(); sys_log.notice("All threads have been stopped."); @@ -3128,7 +3128,9 @@ void Emulator::Kill(bool allow_autoexit, bool savestate, savestate_stage* save_s ar.seek_end(); ar.m_file_handler->finalize(ar); - if (!file.commit()) + fs::stat_t file_stat{}; + + if (!file.commit() || !fs::get_stat(path, file_stat)) { sys_log.error("Failed to write savestate to file! (path='%s', %s)", path, fs::g_tls_error); savestate = false; @@ -3152,7 +3154,7 @@ void Emulator::Kill(bool allow_autoexit, bool savestate, savestate_stage* save_s sys_log.success("Old savestate has been removed: path='%s'", old_path2); } - sys_log.success("Saved savestate! path='%s'", path); + sys_log.success("Saved savestate! path='%s' (file_size=0x%x, time_to_save=%gs)", path, file_stat.size, (get_system_time() - start_time) / 1000000.); if (!g_cfg.savestate.suspend_emu) { diff --git a/rpcs3/Emu/savestate_utils.cpp b/rpcs3/Emu/savestate_utils.cpp index 0050dec87078..663f0421aa3b 100644 --- a/rpcs3/Emu/savestate_utils.cpp +++ b/rpcs3/Emu/savestate_utils.cpp @@ -12,25 +12,11 @@ #include "System.h" #include +#include #include -#include - LOG_CHANNEL(sys_log, "SYS"); -template <> -void fmt_class_string::format(std::string& out, u64 arg) -{ - const utils::serial& ar = get_object(arg); - - - be_t sample64 = 0; - const usz read_size = std::min(ar.data.size(), sizeof(sample64)); - std::memcpy(&sample64, ar.data.data() + ar.data.size() - read_size, read_size); - - fmt::append(out, "{ %s, 0x%x/0x%x, memory=0x%x, sample64=0x%016x }", ar.is_writing() ? "writing" : "reading", ar.pos, ar.data_offset + ar.data.size(), ar.data.size(), sample64); -} - struct serial_ver_t { bool used = false; @@ -109,7 +95,7 @@ std::vector get_savestate_versioning_data(fs::file&& file, std::s file.seek(0); utils::serial ar; - ar.set_reading_state(); + ar.set_reading_state({}, true); ar.m_file_handler = filepath.ends_with(".gz") ? static_cast>(make_compressed_serialization_file_handler(std::move(file))) : make_uncompressed_serialization_file_handler(std::move(file)); @@ -287,544 +273,6 @@ bool boot_last_savestate(bool testing) return false; } -bool uncompressed_serialization_file_handler::handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) -{ - if (ar.is_writing()) - { - if (data) - { - m_file->seek(pos); - m_file->write(data, size); - return true; - } - - m_file->seek(ar.data_offset); - m_file->write(ar.data); - - if (pos == umax && size == umax) - { - // Request to flush the file to disk - m_file->sync(); - } - - ar.seek_end(); - ar.data_offset = ar.pos; - ar.data.clear(); - return true; - } - - if (!size) - { - return true; - } - - if (pos == 0 && size == umax) - { - // Discard loaded data until pos if profitable - const usz limit = ar.data_offset + ar.data.size(); - - if (ar.pos > ar.data_offset && ar.pos < limit) - { - const usz may_discard_bytes = ar.pos - ar.data_offset; - const usz moved_byte_count_on_discard = limit - ar.pos; - - // Cheeck profitability (check recycled memory and std::memmove costs) - if (may_discard_bytes >= 0x50'0000 || (may_discard_bytes >= 0x20'0000 && moved_byte_count_on_discard / may_discard_bytes < 3)) - { - ar.data_offset += may_discard_bytes; - ar.data.erase(ar.data.begin(), ar.data.begin() + may_discard_bytes); - - if (ar.data.capacity() >= 0x200'0000) - { - // Discard memory - ar.data.shrink_to_fit(); - } - } - - return true; - } - - // Discard all loaded data - ar.data_offset = ar.pos; - ar.data.clear(); - - if (ar.data.capacity() >= 0x200'0000) - { - // Discard memory - ar.data.shrink_to_fit(); - } - - return true; - } - - if (~pos < size - 1) - { - // Overflow - return false; - } - - if (ar.data.empty() && pos != ar.pos) - { - // Relocate instead of over-fetch - ar.seek_pos(pos); - } - - const usz read_pre_buffer = ar.data.empty() ? 0 : utils::sub_saturate(ar.data_offset, pos); - - if (read_pre_buffer) - { - // Read past data - // Harsh operation on performance, luckily rare and not typically needed - // Also this may would be disallowed when moving to compressed files - // This may be a result of wrong usage of breathe() function - ar.data.resize(ar.data.size() + read_pre_buffer); - std::memmove(ar.data.data() + read_pre_buffer, ar.data.data(), ar.data.size() - read_pre_buffer); - ensure(m_file->read_at(pos, ar.data.data(), read_pre_buffer) == read_pre_buffer); - ar.data_offset -= read_pre_buffer; - } - - // Adjustment to prevent overflow - const usz subtrahend = ar.data.empty() ? 0 : 1; - const usz read_past_buffer = utils::sub_saturate(pos + (size - subtrahend), ar.data_offset + (ar.data.size() - subtrahend)); - const usz read_limit = utils::sub_saturate(ar.m_max_data, ar.data_offset); - - if (read_past_buffer) - { - // Read proceeding data - // More lightweight operation, this is the common operation - // Allowed to fail, if memory is truly needed an assert would take place later - const usz old_size = ar.data.size(); - - // Try to prefetch data by reading more than requested - ar.data.resize(std::min(read_limit, std::max({ ar.data.capacity(), ar.data.size() + read_past_buffer * 3 / 2, ar.m_avoid_large_prefetch ? usz{4096} : usz{0x10'0000} }))); - ar.data.resize(m_file->read_at(old_size + ar.data_offset, data ? const_cast(data) : ar.data.data() + old_size, ar.data.size() - old_size) + old_size); - } - - return true; -} - -usz uncompressed_serialization_file_handler::get_size(const utils::serial& ar, usz recommended) const -{ - if (ar.is_writing()) - { - return m_file->size(); - } - - const usz memory_available = ar.data_offset + ar.data.size(); - - if (memory_available >= recommended) - { - // Avoid calling size() if possible - return memory_available; - } - - return std::max(m_file->size(), memory_available); -} - -void uncompressed_serialization_file_handler::finalize(utils::serial& ar) -{ - ar.seek_end(); - handle_file_op(ar, 0, umax, nullptr); - ar.data = {}; // Deallocate and clear -} - -void compressed_serialization_file_handler::initialize(utils::serial& ar) -{ - if (!m_stream.has_value()) - { - m_stream.emplace(); - } - - z_stream& m_zs = std::any_cast(m_stream); - - if (ar.is_writing()) - { - if (m_write_inited) - { - return; - } - -#ifndef _MSC_VER -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wold-style-cast" -#endif - if (m_read_inited) - { - finalize(ar); - } - - m_zs = {}; - ensure(deflateInit2(&m_zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY) == Z_OK); -#ifndef _MSC_VER -#pragma GCC diagnostic pop -#endif - m_write_inited = true; - m_errored = false; - } - else - { - if (m_read_inited) - { - return; - } - - if (m_write_inited) - { - finalize(ar); - } - - m_zs.avail_in = 0; - m_zs.avail_out = 0; - m_zs.next_in = nullptr; - m_zs.next_out = nullptr; - #ifndef _MSC_VER - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wold-style-cast" - #endif - ensure(inflateInit2(&m_zs, 16 + 15) == Z_OK); - m_read_inited = true; - m_errored = false; - } -} - -bool compressed_serialization_file_handler::handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) -{ - if (ar.is_writing()) - { - initialize(ar); - - if (m_errored) - { - return false; - } - - z_stream& m_zs = std::any_cast(m_stream); - - if (data) - { - ensure(false); - } - - // Writing not at the end is forbidden - ensure(ar.pos == ar.data_offset + ar.data.size()); - - m_zs.avail_in = static_cast(ar.data.size()); - m_zs.next_in = ar.data.data(); - - do - { - m_stream_data.resize(std::max(m_stream_data.size(), ::compressBound(m_zs.avail_in))); - m_zs.avail_out = static_cast(m_stream_data.size()); - m_zs.next_out = m_stream_data.data(); - - if (deflate(&m_zs, Z_NO_FLUSH) == Z_STREAM_ERROR || m_file->write(m_stream_data.data(), m_stream_data.size() - m_zs.avail_out) != m_stream_data.size() - m_zs.avail_out) - { - m_errored = true; - deflateEnd(&m_zs); - //m_file->close(); - break; - } - } - while (m_zs.avail_out == 0); - - ar.seek_end(); - ar.data_offset = ar.pos; - ar.data.clear(); - - if (pos == umax && size == umax && *m_file) - { - // Request to flush the file to disk - m_file->sync(); - } - - return true; - } - - initialize(ar); - - if (m_errored) - { - return false; - } - - if (!size) - { - return true; - } - - if (pos == 0 && size == umax) - { - // Discard loaded data until pos if profitable - const usz limit = ar.data_offset + ar.data.size(); - - if (ar.pos > ar.data_offset && ar.pos < limit) - { - const usz may_discard_bytes = ar.pos - ar.data_offset; - const usz moved_byte_count_on_discard = limit - ar.pos; - - // Cheeck profitability (check recycled memory and std::memmove costs) - if (may_discard_bytes >= 0x50'0000 || (may_discard_bytes >= 0x20'0000 && moved_byte_count_on_discard / may_discard_bytes < 3)) - { - ar.data_offset += may_discard_bytes; - ar.data.erase(ar.data.begin(), ar.data.begin() + may_discard_bytes); - - if (ar.data.capacity() >= 0x200'0000) - { - // Discard memory - ar.data.shrink_to_fit(); - } - } - - return true; - } - - // Discard all loaded data - ar.data_offset += ar.data.size(); - ensure(ar.pos >= ar.data_offset); - ar.data.clear(); - - if (ar.data.capacity() >= 0x200'0000) - { - // Discard memory - ar.data.shrink_to_fit(); - } - - return true; - } - - if (~pos < size - 1) - { - // Overflow - return false; - } - - // TODO: Investigate if this optimization is worth an implementation for compressed stream - // if (ar.data.empty() && pos != ar.pos) - // { - // // Relocate instead of over-fetch - // ar.seek_pos(pos); - // } - - const usz read_pre_buffer = utils::sub_saturate(ar.data_offset, pos); - - if (read_pre_buffer) - { - // Not allowed with compressed data for now - // Unless someone implements mechanism for it - ensure(false); - } - - // Adjustment to prevent overflow - const usz subtrahend = ar.data.empty() ? 0 : 1; - const usz read_past_buffer = utils::sub_saturate(pos + (size - subtrahend), ar.data_offset + (ar.data.size() - subtrahend)); - const usz read_limit = utils::sub_saturate(ar.m_max_data, ar.data_offset); - - if (read_past_buffer) - { - // Read proceeding data - // More lightweight operation, this is the common operation - // Allowed to fail, if memory is truly needed an assert would take place later - const usz old_size = ar.data.size(); - - // Try to prefetch data by reading more than requested - ar.data.resize(std::min(read_limit, std::max({ ar.data.capacity(), ar.data.size() + read_past_buffer * 3 / 2, ar.m_avoid_large_prefetch ? usz{4096} : usz{0x10'0000} }))); - ar.data.resize(this->read_at(ar, old_size + ar.data_offset, data ? const_cast(data) : ar.data.data() + old_size, ar.data.size() - old_size) + old_size); - } - - return true; -} - -usz compressed_serialization_file_handler::read_at(utils::serial& ar, usz read_pos, void* data, usz size) -{ - ensure(read_pos == ar.data.size() + ar.data_offset - size); - - if (!size || m_errored) - { - return 0; - } - - initialize(ar); - - z_stream& m_zs = std::any_cast(m_stream); - - const usz total_to_read = size; - usz read_size = 0; - u8* out_data = static_cast(data); - - auto adjust_for_uint = [](usz size) - { - return static_cast(std::min(uInt{umax}, size)); - }; - - for (; read_size < total_to_read;) - { - // Drain extracted memory stash (also before first file read) - out_data = static_cast(data) + read_size; - m_zs.avail_in = adjust_for_uint(m_stream_data.size() - m_stream_data_index); - m_zs.next_in = reinterpret_cast(m_stream_data.data() + m_stream_data_index); - m_zs.next_out = out_data; - m_zs.avail_out = adjust_for_uint(size - read_size); - - while (read_size < total_to_read && m_zs.avail_in) - { - const int res = inflate(&m_zs, Z_BLOCK); - - bool need_more_file_memory = false; - - switch (res) - { - case Z_OK: - case Z_STREAM_END: - break; - case Z_BUF_ERROR: - { - if (m_zs.avail_in) - { - need_more_file_memory = true; - break; - } - - [[fallthrough]]; - } - default: - m_errored = true; - inflateEnd(&m_zs); - m_read_inited = false; - sys_log.error("Failure of compressed data reading. (res=%d, read_size=0x%x, avail_in=0x%x, avail_out=0x%x, ar=%s)", res, read_size, m_zs.avail_in, m_zs.avail_out, ar); - return read_size; - } - - read_size = m_zs.next_out - static_cast(data); - m_stream_data_index = m_zs.avail_in ? m_zs.next_in - m_stream_data.data() : m_stream_data.size(); - - // Adjust again in case the values simply did not fit into uInt - m_zs.avail_out = adjust_for_uint(utils::sub_saturate(total_to_read, read_size)); - m_zs.avail_in = adjust_for_uint(m_stream_data.size() - m_stream_data_index); - - if (need_more_file_memory) - { - break; - } - } - - if (read_size >= total_to_read) - { - break; - } - - const usz add_size = ar.m_avoid_large_prefetch ? 0x1'0000 : 0x10'0000; - const usz old_file_buf_size = m_stream_data.size(); - - m_stream_data.resize(old_file_buf_size + add_size); - m_stream_data.resize(old_file_buf_size + m_file->read_at(m_file_read_index, m_stream_data.data() + old_file_buf_size, add_size)); - - if (m_stream_data.size() == old_file_buf_size) - { - // EOF - break; - } - - m_file_read_index += m_stream_data.size() - old_file_buf_size; - } - - if (m_stream_data.size() - m_stream_data_index <= m_stream_data_index / 5) - { - // Shrink to required memory size - m_stream_data.erase(m_stream_data.begin(), m_stream_data.begin() + m_stream_data_index); - - if (m_stream_data.capacity() >= 0x200'0000) - { - // Discard memory - m_stream_data.shrink_to_fit(); - } - - m_stream_data_index = 0; - } - - return read_size; -} - -void compressed_serialization_file_handler::skip_until(utils::serial& ar) -{ - ensure(!ar.is_writing() && ar.pos >= ar.data_offset); - - if (ar.pos > ar.data_offset) - { - handle_file_op(ar, ar.data_offset, ar.pos - ar.data_offset, nullptr); - } -} - -void compressed_serialization_file_handler::finalize(utils::serial& ar) -{ - handle_file_op(ar, 0, umax, nullptr); - - if (!m_stream.has_value()) - { - return; - } - - z_stream& m_zs = std::any_cast(m_stream); - - if (m_read_inited) - { - m_read_inited = false; - ensure(inflateEnd(&m_zs) == Z_OK); - return; - } - - m_write_inited = false; - - m_zs.avail_in = 0; - m_zs.next_in = nullptr; - - m_stream_data.resize(m_zs.avail_out); - - do - { - m_zs.avail_out = static_cast(m_stream_data.size()); - m_zs.next_out = m_stream_data.data(); - - if (deflate(&m_zs, Z_FINISH) == Z_STREAM_ERROR) - { - break; - } - - m_file->write(m_stream_data.data(), m_stream_data.size() - m_zs.avail_out); - } - while (m_zs.avail_out == 0); - - m_stream_data = {}; - ensure(deflateEnd(&m_zs) == Z_OK); - ar.data = {}; // Deallocate and clear -} - -usz compressed_serialization_file_handler::get_size(const utils::serial& ar, usz recommended) const -{ - if (ar.is_writing()) - { - return m_file->size(); - } - - const usz memory_available = ar.data_offset + ar.data.size(); - - if (memory_available >= recommended) - { - // Avoid calling size() if possible - return memory_available; - } - - return std::max(utils::mul_saturate(m_file->size(), 6), memory_available); -} - -bool null_serialization_file_handler::handle_file_op(utils::serial&, usz, usz, const void*) -{ - return true; -} - -void null_serialization_file_handler::finalize(utils::serial&) -{ -} - bool load_and_check_reserved(utils::serial& ar, usz size) { u8 bytes[4096]; diff --git a/rpcs3/Emu/savestate_utils.hpp b/rpcs3/Emu/savestate_utils.hpp index 6a4169e2ffcf..2715bfc024b2 100644 --- a/rpcs3/Emu/savestate_utils.hpp +++ b/rpcs3/Emu/savestate_utils.hpp @@ -1,13 +1,6 @@ #pragma once -#include "util/serialization.hpp" - -#include - -namespace fs -{ - class file; -} +#include "util/serialization_ext.hpp" struct version_entry { @@ -17,123 +10,9 @@ struct version_entry ENABLE_BITWISE_SERIALIZATION; }; -// Uncompressed file serialization handler -struct uncompressed_serialization_file_handler : utils::serialization_file_handler -{ - const std::unique_ptr m_file_storage; - const std::add_pointer_t m_file; - - explicit uncompressed_serialization_file_handler(fs::file&& file) noexcept - : utils::serialization_file_handler() - , m_file_storage(std::make_unique(std::move(file))) - , m_file(m_file_storage.get()) - { - } - - explicit uncompressed_serialization_file_handler(const fs::file& file) noexcept - : utils::serialization_file_handler() - , m_file_storage(nullptr) - , m_file(std::addressof(file)) - { - } - - uncompressed_serialization_file_handler(const uncompressed_serialization_file_handler&) = delete; - - // Handle file read and write requests - bool handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) override; - - // Get available memory or file size - // Preferably memory size if is already greater/equal to recommended to avoid additional file ops - usz get_size(const utils::serial& ar, usz recommended) const override; - - void finalize(utils::serial& ar) override; -}; - -template requires (std::is_same_v, fs::file>) -inline std::unique_ptr make_uncompressed_serialization_file_handler(File&& file) -{ - ensure(file); - return std::make_unique(std::forward(file)); -} - -// Compressed file serialization handler -struct compressed_serialization_file_handler : utils::serialization_file_handler -{ - const std::unique_ptr m_file_storage; - const std::add_pointer_t m_file; - std::vector m_stream_data; - usz m_stream_data_index = 0; - usz m_file_read_index = 0; - bool m_write_inited = false; - bool m_read_inited = false; - bool m_errored = false; - std::any m_stream; - - explicit compressed_serialization_file_handler(fs::file&& file) noexcept - : utils::serialization_file_handler() - , m_file_storage(std::make_unique(std::move(file))) - , m_file(m_file_storage.get()) - { - } - - explicit compressed_serialization_file_handler(const fs::file& file) noexcept - : utils::serialization_file_handler() - , m_file_storage(nullptr) - , m_file(std::addressof(file)) - { - } - - compressed_serialization_file_handler(const compressed_serialization_file_handler&) = delete; - - // Handle file read and write requests - bool handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) override; - - // Get available memory or file size - // Preferably memory size if is already greater/equal to recommended to avoid additional file ops - usz get_size(const utils::serial& ar, usz recommended) const override; - void skip_until(utils::serial& ar) override; - - void finalize(utils::serial& ar) override; - -private: - usz read_at(utils::serial& ar, usz read_pos, void* data, usz size); - void initialize(utils::serial& ar); -}; - -template requires (std::is_same_v, fs::file>) -inline std::unique_ptr make_compressed_serialization_file_handler(File&& file) -{ - ensure(file); - return std::make_unique(std::forward(file)); -} - -// Null file serialization handler -struct null_serialization_file_handler : utils::serialization_file_handler -{ - explicit null_serialization_file_handler() noexcept - { - } - - // Handle file read and write requests - bool handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) override; - - void finalize(utils::serial& ar) override; - - bool is_null() const override - { - return true; - } -}; - -inline std::unique_ptr make_null_serialization_file_handler() -{ - return std::make_unique(); -} - bool load_and_check_reserved(utils::serial& ar, usz size); bool is_savestate_version_compatible(const std::vector& data, bool is_boot_check); std::vector get_savestate_versioning_data(fs::file&& file, std::string_view filepath); bool is_savestate_compatible(fs::file&& file, std::string_view filepath); std::vector read_used_savestate_versions(); -std::string get_savestate_file(std::string_view title_id, std::string_view boot_path, s64 abs_id, s64 rel_id); - +std::string get_savestate_file(std::string_view title_id, std::string_view boot_path, s64 abs_id, s64 rel_id); \ No newline at end of file diff --git a/rpcs3/Loader/TAR.cpp b/rpcs3/Loader/TAR.cpp index cc777229a5a3..ca496774f596 100644 --- a/rpcs3/Loader/TAR.cpp +++ b/rpcs3/Loader/TAR.cpp @@ -9,7 +9,7 @@ #include "util/asm.hpp" -#include "Emu/savestate_utils.hpp" +#include "util/serialization_ext.hpp" #include #include @@ -139,7 +139,7 @@ std::unique_ptr tar_object::get_file(const std::string& path, std } else { - tar_log.error("tar_object::get_file() failed to parse header: offset=0x%x, filesize=0x%x", largest_offset, max_size); + tar_log.notice("tar_object::get_file() failed to parse header: offset=0x%x, filesize=0x%x, header_first16=0x%016x", offset, max_size, read_from_ptr>(reinterpret_cast(&header))); } return { size, {} }; diff --git a/rpcs3/emucore.vcxproj b/rpcs3/emucore.vcxproj index 0d7a9a74ea1d..d95d172d0ac7 100644 --- a/rpcs3/emucore.vcxproj +++ b/rpcs3/emucore.vcxproj @@ -135,6 +135,9 @@ + + NotUsing + NotUsing @@ -622,6 +625,7 @@ + diff --git a/rpcs3/emucore.vcxproj.filters b/rpcs3/emucore.vcxproj.filters index 447430681ef8..4166ed3b3dc0 100644 --- a/rpcs3/emucore.vcxproj.filters +++ b/rpcs3/emucore.vcxproj.filters @@ -1099,6 +1099,9 @@ Emu\GPU\RSX\Overlays + + Emu + Emu @@ -2109,6 +2112,9 @@ Emu + + Utilities + Emu @@ -2389,7 +2395,7 @@ Crypto - + Emu diff --git a/rpcs3/rpcs3qt/main_window.cpp b/rpcs3/rpcs3qt/main_window.cpp index 9a1abe763af3..f5ca6984fc95 100644 --- a/rpcs3/rpcs3qt/main_window.cpp +++ b/rpcs3/rpcs3qt/main_window.cpp @@ -56,7 +56,6 @@ #include "Emu/vfs_config.h" #include "Emu/System.h" #include "Emu/system_utils.hpp" -#include "Emu/savestate_utils.hpp" #include "Crypto/unpkg.h" #include "Crypto/unself.h" @@ -70,6 +69,7 @@ #include "Utilities/Thread.h" #include "util/sysinfo.hpp" +#include "util/serialization_ext.hpp" #include "ui_main_window.h" @@ -1535,10 +1535,7 @@ void main_window::HandlePupInstallation(const QString& file_path, const QString& if (update_file_stream->m_file_handler) { // Forcefully read all the data - update_file_stream->pop(); - update_file_stream->pos = umax; - update_file_stream->pos /= 2; // Avoid internal overflows - update_file_stream->m_file_handler->handle_file_op(*update_file_stream, update_file_stream->pos, 1, nullptr); + update_file_stream->m_file_handler->handle_file_op(*update_file_stream, 0, update_file_stream->get_size(umax), nullptr); } fs::file update_file = fs::make_stream(std::move(update_file_stream->data)); diff --git a/rpcs3/util/serialization.hpp b/rpcs3/util/serialization.hpp index 29019d830c9c..313a0df26574 100644 --- a/rpcs3/util/serialization.hpp +++ b/rpcs3/util/serialization.hpp @@ -63,20 +63,31 @@ namespace utils { } + virtual bool is_valid() const + { + return true; + } + virtual void finalize(utils::serial&) = 0; }; struct serial { +private: + bool m_is_writing = true; + bool m_expect_little_data = false; +public: std::vector data; usz data_offset = 0; usz pos = 0; usz m_max_data = umax; - bool m_is_writing = true; - bool m_avoid_large_prefetch = false; std::unique_ptr m_file_handler; - serial() noexcept = default; + serial(bool expect_little_data = false) noexcept + : m_expect_little_data(expect_little_data) + { + } + serial(const serial&) = delete; serial& operator=(const serial&) = delete; explicit serial(serial&&) noexcept = default; @@ -89,6 +100,12 @@ namespace utils return m_is_writing; } + // Return true if small amounts of both input and output memory are expected (performance hint) + bool expect_little_data() const + { + return m_expect_little_data; + } + // Reserve memory for serialization void reserve(usz size) { @@ -388,7 +405,7 @@ namespace utils // Convert serialization manager to deserializion manager // If no arg is provided reuse saved buffer - void set_reading_state(std::vector&& _data = std::vector{}, bool avoid_large_prefetch = false) + void set_reading_state(std::vector&& _data = std::vector{}, bool expect_little_data = false) { if (!_data.empty()) { @@ -396,7 +413,8 @@ namespace utils } m_is_writing = false; - m_avoid_large_prefetch = avoid_large_prefetch; + m_expect_little_data = expect_little_data; + m_max_data = umax; pos = 0; data_offset = 0; } @@ -448,9 +466,9 @@ namespace utils // Execute only if past memory is known to not going be reused void breathe(bool forced = false) { - if (!forced && (!m_file_handler || (data.size() < 0x20'0000 && pos >= data_offset))) + if (!forced && (!m_file_handler || (data.size() < 0x100'0000 && pos >= data_offset))) { - // Let's not do anything if less than 32MB + // Let's not do anything if less than 16MB return; } diff --git a/rpcs3/util/serialization_ext.cpp b/rpcs3/util/serialization_ext.cpp new file mode 100644 index 000000000000..e16c5eaa9358 --- /dev/null +++ b/rpcs3/util/serialization_ext.cpp @@ -0,0 +1,738 @@ +#include "util/types.hpp" +#include "util/logs.hpp" +#include "util/asm.hpp" +#include "util/simd.hpp" +#include "util/endian.hpp" + +#include "Utilities/lockless.h" +#include "Utilities/File.h" +#include "Utilities/StrFmt.h" +#include "serialization_ext.hpp" + +#include + +LOG_CHANNEL(sys_log, "SYS"); + +template <> +void fmt_class_string::format(std::string& out, u64 arg) +{ + const utils::serial& ar = get_object(arg); + + be_t sample64 = 0; + const usz read_size = std::min(ar.data.size(), sizeof(sample64)); + std::memcpy(&sample64, ar.data.data() + ar.data.size() - read_size, read_size); + + fmt::append(out, "{ %s, 0x%x/0x%x, memory=0x%x, sample64=0x%016x }", ar.is_writing() ? "writing" : "reading", ar.pos, ar.data_offset + ar.data.size(), ar.data.size(), sample64); +} + +static constexpr uInt adjust_for_uint(usz size) +{ + return static_cast(std::min(uInt{umax}, size)); +} + +bool uncompressed_serialization_file_handler::handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) +{ + if (ar.is_writing()) + { + if (data) + { + m_file->seek(pos); + m_file->write(data, size); + return true; + } + + m_file->seek(ar.data_offset); + m_file->write(ar.data); + + if (pos == umax && size == umax) + { + // Request to flush the file to disk + m_file->sync(); + } + + ar.seek_end(); + ar.data_offset = ar.pos; + ar.data.clear(); + return true; + } + + if (!size) + { + return true; + } + + if (pos == 0 && size == umax) + { + // Discard loaded data until pos if profitable + const usz limit = ar.data_offset + ar.data.size(); + + if (ar.pos > ar.data_offset && ar.pos < limit) + { + const usz may_discard_bytes = ar.pos - ar.data_offset; + const usz moved_byte_count_on_discard = limit - ar.pos; + + // Cheeck profitability (check recycled memory and std::memmove costs) + if (may_discard_bytes >= 0x50'0000 || (may_discard_bytes >= 0x20'0000 && moved_byte_count_on_discard / may_discard_bytes < 3)) + { + ar.data_offset += may_discard_bytes; + ar.data.erase(ar.data.begin(), ar.data.begin() + may_discard_bytes); + + if (ar.data.capacity() >= 0x200'0000) + { + // Discard memory + ar.data.shrink_to_fit(); + } + } + + return true; + } + + // Discard all loaded data + ar.data_offset = ar.pos; + ar.data.clear(); + + if (ar.data.capacity() >= 0x200'0000) + { + // Discard memory + ar.data.shrink_to_fit(); + } + + return true; + } + + if (~pos < size - 1) + { + // Overflow + return false; + } + + if (ar.data.empty()) + { + // Relocate instead of over-fetch + ar.data_offset = pos; + } + + const usz read_pre_buffer = ar.data.empty() ? 0 : utils::sub_saturate(ar.data_offset, pos); + + if (read_pre_buffer) + { + // Read past data + // Harsh operation on performance, luckily rare and not typically needed + // Also this may would be disallowed when moving to compressed files + // This may be a result of wrong usage of breathe() function + ar.data.resize(ar.data.size() + read_pre_buffer); + std::memmove(ar.data.data() + read_pre_buffer, ar.data.data(), ar.data.size() - read_pre_buffer); + ensure(m_file->read_at(pos, ar.data.data(), read_pre_buffer) == read_pre_buffer); + ar.data_offset -= read_pre_buffer; + } + + // Adjustment to prevent overflow + const usz subtrahend = ar.data.empty() ? 0 : 1; + const usz read_past_buffer = utils::sub_saturate(pos + (size - subtrahend), ar.data_offset + (ar.data.size() - subtrahend)); + const usz read_limit = utils::sub_saturate(ar.m_max_data, ar.data_offset); + + if (read_past_buffer) + { + // Read proceeding data + // More lightweight operation, this is the common operation + // Allowed to fail, if memory is truly needed an assert would take place later + const usz old_size = ar.data.size(); + + // Try to prefetch data by reading more than requested + ar.data.resize(std::min(read_limit, std::max({ ar.data.capacity(), ar.data.size() + read_past_buffer * 3 / 2, ar.expect_little_data() ? usz{4096} : usz{0x10'0000} }))); + ar.data.resize(m_file->read_at(old_size + ar.data_offset, data ? const_cast(data) : ar.data.data() + old_size, ar.data.size() - old_size) + old_size); + } + + return true; +} + +usz uncompressed_serialization_file_handler::get_size(const utils::serial& ar, usz recommended) const +{ + if (ar.is_writing()) + { + return m_file->size(); + } + + const usz memory_available = ar.data_offset + ar.data.size(); + + if (memory_available >= recommended) + { + // Avoid calling size() if possible + return memory_available; + } + + return std::max(m_file->size(), memory_available); +} + +void uncompressed_serialization_file_handler::finalize(utils::serial& ar) +{ + ar.seek_end(); + handle_file_op(ar, 0, umax, nullptr); + ar.data = {}; // Deallocate and clear +} + +struct compressed_stream_data +{ + z_stream m_zs{}; + lf_queue> m_queued_data_to_process; + lf_queue> m_queued_data_to_write; + atomic_t m_pending_bytes = 0; +}; + +void compressed_serialization_file_handler::initialize(utils::serial& ar) +{ + if (!m_stream) + { + m_stream = std::make_shared(); + } + + if (ar.is_writing()) + { + if (m_write_inited) + { + return; + } + + z_stream& m_zs = m_stream->m_zs; +#ifndef _MSC_VER +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wold-style-cast" +#endif + if (m_read_inited) + { + finalize(ar); + } + + m_zs = {}; + ensure(deflateInit2(&m_zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY) == Z_OK); +#ifndef _MSC_VER +#pragma GCC diagnostic pop +#endif + m_write_inited = true; + m_errored = false; + + if (!ar.expect_little_data()) + { + m_stream_data_prepare_thread = std::make_unique>>("Compressed Data Prepare Thread"sv, [this]() { this->stream_data_prepare_thread_op(); }); + m_file_writer_thread = std::make_unique>>("Compressed File Writer Thread"sv, [this]() { this->file_writer_thread_op(); }); + } + } + else + { + if (m_read_inited) + { + return; + } + + if (m_write_inited) + { + finalize(ar); + } + + z_stream& m_zs = m_stream->m_zs; + + m_zs.avail_in = 0; + m_zs.avail_out = 0; + m_zs.next_in = nullptr; + m_zs.next_out = nullptr; + #ifndef _MSC_VER + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wold-style-cast" + #endif + ensure(inflateInit2(&m_zs, 16 + 15) == Z_OK); + m_read_inited = true; + m_errored = false; + } +} + +bool compressed_serialization_file_handler::handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) +{ + if (ar.is_writing()) + { + initialize(ar); + + if (m_errored) + { + return false; + } + + auto& manager = *m_stream; + auto& stream_data = manager.m_queued_data_to_process; + + if (data) + { + ensure(false); + } + + // Writing not at the end is forbidden + ensure(ar.pos == ar.data_offset + ar.data.size()); + + if (ar.data.empty()) + { + return true; + } + + ar.seek_end(); + + if (!m_file_writer_thread) + { + // Avoid multi-threading for small files + blocked_compressed_write(ar.data); + } + else + { + while (true) + { + // Avoid flooding RAM, wait if there is too much pending memory + const usz new_value = m_pending_bytes.atomic_op([&](usz v) + { + v &= ~(1ull << 63); + + if (v + ar.data.size() > 0x400'0000) + { + v |= 1ull << 63; + } + else + { + v += ar.data.size(); + } + + return v; + }); + + if (new_value & (1ull << 63)) + { + m_pending_bytes.wait(new_value); + } + else + { + break; + } + } + + stream_data.push(std::move(ar.data)); + } + + ar.data_offset = ar.pos; + ar.data.clear(); + + if (pos == umax && size == umax && *m_file) + { + // Request to flush the file to disk + m_file->sync(); + } + + return true; + } + + initialize(ar); + + if (m_errored) + { + return false; + } + + if (!size) + { + return true; + } + + if (pos == 0 && size == umax) + { + // Discard loaded data until pos if profitable + const usz limit = ar.data_offset + ar.data.size(); + + if (ar.pos > ar.data_offset && ar.pos < limit) + { + const usz may_discard_bytes = ar.pos - ar.data_offset; + const usz moved_byte_count_on_discard = limit - ar.pos; + + // Cheeck profitability (check recycled memory and std::memmove costs) + if (may_discard_bytes >= 0x50'0000 || (may_discard_bytes >= 0x20'0000 && moved_byte_count_on_discard / may_discard_bytes < 3)) + { + ar.data_offset += may_discard_bytes; + ar.data.erase(ar.data.begin(), ar.data.begin() + may_discard_bytes); + + if (ar.data.capacity() >= 0x200'0000) + { + // Discard memory + ar.data.shrink_to_fit(); + } + } + + return true; + } + + // Discard all loaded data + ar.data_offset += ar.data.size(); + ensure(ar.pos >= ar.data_offset); + ar.data.clear(); + + if (ar.data.capacity() >= 0x200'0000) + { + // Discard memory + ar.data.shrink_to_fit(); + } + + return true; + } + + if (~pos < size - 1) + { + // Overflow + return false; + } + + // TODO: Investigate if this optimization is worth an implementation for compressed stream + // if (ar.data.empty() && pos != ar.pos) + // { + // // Relocate instead of over-fetch + // ar.seek_pos(pos); + // } + + const usz read_pre_buffer = utils::sub_saturate(ar.data_offset, pos); + + if (read_pre_buffer) + { + // Not allowed with compressed data for now + // Unless someone implements mechanism for it + ensure(false); + } + + // Adjustment to prevent overflow + const usz subtrahend = ar.data.empty() ? 0 : 1; + const usz read_past_buffer = utils::sub_saturate(pos + (size - subtrahend), ar.data_offset + (ar.data.size() - subtrahend)); + const usz read_limit = utils::sub_saturate(ar.m_max_data, ar.data_offset); + + if (read_past_buffer) + { + // Read proceeding data + // More lightweight operation, this is the common operation + // Allowed to fail, if memory is truly needed an assert would take place later + const usz old_size = ar.data.size(); + + // Try to prefetch data by reading more than requested + ar.data.resize(std::min(read_limit, std::max({ ar.data.capacity(), ar.data.size() + read_past_buffer * 3 / 2, ar.expect_little_data() ? usz{4096} : usz{0x10'0000} }))); + ar.data.resize(this->read_at(ar, old_size + ar.data_offset, data ? const_cast(data) : ar.data.data() + old_size, ar.data.size() - old_size) + old_size); + } + + return true; +} + +usz compressed_serialization_file_handler::read_at(utils::serial& ar, usz read_pos, void* data, usz size) +{ + ensure(read_pos == ar.data.size() + ar.data_offset - size); + + if (!size || m_errored) + { + return 0; + } + + initialize(ar); + + z_stream& m_zs = m_stream->m_zs; + + const usz total_to_read = size; + usz read_size = 0; + u8* out_data = static_cast(data); + + for (; read_size < total_to_read;) + { + // Drain extracted memory stash (also before first file read) + out_data = static_cast(data) + read_size; + m_zs.avail_in = adjust_for_uint(m_stream_data.size() - m_stream_data_index); + m_zs.next_in = reinterpret_cast(m_stream_data.data() + m_stream_data_index); + m_zs.next_out = out_data; + m_zs.avail_out = adjust_for_uint(size - read_size); + + while (read_size < total_to_read && m_zs.avail_in) + { + const int res = inflate(&m_zs, Z_BLOCK); + + bool need_more_file_memory = false; + + switch (res) + { + case Z_OK: + case Z_STREAM_END: + break; + case Z_BUF_ERROR: + { + if (m_zs.avail_in) + { + need_more_file_memory = true; + break; + } + + [[fallthrough]]; + } + default: + m_errored = true; + inflateEnd(&m_zs); + m_read_inited = false; + sys_log.error("Failure of compressed data reading. (res=%d, read_size=0x%x, avail_in=0x%x, avail_out=0x%x, ar=%s)", res, read_size, m_zs.avail_in, m_zs.avail_out, ar); + return read_size; + } + + read_size = m_zs.next_out - static_cast(data); + m_stream_data_index = m_zs.avail_in ? m_zs.next_in - m_stream_data.data() : m_stream_data.size(); + + // Adjust again in case the values simply did not fit into uInt + m_zs.avail_out = adjust_for_uint(utils::sub_saturate(total_to_read, read_size)); + m_zs.avail_in = adjust_for_uint(m_stream_data.size() - m_stream_data_index); + + if (need_more_file_memory) + { + break; + } + } + + if (read_size >= total_to_read) + { + break; + } + + const usz add_size = ar.expect_little_data() ? 0x1'0000 : 0x10'0000; + const usz old_file_buf_size = m_stream_data.size(); + + m_stream_data.resize(old_file_buf_size + add_size); + m_stream_data.resize(old_file_buf_size + m_file->read_at(m_file_read_index, m_stream_data.data() + old_file_buf_size, add_size)); + + if (m_stream_data.size() == old_file_buf_size) + { + // EOF + break; + } + + m_file_read_index += m_stream_data.size() - old_file_buf_size; + } + + if (m_stream_data.size() - m_stream_data_index <= m_stream_data_index / 5) + { + // Shrink to required memory size + m_stream_data.erase(m_stream_data.begin(), m_stream_data.begin() + m_stream_data_index); + + if (m_stream_data.capacity() >= 0x200'0000) + { + // Discard memory + m_stream_data.shrink_to_fit(); + } + + m_stream_data_index = 0; + } + + return read_size; +} + +void compressed_serialization_file_handler::skip_until(utils::serial& ar) +{ + ensure(!ar.is_writing() && ar.pos >= ar.data_offset); + + if (ar.pos > ar.data_offset) + { + handle_file_op(ar, ar.data_offset, ar.pos - ar.data_offset, nullptr); + } +} + +void compressed_serialization_file_handler::finalize(utils::serial& ar) +{ + handle_file_op(ar, 0, umax, nullptr); + + if (!m_stream) + { + return; + } + + auto& stream = *m_stream; + z_stream& m_zs = m_stream->m_zs; + + if (m_read_inited) + { + ensure(inflateEnd(&m_zs) == Z_OK); + m_read_inited = false; + return; + } + + stream.m_queued_data_to_process.push(std::vector()); + + if (m_file_writer_thread) + { + // Join here to avoid log messages in the destructor + (*m_file_writer_thread)(); + } + + m_stream_data_prepare_thread.reset(); + m_file_writer_thread.reset(); + + m_zs.avail_in = 0; + m_zs.next_in = nullptr; + + m_stream_data.resize(0x10'0000); + + do + { + m_zs.avail_out = static_cast(m_stream_data.size()); + m_zs.next_out = m_stream_data.data(); + + if (deflate(&m_zs, Z_FINISH) == Z_STREAM_ERROR) + { + break; + } + + m_file->write(m_stream_data.data(), m_stream_data.size() - m_zs.avail_out); + } + while (m_zs.avail_out == 0); + + m_stream_data = {}; + ensure(deflateEnd(&m_zs) == Z_OK); + m_write_inited = false; + ar.data = {}; // Deallocate and clear +} + +void compressed_serialization_file_handler::stream_data_prepare_thread_op() +{ + compressed_stream_data& stream = *m_stream; + z_stream& m_zs = stream.m_zs; + + while (true) + { + stream.m_queued_data_to_process.wait(); + + for (auto&& data : stream.m_queued_data_to_process.pop_all()) + { + if (data.empty()) + { + // Abort is requested, flush data and exit + if (!m_stream_data.empty()) + { + stream.m_queued_data_to_write.push(std::move(m_stream_data)); + } + + stream.m_queued_data_to_write.push(std::vector()); + return; + } + + m_zs.avail_in = adjust_for_uint(data.size()); + m_zs.next_in = data.data(); + + usz buffer_offset = 0; + m_stream_data.resize(::compressBound(m_zs.avail_in)); + + do + { + m_zs.avail_out = adjust_for_uint(m_stream_data.size() - buffer_offset); + m_zs.next_out = m_stream_data.data() + buffer_offset; + + if (deflate(&m_zs, Z_NO_FLUSH) == Z_STREAM_ERROR) + { + m_errored = true; + deflateEnd(&m_zs); + + // Abort + stream.m_queued_data_to_write.push(std::vector()); + break; + } + + buffer_offset = m_zs.next_out - m_stream_data.data(); + + if (m_zs.avail_out == 0) + { + m_stream_data.resize(m_stream_data.size() + (m_zs.avail_in + 3ull) / 4); + } + + m_zs.avail_in = adjust_for_uint(data.size() - (m_zs.next_in - data.data())); + } + while (m_zs.avail_out == 0 || m_zs.avail_in != 0); + + // Forward for file write + const usz queued_size = data.size(); + ensure(buffer_offset); + m_pending_bytes += buffer_offset - queued_size; + m_stream_data.resize(buffer_offset); + stream.m_queued_data_to_write.push(std::move(m_stream_data)); + } + } +} + +void compressed_serialization_file_handler::file_writer_thread_op() +{ + compressed_stream_data& stream = *m_stream; + + // Data recheck after an abort request is detected so there will not be any missed data + bool rechecked = false; + + while (true) + { + stream.m_queued_data_to_write.wait(); + + for (auto&& data : stream.m_queued_data_to_write.pop_all()) + { + if (data.empty()) + { + return; + } + + const usz last_size = data.size(); + m_file->write(data); + data = {}; // Deallocate before notification + + if (m_pending_bytes.sub_fetch(last_size) == 1ull << 63) + { + m_pending_bytes.notify_all(); + } + } + } +} + +void compressed_serialization_file_handler::blocked_compressed_write(const std::vector& data) +{ + z_stream& m_zs = m_stream->m_zs; + + m_zs.avail_in = adjust_for_uint(data.size()); + m_zs.next_in = data.data(); + + m_stream_data.resize(::compressBound(m_zs.avail_in)); + + do + { + m_zs.avail_out = adjust_for_uint(m_stream_data.size()); + m_zs.next_out = m_stream_data.data(); + + if (deflate(&m_zs, Z_NO_FLUSH) == Z_STREAM_ERROR || m_file->write(m_stream_data.data(), m_stream_data.size() - m_zs.avail_out) != m_stream_data.size() - m_zs.avail_out) + { + m_errored = true; + deflateEnd(&m_zs); + break; + } + + m_zs.avail_in = adjust_for_uint(data.size() - (m_zs.next_in - data.data())); + } + while (m_zs.avail_out == 0); +} + +usz compressed_serialization_file_handler::get_size(const utils::serial& ar, usz recommended) const +{ + if (ar.is_writing()) + { + return m_file->size(); + } + + const usz memory_available = ar.data_offset + ar.data.size(); + + if (memory_available >= recommended) + { + // Avoid calling size() if possible + return memory_available; + } + + return std::max(utils::mul_saturate(m_file->size(), 6), memory_available); +} + +bool null_serialization_file_handler::handle_file_op(utils::serial&, usz, usz, const void*) +{ + return true; +} + +void null_serialization_file_handler::finalize(utils::serial&) +{ +} diff --git a/rpcs3/util/serialization_ext.hpp b/rpcs3/util/serialization_ext.hpp new file mode 100644 index 000000000000..798b44b21ffb --- /dev/null +++ b/rpcs3/util/serialization_ext.hpp @@ -0,0 +1,136 @@ +#pragma once + +#include "util/serialization.hpp" + +#include "Utilities/Thread.h" + +namespace fs +{ + class file; +} + +// Uncompressed file serialization handler +struct uncompressed_serialization_file_handler : utils::serialization_file_handler +{ + const std::unique_ptr m_file_storage; + const std::add_pointer_t m_file; + + explicit uncompressed_serialization_file_handler(fs::file&& file) noexcept + : utils::serialization_file_handler() + , m_file_storage(std::make_unique(std::move(file))) + , m_file(m_file_storage.get()) + { + } + + explicit uncompressed_serialization_file_handler(const fs::file& file) noexcept + : utils::serialization_file_handler() + , m_file_storage(nullptr) + , m_file(std::addressof(file)) + { + } + + uncompressed_serialization_file_handler(const uncompressed_serialization_file_handler&) = delete; + + // Handle file read and write requests + bool handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) override; + + // Get available memory or file size + // Preferably memory size if is already greater/equal to recommended to avoid additional file ops + usz get_size(const utils::serial& ar, usz recommended) const override; + + void finalize(utils::serial& ar) override; +}; + +template requires (std::is_same_v, fs::file>) +inline std::unique_ptr make_uncompressed_serialization_file_handler(File&& file) +{ + ensure(file); + return std::make_unique(std::forward(file)); +} + +struct compressed_stream_data; + +// Compressed file serialization handler +struct compressed_serialization_file_handler : utils::serialization_file_handler +{ + explicit compressed_serialization_file_handler(fs::file&& file) noexcept + : utils::serialization_file_handler() + , m_file_storage(std::make_unique(std::move(file))) + , m_file(m_file_storage.get()) + { + } + + explicit compressed_serialization_file_handler(const fs::file& file) noexcept + : utils::serialization_file_handler() + , m_file_storage(nullptr) + , m_file(std::addressof(file)) + { + } + + compressed_serialization_file_handler(const compressed_serialization_file_handler&) = delete; + + // Handle file read and write requests + bool handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) override; + + // Get available memory or file size + // Preferably memory size if is already greater/equal to recommended to avoid additional file ops + usz get_size(const utils::serial& ar, usz recommended) const override; + void skip_until(utils::serial& ar) override; + + bool is_valid() const override + { + return !m_errored; + } + + void finalize(utils::serial& ar) override; + +private: + const std::unique_ptr m_file_storage; + const std::add_pointer_t m_file; + std::vector m_stream_data; + usz m_stream_data_index = 0; + usz m_file_read_index = 0; + atomic_t m_pending_bytes = 0; + bool m_write_inited = false; + bool m_read_inited = false; + bool m_errored = false; + std::shared_ptr m_stream; + std::unique_ptr>> m_stream_data_prepare_thread; + std::unique_ptr>> m_file_writer_thread; + + usz read_at(utils::serial& ar, usz read_pos, void* data, usz size); + void initialize(utils::serial& ar); + void stream_data_prepare_thread_op(); + void file_writer_thread_op(); + void blocked_compressed_write(const std::vector& data); +}; + +template requires (std::is_same_v, fs::file>) +inline std::unique_ptr make_compressed_serialization_file_handler(File&& file) +{ + ensure(file); + return std::make_unique(std::forward(file)); +} + +// Null file serialization handler +struct null_serialization_file_handler : utils::serialization_file_handler +{ + explicit null_serialization_file_handler() noexcept + { + } + + // Handle file read and write requests + bool handle_file_op(utils::serial& ar, usz pos, usz size, const void* data) override; + + void finalize(utils::serial& ar) override; + + bool is_null() const override + { + return true; + } +}; + +inline std::unique_ptr make_null_serialization_file_handler() +{ + return std::make_unique(); +}