Permalink
Browse files

Remote BlockFetchTracker trait

This trait seems to have been created a while ago when there
were multiple implementations; now that there's just one, I think it
makes sense to merge it into the BlockFetcherIterator trait.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #39 from kayousterhout/remove_tracker and squashes the following commits:

8173939 [Kay Ousterhout] Remote BlockFetchTracker.
  • Loading branch information...
1 parent 40e080a commit edf8a56ab7eaee1f7c3b4579eb10464984d31d7a @kayousterhout kayousterhout committed with pwendell Feb 28, 2014
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-private[spark] trait BlockFetchTracker {
- def totalBlocks : Int
- def numLocalBlocks: Int
- def numRemoteBlocks: Int
- def remoteFetchTime : Long
- def fetchWaitTime: Long
- def remoteBytesRead : Long
-}
@@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
*/
private[storage]
-trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
- with Logging with BlockFetchTracker {
+trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
+ def totalBlocks: Int
+ def numLocalBlocks: Int
+ def numRemoteBlocks: Int
+ def remoteFetchTime: Long
+ def fetchWaitTime: Long
+ def remoteBytesRead: Long
}
@@ -233,7 +238,16 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}
- //an iterator that will read fetched blocks off the queue as they arrive.
+ override def totalBlocks: Int = numLocal + numRemote
+ override def numLocalBlocks: Int = numLocal
+ override def numRemoteBlocks: Int = numRemote
+ override def remoteFetchTime: Long = _remoteFetchTime
+ override def fetchWaitTime: Long = _fetchWaitTime
+ override def remoteBytesRead: Long = _remoteBytesRead
+
+
+ // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
+ // as they arrive.
@volatile protected var resultsGotten = 0
override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
@@ -251,14 +265,6 @@ object BlockFetcherIterator {
}
(result.blockId, if (result.failed) None else Some(result.deserialize()))
}
-
- // Implementing BlockFetchTracker trait.
- override def totalBlocks: Int = numLocal + numRemote
- override def numLocalBlocks: Int = numLocal
- override def numRemoteBlocks: Int = numRemote
- override def remoteFetchTime: Long = _remoteFetchTime
- override def fetchWaitTime: Long = _fetchWaitTime
- override def remoteBytesRead: Long = _remoteBytesRead
}
// End of BasicBlockFetcherIterator

0 comments on commit edf8a56

Please sign in to comment.