Skip to content

Commit

Permalink
HBASE-13926 Close the scanner only after Call#setResponse.
Browse files Browse the repository at this point in the history
  • Loading branch information
anoopsjohn committed Jun 21, 2015
1 parent 04c25e0 commit e4d8fab
Show file tree
Hide file tree
Showing 17 changed files with 277 additions and 27 deletions.
Expand Up @@ -293,6 +293,11 @@ public Cell getNextIndexedKey() {
public void close() {
this.delegate.close();
}

@Override
public void shipped() throws IOException {
this.delegate.shipped();
}
};
}

Expand Down
Expand Up @@ -1083,6 +1083,11 @@ public int compareKey(CellComparator comparator, Cell key) {
public void close() {
// HBASE-12295 will add code here.
}

@Override
public void shipped() throws IOException {
// HBASE-12295 will add code here.
}
}

public Path getPath() {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.Cell;

/**
Expand All @@ -37,7 +38,7 @@
* getValue.
*/
@InterfaceAudience.Private
public interface HFileScanner {
public interface HFileScanner extends Shipper {
/**
* SeekTo or just before the passed <code>cell</code>. Examine the return
* code to figure whether we found the cell or not.
Expand Down
Expand Up @@ -63,4 +63,12 @@ public interface RpcCallContext extends Delayable {
* @return the client version info, or null if the information is not present
*/
VersionInfo getClientVersionInfo();

/**
* Sets a callback which has to be executed at the end of this RPC call. Such a callback is an
* optional one for any Rpc call.
*
* @param callback
*/
void setCallBack(RpcCallback callback);
}
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
* Denotes a callback action that has to be executed at the end of an Rpc Call.
*
* @see RpcCallContext#setCallBack(RpcCallback)
*/
@InterfaceAudience.Private
public interface RpcCallback {

/**
* Called at the end of an Rpc Call {@link RpcCallContext}
*/
void run() throws IOException;
}
Expand Up @@ -306,6 +306,7 @@ class Call implements RpcCallContext {

private User user;
private InetAddress remoteAddress;
private RpcCallback callback;

Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
Expand Down Expand Up @@ -431,6 +432,16 @@ protected synchronized void setResponse(Object m, final CellScanner cells,
LOG.warn("Exception while creating response " + e);
}
this.response = bc;
// Once a response message is created and set to this.response, this Call can be treated as
// done. The Responder thread will do the n/w write of this message back to client.
if (this.callback != null) {
try {
this.callback.run();
} catch (Exception e) {
// Don't allow any exception here to kill this handler thread.
LOG.warn("Exception while running the Rpc Callback.", e);
}
}
}

private BufferChain wrapWithSasl(BufferChain bc)
Expand Down Expand Up @@ -553,6 +564,11 @@ public InetAddress getRemoteAddress() {
public VersionInfo getClientVersionInfo() {
return connection.getVersionInfo();
}

@Override
public void setCallBack(RpcCallback callback) {
this.callback = callback;
}
}

/** Listens on the socket. Creates jobs for the handler threads*/
Expand Down
Expand Up @@ -5225,7 +5225,7 @@ public String toString() {
/**
* RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
*/
class RegionScannerImpl implements RegionScanner {
class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {
// Package local for testability
KeyValueHeap storeHeap = null;
/** Heap of key-values that are not essential for the provided filters and are thus read
Expand Down Expand Up @@ -5830,6 +5830,23 @@ private void abortRegionServer(String msg) throws IOException {
}
throw new UnsupportedOperationException("not able to abort RS after: " + msg);
}

@Override
public void shipped() throws IOException {
if (storeHeap != null) {
storeHeap.shipped();
}
if (joinedHeap != null) {
joinedHeap.shipped();
}
}

@Override
public void run() throws IOException {
// This is the RPC callback method executed. We do the close in of the scanner in this
// callback
this.close();
}
}

// Utility methods
Expand Down
Expand Up @@ -406,4 +406,20 @@ public Cell getNextIndexedKey() {
// here we return the next index key from the top scanner
return current == null ? null : current.getNextIndexedKey();
}

@Override
public void shipped() throws IOException {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
}
this.scannersForDelayedClose.clear();
if (this.current != null) {
this.current.shipped();
}
if (this.heap != null) {
for (KeyValueScanner scanner : this.heap) {
scanner.shipped();
}
}
}
}
Expand Up @@ -29,7 +29,7 @@
* Scanner that returns the next KeyValue.
*/
@InterfaceAudience.Private
public interface KeyValueScanner {
public interface KeyValueScanner extends Shipper {
/**
* Look at the next Cell in this scanner, but do not iterate scanner.
* @return the next Cell
Expand Down
Expand Up @@ -155,11 +155,14 @@ public void close() {
* @param leaseName name of the lease
* @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations
* @return The lease created.
* @throws LeaseStillHeldException
*/
public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
throws LeaseStillHeldException {
addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
addLease(lease);
return lease;
}

/**
Expand Down
Expand Up @@ -71,4 +71,9 @@ public boolean isFileScanner() {
public Cell getNextIndexedKey() {
return null;
}

@Override
public void shipped() throws IOException {
// do nothing
}
}

0 comments on commit e4d8fab

Please sign in to comment.