Commit 3e15cc9

responded to TD comments

jjthomas committed Jul 1, 2016
1 parent a8e60e7 commit 3e15cc9
Showing 6 changed files with 79 additions and 49 deletions.
JavaStructuredNetworkWordCount.java
@@ -24,7 +24,7 @@
 import java.util.Iterator;

 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
  *
  * Usage: JavaStructuredNetworkWordCount <hostname> <port>
  * <hostname> and <port> describe the TCP server that Structured Streaming
structured_network_wordcount.py
@@ -16,7 +16,7 @@
 #

 """
-Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+Counts words in UTF8 encoded, '\n' delimited text received from the network.
 Usage: structured_network_wordcount.py <hostname> <port>
 <hostname> and <port> describe the TCP server that Structured Streaming
 would connect to receive data.
StructuredNetworkWordCount.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.SparkSession

 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network.
  *
  * Usage: StructuredNetworkWordCount <hostname> <port>
  * <hostname> and <port> describe the TCP server that Structured Streaming
StructuredNetworkWordCountWindowed.scala (renamed from EventTimeWindow.scala)
@@ -18,49 +18,42 @@
 // scalastyle:off println
 package org.apache.spark.examples.sql.streaming

+import java.sql.Timestamp
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{DoubleType, TimestampType}

 /**
- * Computes the average signal from IoT device readings over a sliding window of
- * configurable duration. The readings are received over the network and must be
- * UTF8-encoded and separated by '\n'.
- *
- * A single reading should take the format
- * <device name (string)>, <reading (double)>
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
  *
- * Usage: EventTimeWindow <hostname> <port> <window duration>
- * <slide duration>
- * <hostname> and <port> describe the TCP server that Structured Streaming would connect to
- * receive data.
+ * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
  * <window duration> gives the size of window, specified as integer number of seconds, minutes,
  * or days, e.g. "1 minute", "2 seconds"
  * <slide duration> gives the amount of time successive windows are offset from one another,
- * given in the same units as above
+ * given in the same units as above. <slide duration> should be less than or equal to
+ * <window duration>. If the two are equal, successive windows have no overlap.
  * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
  * they are processed as individual arguments)
  *
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk 9999`
  * and then run the example
- * `$ bin/run-example sql.streaming.EventTimeWindow
+ * `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
  * localhost 9999 <window duration> <slide duration>`
  *
- * Type device readings in the format given above into Netcat.
- *
- * An example sequence of device readings:
- * dev0,7.0
- * dev1,8.0
- * dev0,5.0
- * dev1,3.0
+ * One recommended <window duration>, <slide duration> pair is "1 minute",
+ * "30 seconds"
  */
-object EventTimeWindow {
+object StructuredNetworkWordCountWindowed {

   def main(args: Array[String]) {
     if (args.length < 4) {
-      System.err.println("Usage: EventTimeWindow <hostname> <port> <window duration>" +
-        " <slide duration>")
+      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
+        " <window duration> <slide duration>")
       System.exit(1)
     }

@@ -71,37 +64,32 @@ object EventTimeWindow

     val spark = SparkSession
       .builder
-      .appName("EventTimeWindow")
+      .appName("StructuredNetworkWordCountWindowed")
       .getOrCreate()

-    // Create DataFrame representing the stream of input readings from connection to host:port
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
     val lines = spark.readStream
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load()
-
-    // Split the readings into their individual components
-    val splitLines = lines.select(
-      split(lines.col("value"), ",").alias("pieces"),
-      lines.col("timestamp")
-    )
+      .option("includeTimestamp", true)
+      .load().as[(String, Timestamp)]

-    // Place the different components of the readings into different columns and
-    // cast them appropriately
-    val formatted = splitLines.select(
-      trim(splitLines.col("pieces").getItem(0)).as("device"),
-      trim(splitLines.col("pieces").getItem(1)).cast(DoubleType).as("signal"),
-      splitLines.col("timestamp")
+    // Split the lines into words, retaining timestamps
+    val words = lines.flatMap(line =>
+      line._1.split(" ")
+        .map(word => (word, line._2))
     )

-    // Group the readings into windows and compute the signal average within each window
-    val windowedAvgs = formatted.groupBy(
-      window(formatted.col("timestamp"), windowSize, slideSize)
-    ).avg("signal")
+    // Group the data by window and word and compute the count of each group
+    val windowedAvgs = words.groupBy(
+      window(words.col("_2"), windowSize, slideSize),
+      words.col("_1").as("word")
+    ).count()

-    // Start running the query that prints the windowed averages to the console
+    // Start running the query that prints the windowed word counts to the console
     val query = windowedAvgs.writeStream
       .outputMode("complete")
       .format("console")
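Note on the windowing semantics above: the same `window` grouping can be tried on a static Dataset, which makes the overlap behavior easy to see. A minimal sketch (not part of this commit; assumes a Spark 2.x local session, with made-up words and timestamps):

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object WindowGroupingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("WindowGroupingSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical (word, timestamp) rows standing in for the socket input
    val words = Seq(
      ("apache", Timestamp.valueOf("2016-07-01 12:00:10")),
      ("spark", Timestamp.valueOf("2016-07-01 12:00:20")),
      ("spark", Timestamp.valueOf("2016-07-01 12:00:50"))
    ).toDF("word", "timestamp")

    // "1 minute" windows sliding every "30 seconds": each row falls into two
    // overlapping windows, e.g. the row at 12:00:50 counts toward both
    // [12:00:00, 12:01:00) and [12:00:30, 12:01:30)
    words.groupBy(
      window($"timestamp", "1 minute", "30 seconds"),
      $"word"
    ).count().orderBy("window").show(truncate = false)

    spark.stop()
  }
}

With the recommended "1 minute"/"30 seconds" pair, every line contributes to exactly two windows; with equal window and slide durations it would contribute to exactly one.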
socket.scala (TextSocketSource)
@@ -34,6 +34,7 @@ object TextSocketSource
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
     StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
 }

 /**
@@ -72,8 +73,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
         }
         TextSocketSource.this.synchronized {
           lines += ((line,
-            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-              .format(Calendar.getInstance().getTime())))
+            TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())))
         }
       }
     } catch {
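Worth noting about the change above: the previous code allocated a new SimpleDateFormat for every received line, and hoisting the formatter into the TextSocketSource companion object (DATE_FORMAT) avoids that. SimpleDateFormat is not thread-safe, so it matters that the format call remains inside the synchronized block.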
TextSocketStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming

 import java.io.{IOException, OutputStreamWriter}
 import java.net.ServerSocket
+import java.sql.Timestamp
 import java.util.concurrent.LinkedBlockingQueue

 import org.scalatest.BeforeAndAfterEach
@@ -27,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

 class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
   import testImplicits._
@@ -85,6 +86,47 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     }
   }

+  test("timestamped usage") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    val provider = new TextSocketSourceProvider
+    val parameters = Map("host" -> "localhost", "port" -> serverThread.port.toString,
+      "includeTimestamp" -> "true")
+    val schema = provider.sourceSchema(sqlContext, None, "", parameters)._2
+    assert(schema === StructType(StructField("value", StringType) ::
+      StructField("timestamp", TimestampType) :: Nil))
+
+    source = provider.createSource(sqlContext, "", None, "", parameters)
+
+    failAfter(streamingTimeout) {
+      serverThread.enqueue("hello")
+      while (source.getOffset.isEmpty) {
+        Thread.sleep(10)
+      }
+      val offset1 = source.getOffset.get
+      val batch1 = source.getBatch(None, offset1)
+      val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq
+      assert(batch1Seq.map(_._1) === Seq("hello"))
+      val batch1Stamp = batch1Seq(0)._2
+
+      serverThread.enqueue("world")
+      while (source.getOffset.get === offset1) {
+        Thread.sleep(10)
+      }
+      val offset2 = source.getOffset.get
+      val batch2 = source.getBatch(Some(offset1), offset2)
+      val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq
+      assert(batch2Seq.map(_._1) === Seq("world"))
+      val batch2Stamp = batch2Seq(0)._2
+      assert(!batch2Stamp.before(batch1Stamp))
+
+      // Try stopping the source to make sure this does not block forever.
+      source.stop()
+      source = null
+    }
+  }
+
   test("params not given") {
     val provider = new TextSocketSourceProvider
     intercept[AnalysisException] {
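The new test pins down the two-column schema that the socket source exposes when includeTimestamp is set. For reference, the same schema can be inspected from a spark-shell session (a sketch, not from the commit; assumes a build containing this change and a Netcat server on localhost:9999):

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()

lines.printSchema()
// root
//  |-- value: string (nullable = true)
//  |-- timestamp: timestamp (nullable = true)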