diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index f7be5f6b370ab..33588ef72ffbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -155,7 +155,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def fill(value: Double, cols: Seq[String]): DataFrame = {
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       // Only fill if the column is part of the cols list.
       if (f.dataType.isInstanceOf[NumericType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -182,7 +182,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def fill(value: String, cols: Seq[String]): DataFrame = {
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       // Only fill if the column is part of the cols list.
       if (f.dataType.isInstanceOf[StringType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -353,7 +353,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
       case _: String => StringType
     }
 
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       val shouldReplace = cols.exists(colName => columnEquals(colName, f.name))
       if (f.dataType.isInstanceOf[NumericType] && targetColumnType == DoubleType && shouldReplace) {
@@ -382,7 +382,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
       }
     }
 
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       values.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
         v match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5e96b716ba73d..9951f0fabff15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -323,7 +323,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
  private def normalize(columnName: String, columnType: String): String = {
    val validColumnNames = df.logicalPlan.output.map(_.name)
-    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
+    validColumnNames.find(df.sqlContext.sessionState.analyzer.resolver(_, columnName))
      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
        s"existing columns (${validColumnNames.mkString(", ")})"))
  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ef239a1e2f324..f7ef0de21c6c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -166,15 +166,16 @@ class Dataset[T] private[sql](
  private implicit def classTag = unresolvedTEncoder.clsTag
 
  protected[sql] def resolve(colName: String): NamedExpression = {
-    queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
-      throw new AnalysisException(
-        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
-    }
+    queryExecution.analyzed.resolveQuoted(colName, sqlContext.sessionState.analyzer.resolver)
+      .getOrElse {
+        throw new AnalysisException(
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+      }
  }
 
  protected[sql] def numericColumns: Seq[Expression] = {
    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
-      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
+      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.sessionState.analyzer.resolver).get
    }
  }
 
@@ -1400,7 +1401,7 @@ class Dataset[T] private[sql](
    * @since 1.3.0
    */
  def withColumn(colName: String, col: Column): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
    val output = queryExecution.analyzed.output
    val shouldReplace = output.exists(f => resolver(f.name, colName))
    if (shouldReplace) {
@@ -1421,7 +1422,7 @@ class Dataset[T] private[sql](
    * Returns a new [[DataFrame]] by adding a column with metadata.
    */
  private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
    val output = queryExecution.analyzed.output
    val shouldReplace = output.exists(f => resolver(f.name, colName))
    if (shouldReplace) {
@@ -1445,7 +1446,7 @@ class Dataset[T] private[sql](
    * @since 1.3.0
    */
  def withColumnRenamed(existingName: String, newName: String): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
    val output = queryExecution.analyzed.output
    val shouldRename = output.exists(f => resolver(f.name, existingName))
    if (shouldRename) {
@@ -1480,7 +1481,7 @@ class Dataset[T] private[sql](
    */
  @scala.annotation.varargs
  def drop(colNames: String*): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
    val remainingCols =
      schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
    if (remainingCols.size == this.schema.size) {
@@ -1501,7 +1502,8 @@ class Dataset[T] private[sql](
  def drop(col: Column): DataFrame = {
    val expression = col match {
      case Column(u: UnresolvedAttribute) =>
-        queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u)
+        queryExecution.analyzed.resolveQuoted(
+          u.name, sqlContext.sessionState.analyzer.resolver).getOrElse(u)
      case Column(expr: Expression) => expr
    }
    val attrs = this.logicalPlan.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a8a75eaa431ac..177d78c4c08f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,7 +120,6 @@ class SQLContext private[sql](
  @transient
  protected[sql] lazy val sessionState: SessionState = new SessionState(self)
  protected[sql] def conf: SQLConf = sessionState.conf
-  protected[sql] def analyzer: Analyzer = sessionState.analyzer
 
  /**
   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5b4254f741ab1..912b84abc187c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,14 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
  */
 class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
 
-  def assertAnalyzed(): Unit = try sqlContext.analyzer.checkAnalysis(analyzed) catch {
+  def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch {
    case e: AnalysisException =>
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
 
-  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
+  lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical)
 
  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ef95d5d28961f..84e98c0f9e0f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -67,7 +67,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
      val filterSet = ExpressionSet(filters)
 
      val partitionColumns =
-        AttributeSet(l.resolve(files.partitionSchema, files.sqlContext.analyzer.resolver))
+        AttributeSet(
+          l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
      val partitionKeyFilters =
        ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
      logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 73589add8d2b9..19c05f9cb0d9c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -205,7 +205,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
    logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
    referencedTestTables.foreach(loadTestTable)
    // Proceed with analysis.
-    analyzer.execute(logical)
+    sessionState.analyzer.execute(logical)
  }
 }
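Context for reviewers (not part of the patch): every call site changed above only re-routes how the analyzer's `resolver` is obtained; the resolver itself is, as I understand it, just a column-name equality function whose behavior depends on the session's case-sensitivity setting. The sketch below is a self-contained, illustrative Scala snippet with made-up names (`ResolverSketch`, `matchingColumns`), not Spark source code.

// ResolverSketch.scala -- minimal sketch of the resolver contract assumed by this patch.
object ResolverSketch {
  // A resolver decides whether a requested column name matches a schema column name.
  type Resolver = (String, String) => Boolean

  // Two plausible resolution policies (hypothetical stand-ins for the session's setting).
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Mirrors how callers such as drop()/withColumn() above use the resolver:
  // keep the schema columns whose names match the target under the chosen policy.
  def matchingColumns(schema: Seq[String], target: String, resolver: Resolver): Seq[String] =
    schema.filter(name => resolver(name, target))

  def main(args: Array[String]): Unit = {
    val schema = Seq("id", "Name", "value")
    println(matchingColumns(schema, "name", caseInsensitive)) // List(Name)
    println(matchingColumns(schema, "name", caseSensitive))   // List()
  }
}

Because the resolver is session-scoped configuration, fetching it through `sessionState` keeps all of these call sites consistent with whatever the current session decides, instead of going through a separate forwarding method on SQLContext.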