-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
BaseKeyValuePartitionReader.java
145 lines (126 loc) · 4.5 KB
/
BaseKeyValuePartitionReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the “License”). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/
package alluxio.client.keyvalue;
import alluxio.Constants;
import alluxio.client.ClientContext;
import alluxio.client.block.AlluxioBlockStore;
import alluxio.exception.AlluxioException;
import alluxio.util.io.BufferUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.NoSuchElementException;

import javax.annotation.concurrent.NotThreadSafe;
/**
 * Default implementation of {@link KeyValuePartitionReader} to talk to a remote key-value worker to
 * get the value of a given key.
 *
 * <p>All requests are delegated to a {@link KeyValueWorkerClient} that is created for the worker
 * address of the first location reported for the partition's block.
 */
@NotThreadSafe
final class BaseKeyValuePartitionReader implements KeyValuePartitionReader {
  private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

  /** Client used to send requests about the block to the worker. */
  private final KeyValueWorkerClient mClient;
  /** Id of the block backing this partition. */
  private final long mBlockId;
  /** Set to true once {@link #close()} has released the client. */
  private boolean mClosed;

  // TODO(binfan): take partition id as input
  /**
   * Constructs {@link BaseKeyValuePartitionReader} given a block id.
   *
   * @param blockId blockId of the key-value file to read from
   * @throws AlluxioException if an unexpected Alluxio exception is thrown
   * @throws IOException if a non-Alluxio exception occurs, or if the block has no location
   */
  BaseKeyValuePartitionReader(long blockId) throws AlluxioException, IOException {
    mBlockId = blockId;
    BlockInfo info = AlluxioBlockStore.get().getInfo(mBlockId);
    // Fail with a descriptive, declared exception instead of an IndexOutOfBoundsException when
    // the block store reports no location for this block.
    if (info.getLocations().isEmpty()) {
      throw new IOException("Block " + blockId + " has no location to read key-value data from");
    }
    // Only the first reported location is used.
    WorkerNetAddress workerAddr = info.getLocations().get(0).getWorkerAddress();
    mClient = new KeyValueWorkerClient(workerAddr, ClientContext.getConf());
    mClosed = false;
  }

  /**
   * Looks up the value of a key given as a byte array.
   *
   * <p>This could be slow when value size is large, use with caution: the value is copied out of
   * the returned buffer into a new array.
   *
   * @param key the key to look up
   * @return the value as a new byte array, or null if the lookup yields no value
   * @throws IOException if an I/O error occurs
   * @throws AlluxioException if an Alluxio error occurs
   */
  @Override
  public byte[] get(byte[] key) throws IOException, AlluxioException {
    ByteBuffer keyBuffer = ByteBuffer.wrap(key);
    ByteBuffer value = getInternal(keyBuffer);
    if (value == null) {
      return null;
    }
    return BufferUtils.newByteArrayFromByteBuffer(value);
  }

  /**
   * Looks up the value of a key given as a {@link ByteBuffer}.
   *
   * @param key the key to look up
   * @return the value buffer, or null if the lookup yields no value
   * @throws IOException if an I/O error occurs
   * @throws AlluxioException if an Alluxio error occurs
   */
  @Override
  public ByteBuffer get(ByteBuffer key) throws IOException, AlluxioException {
    return getInternal(key);
  }

  /**
   * Closes the underlying worker client. Calling this method more than once has no further effect.
   */
  @Override
  public void close() {
    if (mClosed) {
      return;
    }
    mClient.close();
    mClosed = true;
  }

  /**
   * Returns the value in {@link ByteBuffer} in this partition, or null if not found.
   *
   * @param key the key to lookup
   * @return the value of this key, or null when the worker returns a buffer with nothing remaining
   * @throws IOException if an I/O error occurs
   * @throws AlluxioException if an Alluxio error occurs
   * @throws IllegalStateException if this reader has already been closed
   */
  private ByteBuffer getInternal(ByteBuffer key) throws IOException, AlluxioException {
    Preconditions.checkState(!mClosed, "Can not query a reader closed");
    ByteBuffer value = mClient.get(mBlockId, key);
    // A buffer with no remaining bytes is treated as "key not found".
    if (value.remaining() == 0) {
      return null;
    }
    return value;
  }

  /**
   * Iterator over the key-value pairs of this partition. It asks the worker for one key at a time
   * and always holds the key that the following call to {@link #next()} will return.
   */
  private class Iterator implements KeyValueIterator {
    /** Key of the pair to be returned by the next call to {@link #next()}, or null if none. */
    private ByteBuffer mNextKey;

    /**
     * Gets the first key-value pair and constructs a new key-value partition iterator.
     *
     * @throws IOException if a non-Alluxio error happens when getting the first key-value pair
     * @throws AlluxioException if an Alluxio error happens when getting the first key-value pair
     */
    public Iterator() throws IOException, AlluxioException {
      mNextKey = nextKey(null);
    }

    @Override
    public boolean hasNext() {
      return mNextKey != null;
    }

    /**
     * Returns the current key with its value and advances to the following key.
     *
     * @return the next key-value pair
     * @throws IOException if a non-Alluxio error occurs
     * @throws AlluxioException if an Alluxio error occurs
     * @throws NoSuchElementException if there are no more pairs
     */
    @Override
    public KeyValuePair next() throws IOException, AlluxioException {
      // Signal exhaustion explicitly rather than sending a null key to the worker.
      if (mNextKey == null) {
        throw new NoSuchElementException("No more key-value pairs in block " + mBlockId);
      }
      KeyValuePair ret = new KeyValuePair(mNextKey, get(mNextKey));
      mNextKey = nextKey(mNextKey);
      return ret;
    }

    /**
     * Asks the worker for a single key following the given one.
     *
     * @param key the current key, or null to request the first key
     * @return the following key, or null if the worker returns no key
     * @throws IOException if a non-Alluxio error occurs
     * @throws AlluxioException if an Alluxio error occurs
     */
    private ByteBuffer nextKey(ByteBuffer key) throws IOException, AlluxioException {
      List<ByteBuffer> nextKeys = mClient.getNextKeys(mBlockId, key, 1);
      if (!nextKeys.isEmpty()) {
        return nextKeys.get(0);
      }
      return null;
    }
  }

  /**
   * Creates an iterator over the key-value pairs of this partition; the first key is fetched
   * from the worker when the iterator is constructed.
   *
   * @return a new iterator
   * @throws IOException if a non-Alluxio error occurs
   * @throws AlluxioException if an Alluxio error occurs
   */
  @Override
  public KeyValueIterator iterator() throws IOException, AlluxioException {
    return new Iterator();
  }

  /**
   * Returns the size reported by the worker client for this partition's block.
   *
   * @return the size as returned by the worker client
   * @throws IOException if a non-Alluxio error occurs
   * @throws AlluxioException if an Alluxio error occurs
   */
  @Override
  public int size() throws IOException, AlluxioException {
    return mClient.getSize(mBlockId);
  }
}