1 #9 (Closed)
Zhangshunyu opened this issue Sep 10, 2015 · 0 comments
// The change comes from the interface differences between 0.98 and 1.0.0, as follows:
//===========================
// First: community 0.98 code location:
// https://github.com/apache/hbase/blob/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
// In community 0.98 InternalScanner.java: boolean next(List result, int limit) throws IOException;
// In community 0.98 RegionScanner.java: there is no int getBatch(), and the signature is boolean nextRaw(List result, int limit) throws IOException;
//===========================

//===========================
// Third: community branch-1 code location (the master code is the same):
// https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
// In community branch-1 InternalScanner.java: boolean next(List result, ScannerContext scannerContext) throws IOException;
// In community branch-1 RegionScanner.java: int getBatch() plus boolean nextRaw(List result, ScannerContext scannerContext) throws IOException;
//=============================
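
//===========================
// Illustrative sketch (not part of the original file): how the same delegating override
// looks when written against each interface. The List/Cell/ScannerContext types come from
// the community sources linked above; the enclosing class it would live in is omitted here.
//
// 0.98:
//   override def nextRaw(result: java.util.List[Cell], limit: Int): Boolean = next(result)
// branch-1 / master:
//   override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext): Boolean = next(result)
//===========================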

package org.apache.spark.sql.hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.coprocessor._
import org.apache.hadoop.hbase.regionserver._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

/**
 * HBaseCoprocessorSQLReaderRDD:
 */
class HBaseCoprocessorSQLReaderRDD(var relation: HBaseRelation,
                                   val codegenEnabled: Boolean,
                                   var finalOutput: Seq[Attribute],
                                   var otherFilters: Option[Expression],
                                   @transient sqlContext: SQLContext)
  extends RDD[Row](sqlContext.sparkContext, Nil) with Logging {

  @transient var scanner: RegionScanner = _

  private def createIterator(context: TaskContext): Iterator[Row] = {
    val otherFilter: (Row) => Boolean = {
      if (otherFilters.isDefined) {
        if (codegenEnabled) {
          GeneratePredicate.generate(otherFilters.get, finalOutput)
        } else {
          InterpretedPredicate.create(otherFilters.get, finalOutput)
        }
      } else null
    }

    val projections = finalOutput.zipWithIndex
    var finished: Boolean = false
    var gotNext: Boolean = false
    val results: java.util.ArrayList[Cell] = new java.util.ArrayList[Cell]()
    val row = new GenericMutableRow(finalOutput.size)

    val iterator = new Iterator[Row] {
      override def hasNext: Boolean = {
        if (!finished) {
          if (!gotNext) {
            results.clear()
            scanner.nextRaw(results)
            finished = results.isEmpty
            gotNext = true
          }
        }
        if (finished) {
          close()
        }
        !finished
      }

      override def next(): Row = {
        if (hasNext) {
          gotNext = false
          relation.buildRowInCoprocessor(projections, results, row)
        } else {
          null
        }
      }

      def close() = {
        try {
          scanner.close()
          relation.closeHTable()
        } catch {
          case e: Exception => logWarning("Exception in scanner.close", e)
        }
      }
    }

    if (otherFilter == null) {
      new InterruptibleIterator(context, iterator)
    } else {
      new InterruptibleIterator(context, iterator.filter(otherFilter))
    }
  }

  override def getPartitions: Array[Partition] = {
    Array()
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    scanner = split.asInstanceOf[HBasePartition].newScanner
    createIterator(context)
  }
}

abstract class BaseRegionScanner extends RegionScanner {
  // Newer HBase versions add int getBatch() to RegionScanner. Astro does not use it,
  // but the inherited method still has to be implemented when the classes below are
  // instantiated, so it is implemented here only to avoid a compile error.
  override def getBatch = 0

  override def isFilterDone = false

  // next is inherited from HBase's RegionScanner, which in turn extends InternalScanner.
  // In 0.98 the signature is boolean next(List result, int limit) throws IOException;
  // in the newer versions it is boolean next(List result, ScannerContext scannerContext) throws IOException,
  // i.e. the limit: Int parameter has become scannerContext: ScannerContext.
  override def next(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result)

  override def reseek(row: Array[Byte]) = throw new DoNotRetryIOException("Unsupported")

  override def getMvccReadPoint = Long.MaxValue

  override def nextRaw(result: java.util.List[Cell]) = next(result)

  // The same RegionScanner difference between the two HBase versions:
  // 0.98 declares boolean nextRaw(List result, int limit) throws IOException;
  // the newer versions declare boolean nextRaw(List result, ScannerContext scannerContext) throws IOException.
  override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result, scannerContext)
}

class SparkSqlRegionObserver extends BaseRegionObserver {
  lazy val logger = Logger.getLogger(getClass.getName)
  lazy val EmptyArray = Array[Byte]()

  override def postScannerOpen(e: ObserverContext[RegionCoprocessorEnvironment],
                               scan: Scan,
                               s: RegionScanner) = {
    val serializedPartitionIndex = scan.getAttribute(CoprocessorConstants.COINDEX)
    if (serializedPartitionIndex == null) {
      logger.debug("Work without coprocessor")
      super.postScannerOpen(e, scan, s)
    } else {
      logger.debug("Work with coprocessor")
      val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
      val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
      val outputDataType: Seq[DataType] =
        HBaseSerializer.deserialize(serializedOutputDataType).asInstanceOf[Seq[DataType]]

      val serializedRDD = scan.getAttribute(CoprocessorConstants.COKEY)
      val subPlanRDD: RDD[Row] = HBaseSerializer.deserialize(serializedRDD).asInstanceOf[RDD[Row]]

      val taskParaInfo = scan.getAttribute(CoprocessorConstants.COTASK)
      val (stageId, partitionId, taskAttemptId, attemptNumber) =
        HBaseSerializer.deserialize(taskParaInfo).asInstanceOf[(Int, Int, Long, Int)]
      val taskContext = new TaskContextImpl(
        stageId, partitionId, taskAttemptId, attemptNumber, null, false, new TaskMetrics)

      val regionInfo = s.getRegionInfo
      val startKey = if (regionInfo.getStartKey.isEmpty) None else Some(regionInfo.getStartKey)
      val endKey = if (regionInfo.getEndKey.isEmpty) None else Some(regionInfo.getEndKey)

      val result = subPlanRDD.compute(
        new HBasePartition(partitionIndex, partitionIndex, startKey, endKey, newScanner = s),
        taskContext)

      new BaseRegionScanner() {
        override def getRegionInfo: HRegionInfo = regionInfo

        override def getMaxResultSize: Long = s.getMaxResultSize

        override def close(): Unit = s.close()

        override def next(results: java.util.List[Cell]): Boolean = {
          val hasMore: Boolean = result.hasNext
          if (hasMore) {
            val nextRow = result.next()
            val numOfCells = outputDataType.length
            for (i <- 0 until numOfCells) {
              val data = nextRow(i)
              val dataType = outputDataType(i)
              val dataOfBytes: HBaseRawType = {
                if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
              }
              results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
            }
          }
          hasMore
        }
      }
    }
  }
}
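
For reference, a minimal sketch (not part of the Astro code) of how this observer could be attached to a table through the HBase 1.x admin API; the table name is hypothetical and it assumes the Astro jar is already on the region server classpath:

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object RegisterObserver {
  def main(args: Array[String]): Unit = {
    val observerClass = "org.apache.spark.sql.hbase.SparkSqlRegionObserver"
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("my_table") // hypothetical table name
    val desc: HTableDescriptor = admin.getTableDescriptor(tableName)
    if (!desc.hasCoprocessor(observerClass)) {
      desc.addCoprocessor(observerClass) // jar must be visible to the region servers
      admin.disableTable(tableName)
      admin.modifyTable(tableName, desc)
      admin.enableTable(tableName)
    }
    admin.close()
    conn.close()
  }
}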

@xinyunh xinyunh self-assigned this Sep 10, 2015
@xinyunh xinyunh closed this as completed Sep 10, 2015