Closed
Commits
56 commits
0644e94
Only hadoop-3.1 upgrades built-in Hive to 2.3.4
wangyum Feb 14, 2019
3c0c72e
Fix test failed:
wangyum Feb 14, 2019
fc10762
Fix checkstyle error:
wangyum Feb 15, 2019
bb84f79
Upgrade hadoop to 3.2.0 fix test error
wangyum Feb 15, 2019
32ebcad
Fix Hadoop-2.7.x test error
wangyum Feb 16, 2019
a9f4344
Add new hive-thriftserver to support Hadoop-3.1
wangyum Feb 17, 2019
feb1ee7
Fix CliSuite test error
wangyum Feb 18, 2019
e734373
Clean some dependency
wangyum Feb 19, 2019
e39cc2d
Fix mvn install error
wangyum Feb 25, 2019
5fbfdaa
Release Kryo
wangyum Feb 28, 2019
bf47619
Merge remote-tracking branch 'upstream/master' into hadoop-3.2
wangyum Mar 10, 2019
71421d6
Merge master branch and try to test Hadoop-3.1 on jenkins
wangyum Mar 10, 2019
19d52e7
Use hadoop-3.1 checkstyle
wangyum Mar 11, 2019
3b2e454
Error testing can be passed on my machine.
wangyum Mar 12, 2019
4c1ec25
Workaround HADOOP-16180
wangyum Mar 12, 2019
d38c542
Rename hive-thriftserver2 to hive-thriftserverV2
wangyum Mar 13, 2019
d20a215
Merge remote-tracking branch 'upstream/master' into hadoop-3.1-on-jen…
wangyum Mar 19, 2019
3628057
Fix error
wangyum Mar 20, 2019
b7b5853
Merge remote-tracking branch 'upstream/master' into hadoop-3.1-on-jen…
wangyum Mar 22, 2019
94fd638
Verify SPARK-27177
wangyum Mar 22, 2019
3f11dbe
Make change smaller
wangyum Mar 23, 2019
0352585
Merge branch 'master' into hadoop-3.1-on-jenkins
wangyum Mar 29, 2019
d41b2cc
Make hive-llap-client scope to test.
wangyum Apr 5, 2019
de55cfd
update depts
wangyum Apr 5, 2019
b5813ec
This commit to test do we really need org.apache.logging.log4j:log4j-…
wangyum Apr 7, 2019
2898cb9
Merge remote-tracking branch 'upstream/master' into hadoop-3.1-on-jen…
wangyum Apr 7, 2019
1762f5c
improve test
wangyum Apr 7, 2019
7206a0b
Update pom.xml based on review
wangyum Apr 7, 2019
f4c4d48
Update depts
wangyum Apr 7, 2019
fea81ac
hive.parquet.version -> 1.8.1
wangyum Apr 8, 2019
fc7888d
isHive2 -> isHive23
wangyum Apr 8, 2019
5e5ffe6
Merge remote-tracking branch 'upstream/master' into hadoop-3.1-on-jen…
wangyum Apr 8, 2019
3357d42
Workaround `[NOT FOUND ] commons-httpclient#commons-httpclient;3.0.1…
wangyum Apr 8, 2019
f15f6d3
Remove hive-thriftserverV2
wangyum Apr 9, 2019
0c3f962
Fix hive-thriftserver with Hive 2.3.4 test issue.
wangyum Apr 10, 2019
9905ee2
Upgrade Hive Parquet to 1.10.1:
wangyum Apr 10, 2019
fecda5f
Remove duplicate code
wangyum Apr 12, 2019
5f2faf6
Add duplicate code
wangyum Apr 13, 2019
fdaf4bc
shade parquet
wangyum Apr 13, 2019
7d7848d
Merge remote-tracking branch 'upstream/master' into hadoop-3.1-on-jen…
wangyum Apr 22, 2019
2a06356
Revert shade parquet
wangyum Apr 22, 2019
8303bba
Fix ERROR XSLAN:
wangyum Apr 22, 2019
b1e025f
This commit to test revert the exclusion of commons-httpclient
wangyum Apr 22, 2019
aa09632
This commit to test revert the exclusion of commons-httpclient:
wangyum Apr 22, 2019
fffd2b7
This commit to test add commons-httpclient:commons-httpclient:3.1 to …
wangyum Apr 22, 2019
5b7b704
This commit to test commons-httpclient with hadoop-2.7
wangyum Apr 23, 2019
12e06d7
Revert httpclient to 3.0.1 to verify download it on our jenkins
wangyum Apr 23, 2019
54502d9
Revert httpclient to 3.0.1 to verify download it on our jenkins
wangyum Apr 24, 2019
c95cfbe
Merge remote-tracking branch 'upstream/master' into hadoop-3.1-on-jen…
wangyum Apr 24, 2019
09d749d
Revert httpclient related changes
wangyum Apr 24, 2019
0e148c5
Update deps
wangyum Apr 24, 2019
bdc1d2c
Add derby to SharedClass
wangyum Apr 26, 2019
26f7e89
Test Hive 2.3.5-SNAPSHOT
wangyum Apr 27, 2019
1158349
revert hive to 2.3.4
wangyum Apr 30, 2019
f1e550c
Verify Hive 2.3.5-SNAPSHOT (HIVE-21680 Backport HIVE-17644 to branch-…
wangyum May 7, 2019
c28fc2d
Upgrade Hive to 2.3.5
wangyum May 17, 2019
2 changes: 1 addition & 1 deletion pom.xml
@@ -128,7 +128,7 @@
<hive.classifier></hive.classifier>
<!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1.spark2</hive.version>
<hive23.version>2.3.4</hive23.version>
<hive23.version>2.3.5</hive23.version>
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
@@ -1371,7 +1371,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
// if (isUsingHiveMetastore) {
// assert(storageFormat.properties.get("path") === expected)
// }
assert(storageFormat.locationUri === Some(expected))
assert(Some(storageFormat.locationUri.get.getPath) === Some(expected.getPath))
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
@@ -118,7 +118,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}

protected def testSelectiveDictionaryEncoding(isSelective: Boolean) {
protected def testSelectiveDictionaryEncoding(isSelective: Boolean, isHive2: Boolean = false) {
val tableName = "orcTable"

withTempDir { dir =>
@@ -171,7 +171,7 @@
// Hive 0.11 and RLE v2 is introduced in Hive 0.12 ORC with more improvements.
// For more details, see https://orc.apache.org/specification/
assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
if (isSelective) {
if (isSelective || isHive2) {
assert(stripe.getColumns(2).getKind === DIRECT_V2)
} else {
assert(stripe.getColumns(2).getKind === DICTIONARY_V2)
4 changes: 3 additions & 1 deletion sql/hive-thriftserver/pom.xml
@@ -128,7 +128,9 @@
</goals>
<configuration>
<sources>
<source>src/gen/</source>
<source>v${hive.version.short}/src/gen/java</source>
<source>v${hive.version.short}/src/main/java</source>
<source>v${hive.version.short}/src/main/scala</source>
</sources>
</configuration>
</execution>
@@ -113,7 +113,8 @@ private[hive] class SparkExecuteStatementOperation(
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
val resultRowSet: RowSet =
ThriftserverShimUtils.resultRowSet(getResultSetSchema, getProtocolVersion)

// Reset iter to header when fetching start from first row
if (order.equals(FetchOrientation.FETCH_FIRST)) {
@@ -30,6 +30,7 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.hive.HiveUtils

/**
* Spark's own GetTablesOperation
@@ -83,7 +84,12 @@ private[hive] class SparkGetTablesOperation(
catalogTable.identifier.table,
tableType,
catalogTable.comment.getOrElse(""))
rowSet.addRow(rowData)
// Since HIVE-7575 (Hive 2.0.0), GetTables returns 5 additional columns in its ResultSet.
if (HiveUtils.isHive23) {
rowSet.addRow(rowData ++ Array(null, null, null, null, null))
} else {
rowSet.addRow(rowData)
}
}
}
}
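
Note on the hunk above: the five nulls pad the row out to the wider GetTables result that Hive 2.x produces after HIVE-7575. Those trailing columns appear to be the tail of the JDBC DatabaseMetaData.getTables layout, for which Spark tracks no metadata; a minimal sketch of the same padding with the column names spelled out (the names come from the JDBC contract, not from this diff):

// Hedged sketch: column names taken from java.sql.DatabaseMetaData.getTables,
// not from this PR; Spark leaves them null because it has no values for them.
val hive2ExtraGetTablesColumns =
  Seq("TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", "SELF_REFERENCING_COL_NAME", "REF_GENERATION")
rowSet.addRow(rowData ++ Array.fill[AnyRef](hive2ExtraGetTablesColumns.size)(null))
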
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import jline.console.ConsoleReader
import jline.console.history.FileHistory
import org.apache.commons.lang3.StringUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
import org.apache.hadoop.hive.common.HiveInterruptUtils
@@ -297,9 +296,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
private val sessionState = SessionState.get().asInstanceOf[CliSessionState]

private val LOG = LogFactory.getLog(classOf[SparkSQLCLIDriver])

private val console = new SessionState.LogHelper(LOG)
private val console = ThriftserverShimUtils.getConsole

private val isRemoteMode = {
SparkSQLCLIDriver.isRemoteMode(sessionState)
@@ -33,8 +33,10 @@ import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
import org.apache.hive.service.server.HiveServer2
import org.slf4j.Logger

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._

private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
@@ -112,6 +114,10 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
setAncestorField(this, 3, "hiveConf", hiveConf)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").info(s"Service: $getName is inited.")
} else {
getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
}
}
}
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.SessionHandle
import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.hive.service.server.HiveServer2

import org.apache.spark.sql.SQLContext
@@ -45,7 +44,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
}

override def openSession(
protocol: TProtocolVersion,
protocol: ThriftserverShimUtils.TProtocolVersion,
username: String,
passwd: String,
ipAddress: String,
@@ -31,6 +31,7 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}

@@ -200,10 +201,13 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
}

test("Commands using SerDe provided in --jars") {
val jarName = if (HiveUtils.isHive23) {
"hive-hcatalog-core-2.3.4.jar"
} else {
"hive-hcatalog-core-0.13.1.jar"
}
val jarFile =
"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
.split("/")
.mkString(File.separator)
s"../hive/src/test/resources/$jarName".split("/").mkString(File.separator)

val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
@@ -35,7 +35,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.jdbc.HiveDriver
import org.apache.hive.service.auth.PlainSaslHelper
import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType}
import org.apache.hive.service.cli.thrift.TCLIService.Client
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
@@ -65,7 +64,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val user = System.getProperty("user.name")
val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
val protocol = new TBinaryProtocol(transport)
val client = new ThriftCLIServiceClient(new Client(protocol))
val client = new ThriftCLIServiceClient(new ThriftserverShimUtils.Client(protocol))

transport.open()
try f(client) finally transport.close()
@@ -484,10 +483,13 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
withMultipleConnectionJdbcStatement("smallKV", "addJar")(
{
statement =>
val jarName = if (HiveUtils.isHive23) {
"hive-hcatalog-core-2.3.4.jar"
} else {
"hive-hcatalog-core-0.13.1.jar"
}
val jarFile =
"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
.split("/")
.mkString(File.separator)
s"../hive/src/test/resources/$jarName".split("/").mkString(File.separator)

statement.executeQuery(s"ADD JAR $jarFile")
},
@@ -538,7 +540,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
conf += resultSet.getString(1) -> resultSet.getString(2)
}

assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
if (HiveUtils.isHive23) {
assert(conf.get("spark.sql.hive.version") === Some("2.3.4"))
} else {
assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
}
}
}

@@ -551,7 +557,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
conf += resultSet.getString(1) -> resultSet.getString(2)
}

assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
if (HiveUtils.isHive23) {
assert(conf.get("spark.sql.hive.version") === Some("2.3.4"))
} else {
assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
}
}
}

@@ -629,7 +639,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val sessionHandle = client.openSession(user, "")
val sessionID = sessionHandle.getSessionId

assert(pipeoutFileList(sessionID).length == 1)
if (HiveUtils.isHive23) {
assert(pipeoutFileList(sessionID).length == 2)
} else {
assert(pipeoutFileList(sessionID).length == 1)
}

client.closeSession(sessionHandle)

@@ -27,8 +27,8 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite {
val tableSchema = StructType(Seq(field1, field2))
val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors()
assert(columns.size() == 2)
assert(columns.get(0).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)
assert(columns.get(1).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)
assert(columns.get(0).getType().getName == "VOID")
assert(columns.get(1).getType().getName == "VOID")
}

test("SPARK-20146 Comment should be preserved") {
@@ -37,9 +37,9 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite {
val tableSchema = StructType(Seq(field1, field2))
val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors()
assert(columns.size() == 2)
assert(columns.get(0).getType() == org.apache.hive.service.cli.Type.STRING_TYPE)
assert(columns.get(0).getType().getName == "STRING")
assert(columns.get(0).getComment() == "comment 1")
assert(columns.get(1).getType() == org.apache.hive.service.cli.Type.INT_TYPE)
assert(columns.get(1).getType().getName == "INT")
assert(columns.get(1).getComment() == "")
}
}
@@ -19,9 +19,8 @@ package org.apache.spark.sql.hive.thriftserver

import java.util.{Arrays => JArrays, List => JList, Properties}

import org.apache.hive.jdbc.{HiveConnection, HiveQueryResultSet, Utils => JdbcUtils}
import org.apache.hive.jdbc.{HiveConnection, HiveQueryResultSet}
import org.apache.hive.service.auth.PlainSaslHelper
import org.apache.hive.service.cli.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket

@@ -37,13 +36,13 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties)
val user = System.getProperty("user.name")
val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
val client = new TCLIService.Client(new TBinaryProtocol(transport))
val client = new ThriftserverShimUtils.Client(new TBinaryProtocol(transport))
transport.open()
var rs: HiveQueryResultSet = null
try {
val openResp = client.OpenSession(new TOpenSessionReq)
val openResp = client.OpenSession(new ThriftserverShimUtils.TOpenSessionReq)
val sessHandle = openResp.getSessionHandle
val schemaReq = new TGetSchemasReq(sessHandle)
val schemaReq = new ThriftserverShimUtils.TGetSchemasReq(sessHandle)

if (catalog != null) {
schemaReq.setCatalogName(catalog)
@@ -55,13 +54,10 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
schemaReq.setSchemaName(schemaPattern)
}

val schemaResp = client.GetSchemas(schemaReq)
JdbcUtils.verifySuccess(schemaResp.getStatus)

rs = new HiveQueryResultSet.Builder(connection)
.setClient(client)
.setSessionHandle(sessHandle)
.setStmtHandle(schemaResp.getOperationHandle)
.setStmtHandle(client.GetSchemas(schemaReq).getOperationHandle)
.build()
f(rs)
} finally {
@@ -110,28 +106,24 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties)
val user = System.getProperty("user.name")
val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
val client = new TCLIService.Client(new TBinaryProtocol(transport))
val client = new ThriftserverShimUtils.Client(new TBinaryProtocol(transport))
transport.open()

var rs: HiveQueryResultSet = null

try {
val openResp = client.OpenSession(new TOpenSessionReq)
val openResp = client.OpenSession(new ThriftserverShimUtils.TOpenSessionReq)
val sessHandle = openResp.getSessionHandle

val getTableReq = new TGetTablesReq(sessHandle)
val getTableReq = new ThriftserverShimUtils.TGetTablesReq(sessHandle)
getTableReq.setSchemaName(schema)
getTableReq.setTableName(tableNamePattern)
getTableReq.setTableTypes(tableTypes)

val getTableResp = client.GetTables(getTableReq)

JdbcUtils.verifySuccess(getTableResp.getStatus)

rs = new HiveQueryResultSet.Builder(connection)
.setClient(client)
.setSessionHandle(sessHandle)
.setStmtHandle(getTableResp.getOperationHandle)
.setStmtHandle(client.GetTables(getTableReq).getOperationHandle)
.build()

f(rs)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver

import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema}

/**
* Various utilities for hive-thriftserver used to upgrade the built-in Hive.
*/
private[thriftserver] object ThriftserverShimUtils {

private[thriftserver] type TProtocolVersion = org.apache.hive.service.cli.thrift.TProtocolVersion
private[thriftserver] type Client = org.apache.hive.service.cli.thrift.TCLIService.Client
private[thriftserver] type TOpenSessionReq = org.apache.hive.service.cli.thrift.TOpenSessionReq
private[thriftserver] type TGetSchemasReq = org.apache.hive.service.cli.thrift.TGetSchemasReq
private[thriftserver] type TGetTablesReq = org.apache.hive.service.cli.thrift.TGetTablesReq

private[thriftserver] def getConsole: SessionState.LogHelper = {
val LOG = LogFactory.getLog(classOf[SparkSQLCLIDriver])
new SessionState.LogHelper(LOG)
}

private[thriftserver] def resultRowSet(
getResultSetSchema: TableSchema,
getProtocolVersion: TProtocolVersion): RowSet = {
RowSetFactory.create(getResultSetSchema, getProtocolVersion)
}

}
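
The object above is the Hive 1.2.1 side of the shim; a sibling under the v${hive.version.short} source directory wired in by the hive-thriftserver pom.xml change is implied but not shown in this excerpt. A minimal sketch of what that Hive 2.3 counterpart could look like, assuming only the differences that are well documented for Hive 2.x — the Thrift-generated classes were relocated to org.apache.hive.service.rpc.thrift (HIVE-12442) and logging moved from commons-logging to SLF4J; the extra RowSetFactory flag is likewise an assumption, not something shown in this diff:

package org.apache.spark.sql.hive.thriftserver

import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema}
import org.slf4j.LoggerFactory

/**
 * Sketch of a Hive 2.3 variant of the shim (assumed layout, not part of this diff).
 */
private[thriftserver] object ThriftserverShimUtils {

  // Hive 2.x relocated the generated Thrift classes (HIVE-12442).
  private[thriftserver] type TProtocolVersion = org.apache.hive.service.rpc.thrift.TProtocolVersion
  private[thriftserver] type Client = org.apache.hive.service.rpc.thrift.TCLIService.Client
  private[thriftserver] type TOpenSessionReq = org.apache.hive.service.rpc.thrift.TOpenSessionReq
  private[thriftserver] type TGetSchemasReq = org.apache.hive.service.rpc.thrift.TGetSchemasReq
  private[thriftserver] type TGetTablesReq = org.apache.hive.service.rpc.thrift.TGetTablesReq

  // Hive 2.x logs through SLF4J, so the console LogHelper is built from an org.slf4j.Logger.
  private[thriftserver] def getConsole: SessionState.LogHelper =
    new SessionState.LogHelper(LoggerFactory.getLogger(classOf[SparkSQLCLIDriver]))

  // Assumption: Hive 2.x RowSetFactory.create takes an additional isBlobBased flag.
  private[thriftserver] def resultRowSet(
      getResultSetSchema: TableSchema,
      getProtocolVersion: TProtocolVersion): RowSet =
    RowSetFactory.create(getResultSetSchema, getProtocolVersion, false)
}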