Commit e6ce51a

wrap SQL function with udf

younggyu chun committed Jul 24, 2019
2 parents d376e95 + 045191e

Showing 149 changed files with 14,397 additions and 768 deletions.
2 changes: 1 addition & 1 deletion R/README.md
@@ -20,7 +20,7 @@ export R_HOME=/home/username/R
Build Spark with [Maven](https://spark.apache.org/docs/latest/building-spark.html#buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run

```bash
build/mvn -DskipTests -Psparkr package
./build/mvn -DskipTests -Psparkr package
```

#### Running sparkR
2 changes: 1 addition & 1 deletion README.md
@@ -25,7 +25,7 @@ This README file only contains basic setup instructions.
Spark is built using [Apache Maven](https://maven.apache.org/).
To build Spark and its example programs, run:

build/mvn -DskipTests clean package
./build/mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.)

@@ -182,19 +182,19 @@ private[spark] class ExecutorMonitor(
if (updateExecutors) {
val activeShuffleIds = shuffleStages.map(_._2).toSeq
var needTimeoutUpdate = false
val activatedExecs = new mutable.ArrayBuffer[String]()
val activatedExecs = new ExecutorIdCollector()
executors.asScala.foreach { case (id, exec) =>
if (!exec.hasActiveShuffle) {
exec.updateActiveShuffles(activeShuffleIds)
if (exec.hasActiveShuffle) {
needTimeoutUpdate = true
activatedExecs += id
activatedExecs.add(id)
}
}
}

logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " +
s"needed by new job ${event.jobId}.")
logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job" +
s"${event.jobId}.")

if (needTimeoutUpdate) {
nextTimeout.set(Long.MinValue)
@@ -233,18 +233,18 @@ private[spark] class ExecutorMonitor(
}
}

val deactivatedExecs = new mutable.ArrayBuffer[String]()
val deactivatedExecs = new ExecutorIdCollector()
executors.asScala.foreach { case (id, exec) =>
if (exec.hasActiveShuffle) {
exec.updateActiveShuffles(activeShuffles)
if (!exec.hasActiveShuffle) {
deactivatedExecs += id
deactivatedExecs.add(id)
}
}
}

logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " +
s"after job ${event.jobId} finished.")
logDebug(s"Executors $deactivatedExecs do not have active shuffle data after job " +
s"${event.jobId} finished.")
}

jobToStageIDs.remove(event.jobId).foreach { stages =>
@@ -448,7 +448,8 @@ private[spark] class ExecutorMonitor(
} else {
idleTimeoutMs
}
idleStart + timeout
val deadline = idleStart + timeout
if (deadline >= 0) deadline else Long.MaxValue
} else {
Long.MaxValue
}
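For illustration, a minimal self-contained sketch of the overflow this guard prevents (the constants are made up; in the monitor the timeout values come from the dynamic-allocation configs):

```scala
// Sketch: when the configured timeout is effectively "infinite" (Long.MaxValue),
// a plain addition overflows to a negative deadline and the executor would look
// expired immediately. Clamping to Long.MaxValue keeps it from ever timing out.
object DeadlineOverflowSketch {
  def deadline(idleStart: Long, timeout: Long): Long = {
    val raw = idleStart + timeout
    if (raw >= 0) raw else Long.MaxValue
  }

  def main(args: Array[String]): Unit = {
    val idleStart = 1000L
    println(idleStart + Long.MaxValue)          // negative: the addition overflowed
    println(deadline(idleStart, Long.MaxValue)) // 9223372036854775807: never times out
    println(deadline(idleStart, 60000L))        // 61000: the normal case is unchanged
  }
}
```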
@@ -491,4 +492,22 @@ private[spark] class ExecutorMonitor(
private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent {
override protected[spark] def logEvent: Boolean = false
}

/** Used to collect executor IDs for debug messages (and avoid too long messages). */
private class ExecutorIdCollector {
private val ids = if (log.isDebugEnabled) new mutable.ArrayBuffer[String]() else null
private var excess = 0

def add(id: String): Unit = if (log.isDebugEnabled) {
if (ids.size < 10) {
ids += id
} else {
excess += 1
}
}

override def toString(): String = {
ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
}
}
}
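To show what the new collector produces, a standalone sketch that mimics its truncation behaviour (the real class only collects when `log.isDebugEnabled`, which is omitted here):

```scala
import scala.collection.mutable

// Mimic of ExecutorIdCollector: keep at most 10 IDs for the debug message
// and summarise the rest, so the log line stays short.
class IdCollectorSketch {
  private val ids = new mutable.ArrayBuffer[String]()
  private var excess = 0

  def add(id: String): Unit = {
    if (ids.size < 10) ids += id else excess += 1
  }

  override def toString: String =
    ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
}

object IdCollectorDemo extends App {
  val c = new IdCollectorSketch
  (1 to 15).foreach(i => c.add(i.toString))
  println(c)  // 1,2,3,4,5,6,7,8,9,10 (and 5 more)
}
```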
24 changes: 14 additions & 10 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -641,18 +641,22 @@ private[ui] class TaskPagedTable(
<td>{accumulatorsInfo(task)}</td>
}}
{if (hasInput(stage)) {
metricInfo(task) { m =>
val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead)
val records = m.inputMetrics.recordsRead
<td>{bytesRead} / {records}</td>
}
<td>{
metricInfo(task) { m =>
val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead)
val records = m.inputMetrics.recordsRead
Unparsed(s"$bytesRead / $records")
}
}</td>
}}
{if (hasOutput(stage)) {
metricInfo(task) { m =>
val bytesWritten = Utils.bytesToString(m.outputMetrics.bytesWritten)
val records = m.outputMetrics.recordsWritten
<td>{bytesWritten} / {records}</td>
}
<td>{
metricInfo(task) { m =>
val bytesWritten = Utils.bytesToString(m.outputMetrics.bytesWritten)
val records = m.outputMetrics.recordsWritten
Unparsed(s"$bytesWritten / $records")
}
}</td>
}}
{if (hasShuffleRead(stage)) {
<td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
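The StagePage hunk above moves the `<td>` wrapper outside `metricInfo` and renders the formatted text through `Unparsed`, presumably so that a cell is always emitted even when a task has no metrics. A minimal sketch of the pattern, assuming the scala-xml library is on the classpath:

```scala
import scala.xml.Unparsed

object MetricCellSketch {
  def main(args: Array[String]): Unit = {
    val bytesRead = "64.0 KB"  // placeholder for Utils.bytesToString(m.inputMetrics.bytesRead)
    val records = 1024L

    // The <td> is emitted unconditionally; only its text depends on the metrics.
    val cell = <td>{Unparsed(s"$bytesRead / $records")}</td>
    println(cell)  // <td>64.0 KB / 1024</td>
  }
}
```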
@@ -367,6 +367,26 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2"))
}

test("SPARK-28455: avoid overflow in timeout calculation") {
conf
.set(DYN_ALLOCATION_SHUFFLE_TIMEOUT, Long.MaxValue)
.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true)
.set(SHUFFLE_SERVICE_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, null, clock)

// Generate events that will make executor 1 be idle, while still holding shuffle data.
// The executor should not be eligible for removal since the timeout is basically "infinite".
val stage = stageInfo(1, shuffleId = 0)
monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage)))
clock.advance(1000L)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))

assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
}

private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
4 changes: 2 additions & 2 deletions docs/README.md
@@ -84,7 +84,7 @@ $ PRODUCTION=1 jekyll build

## API Docs (Scaladoc, Javadoc, Sphinx, roxygen2, MkDocs)

You can build just the Spark scaladoc and javadoc by running `build/sbt unidoc` from the `$SPARK_HOME` directory.
You can build just the Spark scaladoc and javadoc by running `./build/sbt unidoc` from the `$SPARK_HOME` directory.

Similarly, you can build just the PySpark docs by running `make html` from the
`$SPARK_HOME/python/docs` directory. Documentation is only generated for classes that are listed as
@@ -94,7 +94,7 @@ after [building Spark](https://github.com/apache/spark#building-spark) first.

When you run `jekyll build` in the `docs` directory, it will also copy over the scaladoc and javadoc for the various
Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a
jekyll plugin to run `build/sbt unidoc` before building the site so if you haven't run it (recently) it
jekyll plugin to run `./build/sbt unidoc` before building the site so if you haven't run it (recently) it
may take some time as it generates all of the scaladoc and javadoc using [Unidoc](https://github.com/sbt/sbt-unidoc).
The jekyll plugin also generates the PySpark docs using [Sphinx](http://sphinx-doc.org/), SparkR docs
using [roxygen2](https://cran.r-project.org/web/packages/roxygen2/index.html) and SQL docs
2 changes: 1 addition & 1 deletion docs/running-on-kubernetes.md
@@ -121,7 +121,7 @@ $ ./bin/docker-image-tool.sh -r <repo> -t my-tag -R ./kubernetes/dockerfiles/spa
To launch Spark Pi in cluster mode,

```bash
$ bin/spark-submit \
$ ./bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
4 changes: 2 additions & 2 deletions docs/running-on-mesos.md
@@ -212,7 +212,7 @@ protected (port 7077 by default).

By setting the Mesos proxy config property (requires mesos version >= 1.4), `--conf spark.mesos.proxy.baseURL=http://localhost:5050` when launching the dispatcher, the mesos sandbox URI for each driver is added to the mesos dispatcher UI.

If you like to run the `MesosClusterDispatcher` with Marathon, you need to run the `MesosClusterDispatcher` in the foreground (i.e: `bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher`). Note that the `MesosClusterDispatcher` not yet supports multiple instances for HA.
If you like to run the `MesosClusterDispatcher` with Marathon, you need to run the `MesosClusterDispatcher` in the foreground (i.e: `./bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher`). Note that the `MesosClusterDispatcher` not yet supports multiple instances for HA.

The `MesosClusterDispatcher` also supports writing recovery state into Zookeeper. This will allow the `MesosClusterDispatcher` to be able to recover all submitted and running containers on relaunch. In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env by configuring `spark.deploy.recoveryMode` and related spark.deploy.zookeeper.* configurations.
For more information about these configurations please refer to the configurations [doc](configuration.html#deploy).
@@ -362,7 +362,7 @@ The External Shuffle Service to use is the Mesos Shuffle Service. It provides sh
on top of the Shuffle Service since Mesos doesn't yet support notifying another framework's
termination. To launch it, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all slave nodes, with `spark.shuffle.service.enabled` set to `true`.

This can also be achieved through Marathon, using a unique host constraint, and the following command: `bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService`.
This can also be achieved through Marathon, using a unique host constraint, and the following command: `./bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService`.

# Configuration

2 changes: 1 addition & 1 deletion docs/sql-data-sources-jdbc.md
@@ -36,7 +36,7 @@ spark classpath. For example, to connect to postgres from the Spark Shell you wo
following command:

{% highlight bash %}
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
{% endhighlight %}

Tables from the remote database can be loaded as a DataFrame or Spark SQL temporary view using
4 changes: 3 additions & 1 deletion docs/sql-migration-guide-upgrade.md
@@ -151,7 +151,9 @@ license: |

- Since Spark 3.0, substitution order of nested WITH clauses is changed and an inner CTE definition takes precedence over an outer. In version 2.4 and earlier, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2` returns `1` while in version 3.0 it returns `2`. The previous behaviour can be restored by setting `spark.sql.legacy.ctePrecedence.enabled` to `true`.

- Since Spark 3.0, the `add_months` function adjusts the resulting date to a last day of month only if it is invalid. For example, `select add_months(DATE'2019-01-31', 1)` results `2019-02-28`. In Spark version 2.4 and earlier, the resulting date is adjusted when it is invalid, or the original date is a last day of months. For example, adding a month to `2019-02-28` resultes in `2019-03-31`.
- Since Spark 3.0, the `add_months` function does not adjust the resulting date to a last day of month if the original date is a last day of months. For example, `select add_months(DATE'2019-02-28', 1)` results `2019-03-28`. In Spark version 2.4 and earlier, the resulting date is adjusted when the original date is a last day of months. For example, adding a month to `2019-02-28` results in `2019-03-31`.

- Since Spark 3.0, 0-argument Java UDF is executed in the executor side identically with other UDFs. In Spark version 2.4 and earlier, 0-argument Java UDF alone was executed in the driver side, and the result was propagated to executors, which might be more performant in some cases but caused inconsistency with a correctness issue in some cases.
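A hedged sketch exercising the `add_months` change described above; the expected outputs follow the migration note and assume a local Spark 3.0 session:

```scala
import org.apache.spark.sql.SparkSession

object AddMonthsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("add_months-sketch")
      .getOrCreate()

    // An invalid result date is still adjusted to the last day of the month:
    spark.sql("SELECT add_months(DATE'2019-01-31', 1)").show()  // 2019-02-28

    // But a source date that happens to be a month's last day is no longer snapped:
    spark.sql("SELECT add_months(DATE'2019-02-28', 1)").show()  // 3.0: 2019-03-28 (2.4: 2019-03-31)

    spark.stop()
  }
}
```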

## Upgrading from Spark SQL 2.4 to 2.4.1

8 changes: 4 additions & 4 deletions docs/streaming-kinesis-integration.md
@@ -222,17 +222,17 @@ To run the example,
<div class="codetabs">
<div data-lang="scala" markdown="1">

bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]

</div>
<div data-lang="java" markdown="1">

bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]

</div>
<div data-lang="python" markdown="1">

bin/spark-submit --jars external/kinesis-asl/target/scala-*/\
./bin/spark-submit --jars external/kinesis-asl/target/scala-*/\
spark-streaming-kinesis-asl-assembly_*.jar \
external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
[Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
@@ -244,7 +244,7 @@ To run the example,

- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.

bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10

This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.

10 changes: 10 additions & 0 deletions docs/structured-streaming-kafka-integration.md
@@ -388,6 +388,16 @@ The following configurations are optional:
<td>streaming and batch</td>
<td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
</tr>
<tr>
<td>minPartitions</td>
<td>int</td>
<td>none</td>
<td>streaming and batch</td>
<td>Minimum number of partitions to read from Kafka.
By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
If you set this option to a value greater than your topicPartitions, Spark will divvy up large
Kafka partitions to smaller pieces.</td>
</tr>
<tr>
<td>groupIdPrefix</td>
<td>string</td>
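A hedged sketch of the `minPartitions` option documented in the hunk above; it assumes the spark-sql-kafka package is on the classpath, and the bootstrap servers and topic name are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object KafkaMinPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kafka-minPartitions")
      .getOrCreate()

    // Ask Spark for at least 24 input partitions even if the topic has fewer
    // topicPartitions; large Kafka partitions are split into smaller pieces.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
      .option("subscribe", "events")                      // placeholder topic
      .option("minPartitions", "24")
      .load()

    df.printSchema()
  }
}
```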
6 changes: 3 additions & 3 deletions docs/structured-streaming-programming-guide.md
@@ -511,7 +511,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
There are a few built-in sources.

- **File source** - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If `latestFirst` is set, order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
- **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-0-10-integration.html) for more details.
- **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.

- **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
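As a small illustration of the socket source described above (testing only), a minimal sketch that echoes incoming lines to the console; host and port are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object SocketSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("socket-sketch")
      .getOrCreate()

    // Testing-only source: reads UTF-8 lines from a socket listening on the driver.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val query = lines.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```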

@@ -582,7 +582,7 @@ Here are the details of all the sources in Spark.
<tr>
<td><b>Kafka Source</b></td>
<td>
See the <a href="structured-streaming-kafka-0-10-integration.html">Kafka Integration Guide</a>.
See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a>.
</td>
<td>Yes</td>
<td></td>
@@ -1835,7 +1835,7 @@ Here are the details of all the sinks in Spark.
<tr>
<td><b>Kafka Sink</b></td>
<td>Append, Update, Complete</td>
<td>See the <a href="structured-streaming-kafka-0-10-integration.html">Kafka Integration Guide</a></td>
<td>See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a></td>
<td>Yes (at-least-once)</td>
<td>More details in the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a></td>
</tr>
@@ -1001,14 +1001,14 @@ abstract class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUti
sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.contains("Cannot save interval data type into external storage.") ||
msg.contains("AVRO data source does not support calendarinterval data type."))
msg.contains("AVRO data source does not support interval data type."))

msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
.contains(s"avro data source does not support calendarinterval data type."))
.contains(s"avro data source does not support interval data type."))
}
}

@@ -206,4 +206,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
""".stripMargin.replaceAll("\n", " "))
assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
}

test("write byte as smallint") {
sqlContext.createDataFrame(Seq((1.toByte, 2.toShort)))
.write.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
val df = sqlContext.read.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
val schema = df.schema
assert(schema.head.dataType == ShortType)
assert(schema(1).dataType == ShortType)
val rows = df.collect()
assert(rows.length === 1)
assert(rows(0).getShort(0) === 1)
assert(rows(0).getShort(1) === 2)
}
}
@@ -603,6 +603,15 @@ class SparseVector @Since("2.0.0") (

private[spark] override def asBreeze: BV[Double] = new BSV[Double](indices, values, size)

override def apply(i: Int): Double = {
if (i < 0 || i >= size) {
throw new IndexOutOfBoundsException(s"Index $i out of bounds [0, $size)")
}

val j = util.Arrays.binarySearch(indices, i)
if (j < 0) 0.0 else values(j)
}

override def foreachActive(f: (Int, Double) => Unit): Unit = {
var i = 0
val localValuesSize = values.length
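A short usage sketch of the added `SparseVector.apply`, using the public `Vectors.sparse` factory from the ml package (the mllib hunk below behaves the same way):

```scala
import org.apache.spark.ml.linalg.Vectors

object SparseApplySketch {
  def main(args: Array[String]): Unit = {
    val v = Vectors.sparse(4, Array(1, 3), Array(3.0, 4.5))

    println(v(3))  // 4.5 -> the index is found by binary search over the stored indices
    println(v(0))  // 0.0 -> the index is not stored, so the implicit zero is returned

    // v(7) would now throw IndexOutOfBoundsException thanks to the added bounds check.
  }
}
```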
@@ -785,6 +785,15 @@ class SparseVector @Since("1.0.0") (

private[spark] override def asBreeze: BV[Double] = new BSV[Double](indices, values, size)

override def apply(i: Int): Double = {
if (i < 0 || i >= size) {
throw new IndexOutOfBoundsException(s"Index $i out of bounds [0, $size)")
}

val j = util.Arrays.binarySearch(indices, i)
if (j < 0) 0.0 else values(j)
}

@Since("1.6.0")
override def foreachActive(f: (Int, Double) => Unit): Unit = {
var i = 0
