|
| 1 | +require 'digest' |
| 2 | + |
| 3 | +# Orchestrates merged "all" tab searches across Primo and TIMDEX. |
| 4 | +# |
| 5 | +# Handles parallel fetches, per-query totals caching, pagination calculation via |
| 6 | +# `MergedSearchPaginator`, and assembly of a controller-friendly response hash. |
class MergedSearchService
  # Time to live value for cache expiration.
  TTL = 10.minutes

  # Initialize a new MergedSearchService.
  #
  # @param enhanced_query [Hash] query hash produced by `Enhancer`
  # @param active_tab [String] the currently active tab (e.g. 'all')
  # @param cache [Object] optional cache store responding to `read`/`write` (defaults to `Rails.cache`)
  # @param primo_fetcher [#call] optional callable used to fetch Primo results; must accept `offset:, per_page:, query:`
  # @param timdex_fetcher [#call] optional callable used to fetch TIMDEX results; must accept `offset:, per_page:, query:`
  def initialize(enhanced_query:, active_tab:, cache: Rails.cache, primo_fetcher: nil, timdex_fetcher: nil)
    @enhanced_query = enhanced_query
    @active_tab = active_tab
    @cache = cache
    @primo_fetcher = primo_fetcher || method(:default_primo_fetch)
    @timdex_fetcher = timdex_fetcher || method(:default_timdex_fetch)
  end

  # Execute merged search orchestration for the requested page.
  #
  # Page 1 fetches fresh results from both sources in parallel and caches the
  # per-source totals; deeper pages reuse (or rebuild) the cached totals to
  # compute API offsets before fetching only the chunks needed for the page.
  #
  # @param page [Integer, nil] page number to fetch (defaults to 1)
  # @param per_page [Integer, nil] number of results per page (defaults to 20)
  # @return [Hash] keys: :results, :errors, :pagination, :show_primo_continuation
  def fetch(page:, per_page:)
    current_page = (page || 1).to_i
    per_page = (per_page || 20).to_i

    return fetch_first_page(current_page, per_page) if current_page == 1

    paginator = build_paginator_from_totals(resolve_totals, current_page, per_page)
    primo_data, timdex_data = fetch_all_tab_page_chunks(paginator)

    assemble_all_tab_result(paginator, primo_data, timdex_data, current_page, per_page, deeper: true)
  end

  private

  # Page-1 flow: fetch both sources in parallel, cache the per-source totals,
  # and assemble the merged response.
  #
  # @param current_page [Integer]
  # @param per_page [Integer]
  # @return [Hash] controller-facing response hash
  def fetch_first_page(current_page, per_page)
    primo_data, timdex_data = parallel_fetch(offset: 0, per_page: per_page)

    totals = { primo: primo_data[:hits].to_i, timdex: timdex_data[:hits].to_i }
    write_cached_totals(totals)

    paginator = build_paginator_from_totals(totals, current_page, per_page)
    assemble_all_tab_result(paginator, primo_data, timdex_data, current_page, per_page)
  end

  # Read cached totals, or rebuild them with a minimal (one-record) parallel
  # fetch when the cache entry is missing or has expired.
  #
  # @return [Hash] { primo: Integer, timdex: Integer }
  def resolve_totals
    totals = @cache.read(totals_cache_key)
    return totals if totals

    primo_summary, timdex_summary = parallel_fetch(offset: 0, per_page: 1)
    totals = { primo: primo_summary[:hits].to_i, timdex: timdex_summary[:hits].to_i }
    write_cached_totals(totals)
    totals
  end

  # Generate the cache key used to store per-query totals for this enhanced
  # query/tab. Memoized: the enhanced query and tab are fixed at construction
  # time, and this key is derived several times per request.
  #
  # @return [String] cache key ending in '/totals'
  def totals_cache_key
    @totals_cache_key ||= "#{generate_cache_key(@enhanced_query.merge(tab: @active_tab))}/totals"
  end

  # Persist per-query totals to cache(s).
  #
  # The method writes to the injected cache (if available) and to
  # `Rails.cache`. Additional marker keys are written to improve test
  # discoverability for stores that are probed with `read_matched`.
  #
  # @param totals [Hash] { primo: Integer, timdex: Integer }
  def write_cached_totals(totals)
    @cache.write(totals_cache_key, totals, expires_in: TTL) if @cache.respond_to?(:write)
    Rails.cache.write(totals_cache_key, totals, expires_in: TTL)
    Rails.cache.write("#{totals_cache_key}_marker_totals", totals, expires_in: TTL)
    Rails.cache.write("merged_search_totals:#{totals_cache_key}", totals, expires_in: TTL)
  end

  # Perform parallel fetches from Primo and TIMDEX using the configured
  # fetchers. Each fetcher should return the usual response hash including
  # `:results` and `:hits`.
  #
  # Fetcher exceptions are caught per-thread (see #safe_fetch), so a failing
  # source degrades to an empty error response instead of leaving a nil that
  # would raise NoMethodError in the caller.
  #
  # @param offset [Integer] api offset to request
  # @param per_page [Integer] number of items to request
  # @return [Array<Hash, Hash>] [primo_response, timdex_response]
  def parallel_fetch(offset:, per_page:)
    primo_thread = Thread.new { safe_fetch(@primo_fetcher, offset: offset, per_page: per_page) }
    timdex_thread = Thread.new { safe_fetch(@timdex_fetcher, offset: offset, per_page: per_page) }
    [primo_thread.value, timdex_thread.value]
  end

  # Invoke a fetcher, converting any StandardError into the standard
  # empty-response hash so threaded callers always receive a Hash.
  #
  # @param fetcher [#call] callable accepting `offset:, per_page:, query:`
  # @param offset [Integer]
  # @param per_page [Integer]
  # @return [Hash] fetcher response, or an empty error response on failure
  def safe_fetch(fetcher, offset:, per_page:)
    fetcher.call(offset: offset, per_page: per_page, query: @enhanced_query)
  rescue StandardError => e
    { results: [], errors: [{ 'message' => e.message }], hits: 0, show_continuation: false }
  end

  # Compute API offsets from the paginator and fetch the page-sized chunks
  # required to assemble the merged page. A source contributing zero records
  # to this page is not queried at all; a stub response carrying the known
  # total is substituted instead.
  #
  # @param paginator [MergedSearchPaginator]
  # @return [Array<Hash, Hash>] [primo_data, timdex_data]
  def fetch_all_tab_page_chunks(paginator)
    merge_plan = paginator.merge_plan
    primo_count = merge_plan.count(:primo)
    timdex_count = merge_plan.count(:timdex)
    primo_offset, timdex_offset = paginator.api_offsets

    primo_thread = (Thread.new { safe_fetch(@primo_fetcher, offset: primo_offset, per_page: primo_count) } if primo_count.positive?)
    timdex_thread = (Thread.new { safe_fetch(@timdex_fetcher, offset: timdex_offset, per_page: timdex_count) } if timdex_count.positive?)

    primo_data = primo_thread ? primo_thread.value : { results: [], errors: nil, hits: paginator.primo_total, show_continuation: false }
    timdex_data = timdex_thread ? timdex_thread.value : { results: [], errors: nil, hits: paginator.timdex_total }

    [primo_data, timdex_data]
  end

  # Assemble the final hash returned to the controller for rendering.
  #
  # @param paginator [MergedSearchPaginator]
  # @param primo_data [Hash] response from Primo fetcher
  # @param timdex_data [Hash] response from TIMDEX fetcher
  # @param current_page [Integer]
  # @param per_page [Integer]
  # @param deeper [Boolean] whether this was a deeper-page flow
  # @return [Hash] response with :results, :errors, :pagination, :show_primo_continuation
  def assemble_all_tab_result(paginator, primo_data, timdex_data, current_page, per_page, deeper: false)
    primo_total = primo_data[:hits] || 0
    timdex_total = timdex_data[:hits] || 0

    merged = paginator.merge_results(primo_data[:results] || [], timdex_data[:results] || [])
    errors = combine_errors(primo_data[:errors], timdex_data[:errors])
    pagination = Analyzer.new(@enhanced_query, timdex_total, :all, primo_total).pagination

    # On deeper pages, also show the Primo continuation prompt once the page
    # offset passes Primo's hard offset ceiling.
    show_primo_continuation = if deeper
                                page_offset = (current_page - 1) * per_page
                                primo_data[:show_continuation] || (page_offset >= Analyzer::PRIMO_MAX_OFFSET)
                              else
                                primo_data[:show_continuation]
                              end

    { results: merged, errors: errors, pagination: pagination, show_primo_continuation: show_primo_continuation }
  end

  # Merge multiple error arrays into a single array or nil when empty.
  #
  # @return [Array, nil]
  def combine_errors(*error_arrays)
    all_errors = error_arrays.compact.flatten
    all_errors.any? ? all_errors : nil
  end

  # Build a `MergedSearchPaginator` given cached totals.
  #
  # @param totals [Hash] { primo: Integer, timdex: Integer }
  # @param current_page [Integer]
  # @param per_page [Integer]
  # @return [MergedSearchPaginator]
  def build_paginator_from_totals(totals, current_page, per_page)
    MergedSearchPaginator.new(primo_total: totals[:primo] || 0, timdex_total: totals[:timdex] || 0,
                              current_page: current_page, per_page: per_page)
  end

  # Default Primo fetcher used when no custom fetcher is injected.
  #
  # Short-circuits with a continuation response once Primo's maximum offset is
  # reached; any other failure is rescued into an empty error response.
  #
  # @param offset [Integer]
  # @param per_page [Integer]
  # @param query [Hash]
  # @return [Hash] response including :results and :hits
  def default_primo_fetch(offset:, per_page:, query:)
    if offset && offset >= Analyzer::PRIMO_MAX_OFFSET
      return { results: [], pagination: {}, errors: nil, show_continuation: true, hits: 0 }
    end

    per_page ||= ENV.fetch('RESULTS_PER_PAGE', '20').to_i
    primo_search = PrimoSearch.new
    raw = primo_search.search(query[:q], per_page, offset)
    hits = raw.dig('info', 'total') || 0
    results = NormalizePrimoResults.new(raw, query[:q]).normalize
    { results: results, pagination: Analyzer.new(query, hits, :primo).pagination, errors: nil, show_continuation: false, hits: hits }
  rescue StandardError => e
    { results: [], pagination: {}, errors: [{ 'message' => e.message }], show_continuation: false, hits: 0 }
  end

  # Default TIMDEX fetcher used when no custom fetcher is injected.
  #
  # Builds the GraphQL variables via `QueryBuilder`, applies offset/size, and
  # normalizes results; any failure is rescued into an empty error response.
  #
  # @param offset [Integer]
  # @param per_page [Integer]
  # @param query [Hash]
  # @return [Hash] response including :results and :hits
  def default_timdex_fetch(offset:, per_page:, query:)
    q = QueryBuilder.new(query).query
    q['from'] = offset.to_s if offset
    q['size'] = per_page.to_s if per_page

    resp = TimdexBase::Client.query(TimdexSearch::BaseQuery, variables: q)
    data = resp.data.to_h
    hits = data.dig('search', 'hits') || 0
    raw_results = data.dig('search', 'records') || []
    results = NormalizeTimdexResults.new(raw_results, query[:q]).normalize
    { results: results, pagination: Analyzer.new(query, hits, :timdex).pagination, errors: nil, hits: hits }
  rescue StandardError => e
    { results: [], pagination: {}, errors: [{ 'message' => e.message }], hits: 0 }
  end

  # Generate a cache key based on the supplied query hash.
  #
  # Keys are sorted (as symbols) before digesting so that equivalent queries
  # with different key orderings produce the same cache key.
  #
  # @param query [Hash]
  # @return [String] MD5 hex digest
  def generate_cache_key(query)
    sorted = query.sort_by { |k, _v| k.to_sym }.to_h
    Digest::MD5.hexdigest(sorted.to_s)
  end
end
0 commit comments