Skip to content

Commit

Permalink
Support RowSet
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Jul 8, 2020
1 parent 65f4d17 commit 934a9d3
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.operation

import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}

import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session

trait Operation {

def run(): Unit
def cancel(): Unit
def close(): Unit

def getProtocolVersion: TProtocolVersion
def getResultSetSchema: TTableSchema
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): TRowSet

def getSession: Session
def getHandle: OperationHandle
def getStatus: OperationStatus

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.operation

import org.apache.kyuubi.service.AbstractService

abstract class OperationManager(name: String) extends AbstractService(name) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.session

import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.config.KyuubiConf

trait Session {

def protocol: TProtocolVersion
def handle: SessionHandle
def conf: KyuubiConf
def user: String
def password: String
def createTime: Long
def lastAccessTime: Long
def sessionManager: SessionManager

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.session

import org.apache.kyuubi.service.CompositeService

trait SessionManager extends CompositeService {



}
7 changes: 7 additions & 0 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
<scope>${spark.scope}</scope>
</dependency>

<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>2.4.5</version>
<scope>${spark.scope}</scope>
</dependency>

<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions kyuubi-spark-sql-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@
<version>3.0.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.schema

import java.nio.ByteBuffer

import scala.collection.JavaConverters._
import scala.language.implicitConversions

import org.apache.hive.service.rpc.thrift._
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._

object RowSet {

def toRowBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val tRows = rows.map { row =>
val tRow = new TRow()
(0 until row.length).map(i => toTColumnValue(i, row, schema)).foreach(tRow.addToColVals)
tRow
}.asJava
new TRowSet(0, tRows)
}

def toColumnBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val size = rows.length
val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](size))
schema.zipWithIndex.foreach { case (filed, i) =>
val tColumn = toTColumn(rows, i, filed.dataType)
tRowSet.addToColumns(tColumn)
}
tRowSet
}

private def toTColumn(rows: Seq[Row], ordinal: Int, typ: DataType): TColumn = {
val nulls = new java.util.BitSet()
typ match {
case BooleanType =>
val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls)
TColumn.boolVal(new TBoolColumn(values, nulls))

case ByteType =>
val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls)
TColumn.byteVal(new TByteColumn(values, nulls))

case ShortType =>
val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls)
TColumn.i16Val(new TI16Column(values, nulls))

case IntegerType =>
val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls)
TColumn.i32Val(new TI32Column(values, nulls))

case LongType =>
val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls)
TColumn.i64Val(new TI64Column(values, nulls))

case FloatType =>
val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls)
.asScala
.map {
case null => null
case o => java.lang.Double.valueOf(o.toDouble)
}.asJava
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case DoubleType =>
val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls)
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case StringType =>
val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))

case BinaryType =>
val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
.asScala
.map(ByteBuffer.wrap)
.asJava
TColumn.binaryVal(new TBinaryColumn(values, nulls))

case _ =>
val values = rows.zipWithIndex.toList.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) {
""
} else {
HiveResult.toHiveString((row.get(ordinal), typ))
}
}.asJava
TColumn.stringVal(new TStringColumn(values, nulls))
}
}

private def getOrSetAsNull[T](
rows: Seq[Row],
ordinal: Int,
nulls: java.util.BitSet,
defaultVal: T = null): java.util.List[T] = {
val size = rows.length
val ret = new java.util.ArrayList[T](size)
var idx = 0
while (idx < size) {
val row = rows(idx)
val isNull = row.isNullAt(ordinal)
if (isNull) {
nulls.set(idx, true)
ret.add(idx, defaultVal)
} else {
ret.add(idx, row.getAs[T](ordinal))
}
idx += 1
}
ret
}

implicit private def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
ByteBuffer.wrap(bitSet.toByteArray)
}

private[this] def toTColumnValue(ordinal: Int, row: Row, types: StructType): TColumnValue = {
types(ordinal).dataType match {
case BooleanType =>
val boolValue = new TBoolValue
if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal))
TColumnValue.boolVal(boolValue)

case ByteType =>
val byteValue = new TByteValue
if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal))
TColumnValue.byteVal(byteValue)

case ShortType =>
val tI16Value = new TI16Value
if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal))
TColumnValue.i16Val(tI16Value)

case IntegerType =>
val tI32Value = new TI32Value
if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal))
TColumnValue.i32Val(tI32Value)

case LongType =>
val tI64Value = new TI64Value
if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal))
TColumnValue.i64Val(tI64Value)

case FloatType =>
val tDoubleValue = new TDoubleValue
if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getFloat(ordinal))
TColumnValue.doubleVal(tDoubleValue)

case DoubleType =>
val tDoubleValue = new TDoubleValue
if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal))
TColumnValue.doubleVal(tDoubleValue)

case StringType =>
val tStringValue = new TStringValue
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal))
TColumnValue.stringVal(tStringValue)

case _ =>
val tStrValue = new TStringValue
if (!row.isNullAt(ordinal)) {
tStrValue.setValue(
HiveResult.toHiveString((row.get(ordinal), types(ordinal).dataType)))
}
TColumnValue.stringVal(tStrValue)
}
}
}

0 comments on commit 934a9d3

Please sign in to comment.