-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCachingChannel.scala
156 lines (131 loc) · 4.36 KB
/
CachingChannel.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package org.hammerlab.channel
import java.io.IOException
import java.nio.ByteBuffer
import java.util
import grizzled.slf4j.Logging
import hammerlab.bytes._
import org.hammerlab.channel.CachingChannel.Config
import org.hammerlab.math.ceil
import scala.math.{ max, min }
/**
 * [[SeekableByteChannel]] that wraps another [[SeekableByteChannel]] and caches data read from the latter in blocks of
 * `config.blockSize` bytes, aligned to whole multiples of `config.blockSize`.
 *
 * Cached blocks live in an access-ordered `LinkedHashMap`. The least-recently-accessed block is evicted once more than
 * `maxNumBlocks` blocks are cached.
 *
 * @param channel underlying channel to provide a caching layer over
 * @param config block size, read-attempt limit, and cache-size budget
 * @tparam Channel concrete type of the wrapped channel
 */
case class CachingChannel[Channel <: SeekableByteChannel](channel: Channel)(
  implicit config: Config
)
  extends SeekableByteChannel
    with BufferByteChannel
    with Logging {

  val Config(blockSize, maxReadAttempts, maximumSize) = config

  /** Maximum number of blocks kept in the cache, computed as `ceil(maximumSize, blockSize)`. */
  val maxNumBlocks = ceil(maximumSize, blockSize).toInt

  /** Scratch buffer that underlying-channel reads land in before being copied into a cached block. */
  private val _buffer = ByteBuffer.allocate(blockSize)

  /**
   * Cached blocks, keyed by block index. Constructed with `accessOrder = true`, so the "eldest" entry is the
   * least-recently-accessed block.
   */
  val blocks =
    new util.LinkedHashMap[Long, ByteBuffer](
      (maximumSize / blockSize).toInt,
      0.7f,
      true
    ) {
      // Invoked after each insertion. Evict only once the cache holds MORE than `maxNumBlocks` blocks; evicting at
      // `>=` capped the cache at `maxNumBlocks - 1` blocks and cached nothing at all when `maxNumBlocks == 1`.
      override def removeEldestEntry(eldest: util.Map.Entry[Long, ByteBuffer]): Boolean =
        if (size() > maxNumBlocks) {
          debug(s"Size ${size()} > max num blocks $maxNumBlocks (total size $maximumSize)")
          true
        } else
          false
    }

  /**
   * Return block `idx`, which covers bytes `[idx * blockSize, (idx + 1) * blockSize)` truncated at the end of
   * `channel`. On a cache miss the block is fetched from `channel` and cached.
   *
   * @throws IOException if the block can't be fully read within `maxReadAttempts` reads
   */
  def getBlock(idx: Long): ByteBuffer =
    Option(blocks.get(idx)).getOrElse(fetchBlock(idx))

  /** Read block `idx` from `channel` into a freshly-allocated buffer, insert it into [[blocks]], and return it. */
  private def fetchBlock(idx: Long): ByteBuffer = {
    _buffer.clear()
    val start = idx * blockSize
    channel.seek(start)

    // The final block of the channel may be shorter than `blockSize`.
    val bytesLeftInChannel = channel.size - start
    if (bytesLeftInChannel < _buffer.limit)
      _buffer.limit(bytesLeftInChannel.toInt)

    val bytesToRead = _buffer.remaining()
    val end = start + bytesToRead
    debug(s"Fetching block $idx: [$start,$end)")

    // A single underlying read may return fewer bytes than requested; allow up to `maxReadAttempts` reads.
    var attempts = 0
    while (channel.position() < end && attempts < maxReadAttempts) {
      channel.read(_buffer)
      attempts += 1
    }

    if (channel.position() < end)
      throw new IOException(
        s"Read ${channel.position() - start} of $bytesToRead bytes from $start in $attempts attempts"
      )

    // Copy out of the shared scratch buffer so the cached block isn't clobbered by the next fetch.
    val block = ByteBuffer.allocate(bytesToRead)
    _buffer.position(0)
    block.put(_buffer)
    blocks.put(idx, block)
    debug(s"Fetched block $idx: [$start,$end)")
    block
  }

  override def size: Long = channel.size

  override def _seek(newPos: Long): Unit = channel.seek(newPos)

  /**
   * Copy bytes starting at the current position into `dst`, serving them from cached blocks.
   *
   * @return the number of bytes copied, or -1 if the current position is at or beyond the end of the channel
   */
  override def _read(dst: ByteBuffer): Int = {
    val start = position()
    val channelSize = size
    if (start >= channelSize)
      -1
    else {
      val end = min(channelSize, start + dst.remaining())
      if (end == start)
        0  // `dst` has no room left; don't fetch anything
      else {
        // `end` is exclusive, so the last byte needed lives in block `(end - 1) / blockSize`. Using `end / blockSize`
        // fetched (and cached) an extra, unused block whenever `end` fell exactly on a block boundary.
        val firstBlock = start / blockSize
        val lastBlock = (end - 1) / blockSize
        for (idx <- firstBlock to lastBlock) {
          val blockStart = idx * blockSize
          val from = max(start - blockStart, 0L).toInt
          val to = (min(end, blockStart + blockSize) - blockStart).toInt
          val block = getBlock(idx)
          // Set the limit before the position: a buffer's position may never exceed its limit.
          block.limit(to)
          block.position(from)
          dst.put(block)
        }
        // Each byte in [start, end) was copied exactly once.
        (end - start).toInt
      }
    }
  }

  override def _close(): Unit =
    channel.close()
}
object CachingChannel {

  /**
   * Configuration options for a [[CachingChannel]]
   *
   * @param blockSize size, in bytes, of the blocks that are fetched and cached
   * @param maxReadAttempts all read/skip operations require the full requested number of bytes to be returned in at most
   *                        this many attempts, or they will throw an [[IOException]].
   * @param maximumSize cache-size budget, in bytes; least-recently-used blocks are evicted to keep the cache within
   *                    roughly this size
   */
  case class Config(blockSize: Int = 64.KB.toInt,
                    maxReadAttempts: Int = 2,
                    maximumSize: Long = 64.MB)

  object Config {
    /**
     * Default configuration (64KB blocks, 2 read attempts, 64MB budget), found via implicit scope whenever no other
     * implicit [[Config]] is available.
     */
    implicit val default: Config = Config()
  }

  /** Implicitly wrap a [[SeekableByteChannel]] in a [[CachingChannel]] configured by the implicit [[Config]]. */
  implicit def makeCachingChannel[Channel <: SeekableByteChannel](channel: Channel)(
    implicit config: Config
  ): CachingChannel[Channel] =
    CachingChannel(channel)

  /** Adds `.cache` syntax to any [[SeekableByteChannel]]. */
  implicit class AddCaching[Channel <: SeekableByteChannel](channel: Channel) {

    /** Wrap `channel` in a [[CachingChannel]] configured by the implicit [[Config]]. */
    def cache(implicit config: Config): CachingChannel[Channel] =
      makeCachingChannel(channel)

    /**
     * Wrap `channel` in a [[CachingChannel]] with explicitly-specified options. Any omitted option takes its value
     * from [[Config.default]], so these defaults can't drift from the [[Config]] defaults.
     */
    def cache(blockSize: Int = Config.default.blockSize,
              maxReadAttempts: Int = Config.default.maxReadAttempts,
              maximumSize: Long = Config.default.maximumSize): CachingChannel[Channel] =
      makeCachingChannel(channel)(
        Config(
          blockSize,
          maxReadAttempts,
          maximumSize
        )
      )
  }
}