[SPARK-31088][SQL] Add back HiveContext and createExternalTable
### What changes were proposed in this pull request?
Based on the discussion on the dev mailing list, [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance cost is relatively small; a brief usage sketch follows the list.

- HiveContext
- createExternalTable APIs
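
For context, a minimal PySpark sketch of how the restored entry points are typically used; the table name, path, and file format below are illustrative assumptions, not taken from this PR:

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext  # import restored by this PR (deprecated since 2.0)

sc = SparkContext.getOrCreate()

# Deprecated Hive entry point added back here; it wraps a Hive-enabled
# SparkSession and emits a DeprecationWarning on construction.
hive_ctx = HiveContext(sc)

# Restored createExternalTable: registers an external table over existing
# files and returns the corresponding DataFrame.
people = hive_ctx.createExternalTable(
    "people_ext", path="/tmp/people.parquet", source="parquet")
people.show()
```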

### Why are the changes needed?

Avoid breaking the APIs that are commonly used.

### Does this PR introduce any user-facing change?
Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released yet.

### How was this patch tested?

Added a new test suite for the createExternalTable APIs.
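
The new suite itself is not shown in this excerpt (the remaining file diffs were not expanded). Purely as a hypothetical illustration of the behaviour it exercises, a PySpark-level check might look like the sketch below; this is not the actual test code:

```python
import tempfile

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

with tempfile.TemporaryDirectory() as d:
    # Write some data, expose it through the restored API, and read it back.
    spark.range(3).write.parquet(d + "/t")
    spark.catalog.createExternalTable("t_ext", path=d + "/t", source="parquet")
    assert spark.table("t_ext").count() == 3
    spark.sql("DROP TABLE t_ext")
```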

Closes #27815 from gatorsmile/addAPIsBack.

Lead-authored-by: gatorsmile <gatorsmile@gmail.com>
Co-authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
gatorsmile and Ngone51 committed Mar 27, 2020
1 parent b7e4cc7 commit b9eafcb
Showing 11 changed files with 532 additions and 11 deletions.
4 changes: 0 additions & 4 deletions docs/sql-migration-guide.md
@@ -312,10 +312,6 @@ license: |

### Others

- In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.

- In Spark 3.0, the deprecated `HiveContext` class has been removed. Use `SparkSession.builder.enableHiveSupport()` instead.

- In Spark version 2.4, when a spark session is created via `cloneSession()`, the newly created spark session inherits its configuration from its parent `SparkContext` even though the same configuration may exist with a different value in its parent spark session. Since Spark 3.0, the configurations of a parent `SparkSession` have a higher precedence over the parent `SparkContext`. The old behavior can be restored by setting `spark.sql.legacy.sessionInitWithConfigDefaults` to `true`.

- Since Spark 3.0, if `hive.default.fileformat` is not found in `Spark SQL configuration` then it will fallback to hive-site.xml present in the `Hadoop configuration` of `SparkContext`.
2 changes: 0 additions & 2 deletions project/MimaExcludes.scala
@@ -55,8 +55,6 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"),

// [SPARK-28980][SQL][CORE][MLLIB] Remove more old deprecated items in Spark 3
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.createExternalTable"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createExternalTable"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.classification.LogisticRegressionWithSGD$"),
2 changes: 1 addition & 1 deletion python/pyspark/__init__.py
@@ -113,7 +113,7 @@ def wrapper(self, *args, **kwargs):


# for back compatibility
from pyspark.sql import SQLContext, Row
from pyspark.sql import SQLContext, HiveContext, Row

__all__ = [
"SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
4 changes: 2 additions & 2 deletions python/pyspark/sql/__init__.py
@@ -43,7 +43,7 @@


from pyspark.sql.types import Row
from pyspark.sql.context import SQLContext, UDFRegistration
from pyspark.sql.context import SQLContext, HiveContext, UDFRegistration
from pyspark.sql.session import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.catalog import Catalog
@@ -55,7 +55,7 @@


__all__ = [
'SparkSession', 'SQLContext', 'UDFRegistration',
'SparkSession', 'SQLContext', 'HiveContext', 'UDFRegistration',
'DataFrame', 'GroupedData', 'Column', 'Catalog', 'Row',
'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
'DataFrameReader', 'DataFrameWriter', 'PandasCogroupedOps'
20 changes: 20 additions & 0 deletions python/pyspark/sql/catalog.py
@@ -138,6 +138,26 @@ def listColumns(self, tableName, dbName=None):
isBucket=jcolumn.isBucket()))
return columns

@since(2.0)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
"""
warnings.warn(
"createExternalTable is deprecated since Spark 2.2, please use createTable instead.",
DeprecationWarning)
return self.createTable(tableName, path, source, schema, **options)

@since(2.2)
def createTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates a table based on the dataset in a data source.
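
A short sketch of what the restored `Catalog.createExternalTable` does from a caller's perspective: it raises a `DeprecationWarning` and then delegates to `createTable`. The table name and path below are illustrative assumptions, not part of this diff:

```python
import warnings

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # Equivalent to spark.catalog.createTable(...), plus the deprecation warning.
    # Assumes /tmp/logs.json already contains JSON data.
    logs = spark.catalog.createExternalTable(
        "logs_ext", path="/tmp/logs.json", source="json")

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
logs.printSchema()
```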
67 changes: 66 additions & 1 deletion python/pyspark/sql/context.py
@@ -32,7 +32,7 @@
from pyspark.sql.udf import UDFRegistration
from pyspark.sql.utils import install_exception_handler

__all__ = ["SQLContext"]
__all__ = ["SQLContext", "HiveContext"]


class SQLContext(object):
@@ -340,6 +340,24 @@ def dropTempTable(self, tableName):
"""
self.sparkSession.catalog.dropTempView(tableName)

@since(1.3)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates an external table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
"""
return self.sparkSession.catalog.createExternalTable(
tableName, path, source, schema, **options)

@ignore_unicode_prefix
@since(1.0)
def sql(self, sqlQuery):
@@ -463,6 +481,53 @@ def streams(self):
return StreamingQueryManager(self._ssql_ctx.streams())


class HiveContext(SQLContext):
"""A variant of Spark SQL that integrates with data stored in Hive.
Configuration for Hive is read from ``hive-site.xml`` on the classpath.
It supports running both SQL and HiveQL commands.
:param sparkContext: The SparkContext to wrap.
:param jhiveContext: An optional JVM Scala HiveContext. If set, we do not instantiate a new
:class:`HiveContext` in the JVM, instead we make all calls to this object.
.. note:: Deprecated in 2.0.0. Use SparkSession.builder.enableHiveSupport().getOrCreate().
"""

def __init__(self, sparkContext, jhiveContext=None):
warnings.warn(
"HiveContext is deprecated in Spark 2.0.0. Please use " +
"SparkSession.builder.enableHiveSupport().getOrCreate() instead.",
DeprecationWarning)
if jhiveContext is None:
sparkContext._conf.set("spark.sql.catalogImplementation", "hive")
sparkSession = SparkSession.builder._sparkContext(sparkContext).getOrCreate()
else:
sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession())
SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext)

@classmethod
def _createForTesting(cls, sparkContext):
"""(Internal use only) Create a new HiveContext for testing.
All test code that touches HiveContext *must* go through this method. Otherwise,
you may end up launching multiple derby instances and encounter with incredibly
confusing error messages.
"""
jsc = sparkContext._jsc.sc()
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
return cls(sparkContext, jtestHive)

def refreshTable(self, tableName):
"""Invalidate and refresh all the cached the metadata of the given
table. For performance reasons, Spark SQL or the external data source
library it uses might cache certain metadata about a table, such as the
location of blocks. When those change outside of Spark SQL, users should
call this function to invalidate the cache.
"""
self._ssql_ctx.refreshTable(tableName)


def _test():
import os
import doctest
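
A minimal sketch of the two construction paths this file now supports: the deprecated `HiveContext` constructor reinstated above, and the replacement named in its deprecation message. The `SHOW TABLES` statement is only an illustration:

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext, SparkSession

sc = SparkContext.getOrCreate()

# Deprecated path restored here: warns, forces the Hive catalog
# implementation, and builds a SparkSession around the given SparkContext.
hive_ctx = HiveContext(sc)
hive_ctx.sql("SHOW TABLES").show()

# Recommended replacement, as suggested by the deprecation warning.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("SHOW TABLES").show()
```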
91 changes: 91 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -479,6 +479,97 @@ class SQLContext private[sql](val sparkSession: SparkSession)
def readStream: DataStreamReader = sparkSession.readStream


/**
* Creates an external table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(tableName: String, path: String): DataFrame = {
sparkSession.catalog.createTable(tableName, path)
}

/**
* Creates an external table from the given path based on a data source
* and returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
path: String,
source: String): DataFrame = {
sparkSession.catalog.createTable(tableName, path, source)
}

/**
* Creates an external table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
options: java.util.Map[String, String]): DataFrame = {
sparkSession.catalog.createTable(tableName, source, options)
}

/**
* (Scala-specific)
* Creates an external table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
options: Map[String, String]): DataFrame = {
sparkSession.catalog.createTable(tableName, source, options)
}

/**
* Create an external table from the given path based on a data source, a schema and
* a set of options. Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
sparkSession.catalog.createTable(tableName, source, schema, options)
}

/**
* (Scala-specific)
* Create an external table from the given path based on a data source, a schema and
* a set of options. Then, returns the corresponding DataFrame.
*
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
sparkSession.catalog.createTable(tableName, source, schema, options)
}

/**
* Registers the given `DataFrame` as a temporary table in the catalog. Temporary tables exist
* only during the lifetime of this instance of SQLContext.
