-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
RemoteBlockInStream.java
125 lines (109 loc) · 4.01 KB
/
RemoteBlockInStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package tachyon.client.block;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import com.google.common.base.Preconditions;
import tachyon.client.RemoteBlockReader;
import tachyon.client.ClientContext;
import tachyon.thrift.NetAddress;
import tachyon.util.io.BufferUtils;
/**
* This class provides a streaming API to read a block in Tachyon. The data will be transferred
* through a Tachyon worker's dataserver to the client. The instances of this class should only be
* used by one thread and are not thread safe.
*/
public class RemoteBlockInStream extends BlockInStream {
private final long mBlockId;
private final BSContext mContext;
private final long mBlockSize;
private final InetSocketAddress mLocation;
private long mPos;
/**
* Creates a new remote block input stream.
*
* @param blockId the block id
* @param blockSize the block size
* @param location the location
*/
// TODO: Modify the locking so the stream owns the lock instead of the data server
public RemoteBlockInStream(long blockId, long blockSize, NetAddress location) {
mBlockId = blockId;
mContext = BSContext.INSTANCE;
mBlockSize = blockSize;
// TODO: Validate these fields
mLocation = new InetSocketAddress(location.getMHost(), location.getMSecondaryPort());
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
if (read(b) == -1) {
return -1;
}
return BufferUtils.byteToInt(b[0]);
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkArgument(b != null, "Buffer is null");
Preconditions.checkArgument(off >= 0 && len >= 0 && len + off <= b.length,
String.format("Buffer length (%d), offset(%d), len(%d)", b.length, off, len));
if (len == 0) {
return 0;
} else if (mPos == mBlockSize) {
return -1;
}
// We read at most len bytes, but if mPos + len exceeds the length of the block, we only
// read up to the end of the block
int lengthToRead = (int) Math.min(len, mBlockSize - mPos);
int bytesLeft = lengthToRead;
while (bytesLeft > 0) {
// TODO: Fix needing to recreate reader each time
RemoteBlockReader reader =
RemoteBlockReader.Factory.createRemoteBlockReader(ClientContext.getConf());
ByteBuffer data = reader.readRemoteBlock(mLocation, mBlockId, mPos, bytesLeft);
int bytesToRead = Math.min(bytesLeft, data.remaining());
data.get(b, off, bytesToRead);
reader.close();
mPos += bytesToRead;
bytesLeft -= bytesToRead;
}
return lengthToRead;
}
@Override
public long remaining() {
return mBlockSize - mPos;
}
@Override
public void seek(long pos) throws IOException {
Preconditions.checkArgument(pos > 0, "Seek position is negative: " + pos);
Preconditions.checkArgument(pos < mBlockSize,
"Seek position: " + pos + " is past block size: " + mBlockSize);
mPos = pos;
}
@Override
public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
long skipped = Math.min(n, mBlockSize - mPos);
mPos += skipped;
return skipped;
}
}