Commit

also remove analyzer
rxin committed Mar 15, 2016
1 parent c5fd485 commit 49197aa
Showing 7 changed files with 22 additions and 20 deletions.
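
The change is mechanical across all seven files: call sites stop going through the `analyzer` shortcut that SQLContext used to expose and instead spell out the path through `sessionState`. A standalone sketch of that pattern with toy types (none of these classes are Spark's actual definitions):

object RemoveForwarderSketch {
  class Analyzer { val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b) }
  class SessionState { lazy val analyzer: Analyzer = new Analyzer }
  class SQLContextSketch {
    lazy val sessionState: SessionState = new SessionState
    // Removed by this commit (was roughly: def analyzer: Analyzer = sessionState.analyzer)
  }

  def main(args: Array[String]): Unit = {
    val sqlContext = new SQLContextSketch
    // Call sites now reach the analyzer through sessionState explicitly:
    val resolver = sqlContext.sessionState.analyzer.resolver
    println(resolver("Name", "name")) // true with a case-insensitive resolver
  }
}
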
@@ -155,7 +155,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def fill(value: Double, cols: Seq[String]): DataFrame = {
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       // Only fill if the column is part of the cols list.
       if (f.dataType.isInstanceOf[NumericType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -182,7 +182,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def fill(value: String, cols: Seq[String]): DataFrame = {
-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       // Only fill if the column is part of the cols list.
       if (f.dataType.isInstanceOf[StringType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -353,7 +353,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
       case _: String => StringType
     }

-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       val shouldReplace = cols.exists(colName => columnEquals(colName, f.name))
       if (f.dataType.isInstanceOf[NumericType] && targetColumnType == DoubleType && shouldReplace) {
@@ -382,7 +382,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
       }
     }

-    val columnEquals = df.sqlContext.analyzer.resolver
+    val columnEquals = df.sqlContext.sessionState.analyzer.resolver
     val projections = df.schema.fields.map { f =>
       values.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
         v match {
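
The `columnEquals` value obtained from the analyzer above is Catalyst's resolver: a plain name-equality function whose case sensitivity follows the session configuration. A minimal standalone sketch of how the fill() hunks use it (toy definitions, not Spark's):

object ResolverSketch {
  // Assumed shape of the resolver: a (String, String) => Boolean name comparison.
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  def main(args: Array[String]): Unit = {
    val cols = Seq("age", "height")
    val columnEquals: Resolver = caseInsensitive
    // Mirrors fill(value, cols): only touch a field when its name matches an entry in cols.
    val fields = Seq("Age", "name", "HEIGHT")
    val filled = fields.map { f =>
      if (cols.exists(col => columnEquals(f, col))) s"coalesce($f, value)" else f
    }
    println(filled) // List(coalesce(Age, value), name, coalesce(HEIGHT, value))
  }
}
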
@@ -323,7 +323,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   private def normalize(columnName: String, columnType: String): String = {
     val validColumnNames = df.logicalPlan.output.map(_.name)
-    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
+    validColumnNames.find(df.sqlContext.sessionState.analyzer.resolver(_, columnName))
       .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
         s"existing columns (${validColumnNames.mkString(", ")})"))
   }
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (22 changes: 12 additions & 10 deletions)
@@ -166,15 +166,16 @@ class Dataset[T] private[sql](
   private implicit def classTag = unresolvedTEncoder.clsTag

   protected[sql] def resolve(colName: String): NamedExpression = {
-    queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
-      throw new AnalysisException(
-        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
-    }
+    queryExecution.analyzed.resolveQuoted(colName, sqlContext.sessionState.analyzer.resolver)
+      .getOrElse {
+        throw new AnalysisException(
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+      }
   }

   protected[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
-      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
+      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.sessionState.analyzer.resolver).get
     }
   }

@@ -1400,7 +1401,7 @@ class Dataset[T] private[sql](
    * @since 1.3.0
    */
   def withColumn(colName: String, col: Column): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
     if (shouldReplace) {
@@ -1421,7 +1422,7 @@ class Dataset[T] private[sql](
    * Returns a new [[DataFrame]] by adding a column with metadata.
    */
   private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
     if (shouldReplace) {
@@ -1445,7 +1446,7 @@ class Dataset[T] private[sql](
    * @since 1.3.0
    */
   def withColumnRenamed(existingName: String, newName: String): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldRename = output.exists(f => resolver(f.name, existingName))
     if (shouldRename) {
@@ -1480,7 +1481,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
+    val resolver = sqlContext.sessionState.analyzer.resolver
     val remainingCols =
       schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
     if (remainingCols.size == this.schema.size) {
@@ -1501,7 +1502,8 @@ class Dataset[T] private[sql](
   def drop(col: Column): DataFrame = {
     val expression = col match {
       case Column(u: UnresolvedAttribute) =>
-        queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u)
+        queryExecution.analyzed.resolveQuoted(
+          u.name, sqlContext.sessionState.analyzer.resolver).getOrElse(u)
       case Column(expr: Expression) => expr
     }
     val attrs = this.logicalPlan.output
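
Several of the Dataset methods above share the same decision: resolve the user-supplied name against the analyzed plan's output with the resolver, then replace in place, rename, drop, or append accordingly. A plain-Scala sketch of the withColumn variant, with Seq[String] standing in for the output attributes (illustrative only):

object WithColumnSketch {
  // Plain-Scala stand-in for the replace-or-append decision in withColumn above.
  def withColumnSketch(
      output: Seq[String],
      colName: String,
      resolver: (String, String) => Boolean): Seq[String] = {
    val shouldReplace = output.exists(field => resolver(field, colName))
    if (shouldReplace) {
      // Keep column positions, swapping in the new column where the name matches.
      output.map(field => if (resolver(field, colName)) colName else field)
    } else {
      output :+ colName
    }
  }

  def main(args: Array[String]): Unit = {
    val caseInsensitive = (a: String, b: String) => a.equalsIgnoreCase(b)
    println(withColumnSketch(Seq("id", "Name"), "name", caseInsensitive)) // List(id, name)
    println(withColumnSketch(Seq("id", "Name"), "age", caseInsensitive))  // List(id, Name, age)
  }
}
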
@@ -120,7 +120,6 @@ class SQLContext private[sql](
   @transient
   protected[sql] lazy val sessionState: SessionState = new SessionState(self)
   protected[sql] def conf: SQLConf = sessionState.conf
-  protected[sql] def analyzer: Analyzer = sessionState.analyzer

   /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -31,14 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
  */
 class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

-  def assertAnalyzed(): Unit = try sqlContext.analyzer.checkAnalysis(analyzed) catch {
+  def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch {
     case e: AnalysisException =>
       val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
       ae.setStackTrace(e.getStackTrace)
       throw ae
   }

-  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
+  lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical)

   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
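
For context on the two QueryExecution members touched above: `analyzed` runs the analyzer lazily on first access, and `assertAnalyzed` re-runs the analyzer's checks and rethrows failures with the analyzed plan attached. A toy model of that pattern with simplified types (not Spark's actual classes or exception type):

object QueryExecutionSketch {
  // Toy model of the lazy-analysis + assertion pattern in QueryExecution.
  class Execution(logical: String, analyze: String => String, checkAnalysis: String => Unit) {
    lazy val analyzed: String = analyze(logical)

    def assertAnalyzed(): Unit =
      try checkAnalysis(analyzed) catch {
        case e: IllegalArgumentException =>
          // Rethrow with the analyzed plan attached, mirroring the AnalysisException rewrap.
          throw new IllegalArgumentException(s"${e.getMessage} (plan: $analyzed)", e)
      }
  }

  def main(args: Array[String]): Unit = {
    val qe = new Execution(
      logical = "Project [name] Relation [people]",
      analyze = plan => s"Analyzed($plan)",
      checkAnalysis = plan => require(!plan.contains("Unresolved"), "unresolved operator"))
    qe.assertAnalyzed()
    println(qe.analyzed)
  }
}
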
@@ -67,7 +67,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val filterSet = ExpressionSet(filters)

       val partitionColumns =
-        AttributeSet(l.resolve(files.partitionSchema, files.sqlContext.analyzer.resolver))
+        AttributeSet(
+          l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
       val partitionKeyFilters =
         ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
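
The FileSourceStrategy hunk resolves the partition schema into attributes so that only filters referencing partition columns alone are used for directory pruning. A rough stand-in for that subset test using string column names (illustrative, not Catalyst's AttributeSet/ExpressionSet):

object PartitionFilterSketch {
  // Keep only predicates whose referenced columns are all partition columns,
  // so they can be evaluated against directory paths before reading any files.
  final case class Filter(sql: String, references: Set[String])

  def partitionKeyFilters(filters: Seq[Filter], partitionColumns: Set[String]): Seq[Filter] =
    filters.filter(_.references.subsetOf(partitionColumns))

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      Filter("year = 2016", Set("year")),
      Filter("amount > 10", Set("amount")))
    println(partitionKeyFilters(filters, partitionColumns = Set("year", "month")))
    // List(Filter(year = 2016,Set(year)))
  }
}
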
@@ -205,7 +205,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
       referencedTestTables.foreach(loadTestTable)
       // Proceed with analysis.
-      analyzer.execute(logical)
+      sessionState.analyzer.execute(logical)
     }
   }
