-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
caching.h
157 lines (134 loc) · 6.55 KB
/
caching.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "arrow/io/interfaces.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"
namespace arrow {
namespace io {
/// \brief Options controlling how byte ranges are coalesced and prefetched
/// by a read cache.
///
/// Every data member has an in-class initializer, so a default-constructed
/// instance (`CacheOptions opts;`) is fully zero-initialized and can be copied
/// or compared without reading indeterminate values. The struct remains an
/// aggregate, so brace initialization in declaration order
/// (`{hole_size_limit, range_size_limit, lazy, prefetch_limit}`) keeps working.
///
/// A zero-initialized instance is not a tuned configuration; use one of the
/// static factory functions declared below to obtain usable settings.
struct ARROW_EXPORT CacheOptions {
  static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
  static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;

  /// \brief The maximum distance in bytes between two consecutive
  ///   ranges; beyond this value, ranges are not combined
  int64_t hole_size_limit = 0;
  /// \brief The maximum size in bytes of a combined range; if
  ///   combining two consecutive ranges would produce a range of a
  ///   size greater than this, they are not combined
  int64_t range_size_limit = 0;
  /// \brief A lazy cache does not perform any I/O until requested.
  ///   lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
  ///   lazy = true, prefetch_limit = 0: request merged byte ranges only after the
  ///   reader needs them.
  ///   lazy = true, prefetch_limit = k: prefetch up to k merged byte ranges ahead of
  ///   the range that is currently being read.
  bool lazy = false;
  /// \brief The maximum number of ranges to be prefetched. This is only used
  ///   for lazy cache to asynchronously read some ranges after reading the target
  ///   range.
  int64_t prefetch_limit = 0;

  /// \brief Member-wise equality over all four option fields.
  bool operator==(const CacheOptions& other) const {
    return hole_size_limit == other.hole_size_limit &&
           range_size_limit == other.range_size_limit && lazy == other.lazy &&
           prefetch_limit == other.prefetch_limit;
  }

  /// \brief Construct CacheOptions from network storage metrics (e.g. S3).
  ///
  /// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte (TTFB) in
  ///   milliseconds, also called call setup latency of a new read request.
  ///   The value is a positive integer.
  /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec
  ///   (per connection).
  ///   The value is a positive integer.
  /// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth utilization
  ///   fraction (per connection) to maximize the net data load.
  ///   The value is a positive double precision number less than 1.
  /// \param[in] max_ideal_request_size_mib The maximum single data request size
  ///   (in MiB) to maximize the net data load.
  ///   The value is a positive integer.
  /// \return A new instance of CacheOptions.
  static CacheOptions MakeFromNetworkMetrics(
      int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec,
      double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac,
      int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);

  /// \brief Return the library's default options (values are defined out of line).
  static CacheOptions Defaults();
  /// \brief Return the library's default options for a lazy cache (values are
  ///   defined out of line).
  static CacheOptions LazyDefaults();
};
namespace internal {
/// \brief A read cache designed to hide I/O latencies when reading.
///
/// The cache is told which byte ranges an application expects to read and
/// coalesces them into fewer, larger read requests. This benefits performance on
/// some filesystems, particularly remote ones like Amazon S3. By default, the
/// coalesced read requests are also issued in parallel up front.
///
/// Usage:
///
/// 1. Cache() the ranges you expect to read in the future. Ideally, these
///    ranges have the exact offset and length that will later be read. The
///    cache combines them according to the CacheOptions it was constructed with.
///
///    By default, the cache also starts fetching the combined ranges in
///    parallel in the background, unless CacheOptions.lazy is set.
///
/// 2. Call WaitFor() to be notified when the given ranges have been read. If
///    CacheOptions.lazy is set, I/O is triggered in the background here
///    instead. This can be done in parallel (e.g. if parsing a file, call
///    WaitFor() for each chunk of the file that can be parsed in parallel).
///
/// 3. Call Read() to retrieve the actual data for the given ranges.
///    A synchronous application may skip WaitFor() and just call Read(); it
///    still benefits from coalescing and parallel fetching.
class ARROW_EXPORT ReadRangeCache {
 public:
  static constexpr int64_t kDefaultHoleSizeLimit = 8192;
  static constexpr int64_t kDefaultRangeSizeLimit = 32 * 1024 * 1024;

  /// \brief Construct a read cache over a shared file, using
  /// CacheOptions::Defaults().
  ///
  /// `file` is deliberately copied (not moved) into the delegated call, since
  /// `file.get()` is evaluated in the same argument list.
  explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx)
      : ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {}

  /// \brief Construct a read cache over a shared file with the given options.
  explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
                          CacheOptions options)
      : ReadRangeCache(file, file.get(), std::move(ctx), options) {}

  /// \brief Construct a read cache over a file the cache does not own.
  ///
  /// No owning pointer is forwarded to the delegated constructor.
  ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options)
      : ReadRangeCache(NULLPTR, file, std::move(ctx), options) {}

  ~ReadRangeCache();

  /// \brief Cache the given ranges in the background.
  ///
  /// The caller must ensure that the ranges do not overlap with each other,
  /// nor with previously cached ranges. Otherwise, behaviour will be undefined.
  Status Cache(std::vector<ReadRange> ranges);

  /// \brief Read a range previously given to Cache().
  Result<std::shared_ptr<Buffer>> Read(ReadRange range);

  /// \brief Wait until all ranges added so far have been cached.
  Future<> Wait();

  /// \brief Wait until all given ranges have been cached.
  Future<> WaitFor(std::vector<ReadRange> ranges);

 protected:
  // Implementation types; only forward-declared in this header.
  struct Impl;
  struct LazyImpl;

  /// \brief Common constructor that all public constructors delegate to.
  ///
  /// `owned_file` is the (possibly null) owning pointer and `file` is the raw
  /// pointer to the file to read from.
  ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file, RandomAccessFile* file,
                 IOContext ctx, CacheOptions options);

  std::unique_ptr<Impl> impl_;
};
} // namespace internal
} // namespace io
} // namespace arrow