
Commit

Merge branch 'master' into SPARK-17822
techaddict committed Oct 20, 2016
2 parents 2e9298a + e895bc2 commit 635424f
Showing 12 changed files with 403 additions and 67 deletions.
1 change: 1 addition & 0 deletions conf/spark-env.sh.template
@@ -63,3 +63,4 @@
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
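
For example (a hypothetical session; the launcher scripts such as start-master.sh delegate to sbin/spark-daemon.sh), setting the variable keeps the process attached to the terminal, which is convenient under a process supervisor such as systemd. Only the variable being set matters, not its value:

    # Run the master in the foreground: no PID file is written, and output
    # stays on the terminal instead of being redirected to the log file.
    SPARK_NO_DAEMONIZE=true "${SPARK_HOME}/sbin/start-master.sh"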
54 changes: 32 additions & 22 deletions sbin/spark-daemon.sh
@@ -27,6 +27,7 @@
# SPARK_PID_DIR The pid files are stored. /tmp by default.
# SPARK_IDENT_STRING A string representing this instance of spark. $USER by default
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
# SPARK_NO_DAEMONIZE If set, will run the proposed command in the foreground. It will not output a PID file.
##

usage="Usage: spark-daemon.sh [--config <conf-dir>] (start|stop|submit|status) <spark-command> <spark-instance-number> <args...>"
@@ -122,6 +123,35 @@ if [ "$SPARK_NICENESS" = "" ]; then
export SPARK_NICENESS=0
fi

execute_command() {
local command="$@"
if [ -z ${SPARK_NO_DAEMONIZE+set} ]; then
nohup -- $command >> $log 2>&1 < /dev/null &
newpid="$!"

echo "$newpid" > "$pid"

# Poll for up to 5 seconds for the java process to start
for i in {1..10}
do
if [[ $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
break
fi
sleep 0.5
done

sleep 2
# Check if the process has died; in that case we'll tail the log so the user can see
if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
echo "failed to launch $command:"
tail -2 "$log" | sed 's/^/ /'
echo "full log in $log"
fi
else
$command
fi
}

run_command() {
mode="$1"
shift
@@ -146,13 +176,11 @@ run_command() {

case "$mode" in
(class)
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
newpid="$!"
execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command $@
;;

(submit)
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null &
newpid="$!"
execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class $command $@
;;

(*)
@@ -161,24 +189,6 @@
;;
esac

echo "$newpid" > "$pid"

#Poll for up to 5 seconds for the java process to start
for i in {1..10}
do
if [[ $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
break
fi
sleep 0.5
done

sleep 2
# Check if the process has died; in that case we'll tail the log so the user can see
if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
echo "failed to launch $command:"
tail -2 "$log" | sed 's/^/ /'
echo "full log in $log"
fi
}

case $option in
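The foreground switch hinges on the [ -z ${SPARK_NO_DAEMONIZE+set} ] test in execute_command: the expansion ${VAR+set} yields the word "set" whenever VAR is defined, even as an empty string, and yields nothing when VAR is unset. A minimal bash sketch of that behavior:

    # ${VAR+word} substitutes word if VAR is set (even to ""), else nothing.
    unset SPARK_NO_DAEMONIZE
    [ -z "${SPARK_NO_DAEMONIZE+set}" ] && echo "unset -> daemonize"

    SPARK_NO_DAEMONIZE=    # empty but set
    [ -n "${SPARK_NO_DAEMONIZE+set}" ] && echo "set -> run in the foreground"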
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -168,17 +168,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* }}}
*/
override def visitShowColumns(ctx: ShowColumnsContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier)

val lookupTable = Option(ctx.db) match {
case None => table
case Some(db) if table.database.exists(_ != db) =>
operationNotAllowed(
s"SHOW COLUMNS with conflicting databases: '$db' != '${table.database.get}'",
ctx)
case Some(db) => TableIdentifier(table.identifier, Some(db.getText))
}
ShowColumnsCommand(lookupTable)
ShowColumnsCommand(Option(ctx.db).map(_.getText), visitTableIdentifier(ctx.tableIdentifier))
}

/**
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.nio.file.FileSystems
import java.util.Date

import scala.collection.mutable.ArrayBuffer
@@ -245,7 +246,27 @@ case class LoadDataCommand(
val loadPath =
if (isLocal) {
val uri = Utils.resolveURI(path)
if (!new File(uri.getPath()).exists()) {
val filePath = uri.getPath()
val exists = if (filePath.contains("*")) {
val fileSystem = FileSystems.getDefault
val pathPattern = fileSystem.getPath(filePath)
val dir = pathPattern.getParent.toString
if (dir.contains("*")) {
throw new AnalysisException(
s"LOAD DATA input path allows only filename wildcard: $path")
}

val files = new File(dir).listFiles()
if (files == null) {
false
} else {
val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
}
} else {
new File(filePath).exists()
}
if (!exists) {
throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
}
uri
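
In SQL terms (hypothetical paths and table name), the new check accepts a wildcard in the filename component of a local path, matching candidate files with a glob PathMatcher, while a wildcard in any directory component is still rejected:

    -- Matches e.g. /tmp/load/part-0 and /tmp/load/part-1.
    LOAD DATA LOCAL INPATH '/tmp/load/part-*' INTO TABLE mytab;

    -- Fails with "LOAD DATA input path allows only filename wildcard: ...".
    LOAD DATA LOCAL INPATH '/tmp/*/part-0' INTO TABLE mytab;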
@@ -650,14 +671,24 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand {
case class ShowColumnsCommand(
databaseName: Option[String],
tableName: TableIdentifier) extends RunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("col_name", StringType, nullable = false)() :: Nil
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
val resolver = sparkSession.sessionState.conf.resolver
val lookupTable = databaseName match {
case None => tableName
case Some(db) if tableName.database.exists(!resolver(_, db)) =>
throw new AnalysisException(
s"SHOW COLUMNS with conflicting databases: '$db' != '${tableName.database.get}'")
case Some(db) => TableIdentifier(tableName.identifier, Some(db))
}
val table = catalog.getTempViewOrPermanentTableMetadata(lookupTable)
table.schema.map { c =>
Row(c.name)
}
58 changes: 58 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/show_columns.sql
@@ -0,0 +1,58 @@
CREATE DATABASE showdb;

USE showdb;

CREATE TABLE showcolumn1 (col1 int, `col 2` int);
CREATE TABLE showcolumn2 (price int, qty int) partitioned by (year int, month int);
CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet;
CREATE GLOBAL TEMP VIEW showColumn4 AS SELECT 1 as col1, 'abc' as `col 5`;


-- only table name
SHOW COLUMNS IN showcolumn1;

-- qualified table name
SHOW COLUMNS IN showdb.showcolumn1;

-- table name and database name
SHOW COLUMNS IN showcolumn1 FROM showdb;

-- partitioned table
SHOW COLUMNS IN showcolumn2 IN showdb;

-- Non-existent table. Raise an error in this case
SHOW COLUMNS IN badtable FROM showdb;

-- database in table identifier and database name in different case
SHOW COLUMNS IN showdb.showcolumn1 from SHOWDB;

-- different database name in table identifier and database name.
-- Raise an error in this case.
SHOW COLUMNS IN showdb.showcolumn1 FROM baddb;

-- show column on temporary view
SHOW COLUMNS IN showcolumn3;

-- error temp view can't be qualified with a database
SHOW COLUMNS IN showdb.showcolumn3;

-- error temp view can't be qualified with a database
SHOW COLUMNS IN showcolumn3 FROM showdb;

-- error global temp view needs to be qualified
SHOW COLUMNS IN showcolumn4;

-- global temp view qualified with database
SHOW COLUMNS IN global_temp.showcolumn4;

-- global temp view qualified with database
SHOW COLUMNS IN showcolumn4 FROM global_temp;

DROP TABLE showcolumn1;
DROP TABLE showColumn2;
DROP VIEW showcolumn3;
DROP VIEW global_temp.showcolumn4;

use default;

DROP DATABASE showdb;
