From 0a8c5c3ead26fe8c3df13b463b40331a6dd40970 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 24 Sep 2015 13:27:27 -0700
Subject: [PATCH] StreamFrame API alternatives.

---
 .../spark/sql/streamv1/StreamFrame.scala | 93 ++++++++++++++++++
 .../spark/sql/streamv1/WindowSpec.scala  | 28 ++++++
 .../spark/sql/streamv2/StreamFrame.scala | 95 +++++++++++++++++++
 3 files changed, 216 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/streamv1/StreamFrame.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/streamv1/WindowSpec.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/streamv2/StreamFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streamv1/StreamFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/streamv1/StreamFrame.scala
new file mode 100644
index 0000000000000..ecc5d04271f8c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streamv1/StreamFrame.scala
@@ -0,0 +1,93 @@
+package org.apache.spark.sql.streamv1
+
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Column, DataFrame}
+
+
+/**
+ * Version A: A single StreamFrame abstraction to represent unwindowed stream and windowed stream.
+ *
+ * There are two alternatives for the blocking operations in StreamFrame when it is not windowed:
+ *
+ * A1. Blocking operations return new StreamFrames, and emit a new tuple for every update.
+ *     As an example, sf.groupby("id").count() will emit a tuple every time we see a new record
+ *     for "id", i.e. a running count. Note that these operations will be expensive, because
+ *     they require ordering all the inputs by time.
+ *
+ * A2. Blocking operations throw runtime exceptions saying they are not supported.
+ */
+class StreamFrame {
+
+  /////////////////////////////////////////////////////////////////////
+  // Meta operations
+  /////////////////////////////////////////////////////////////////////
+
+  def schema: StructType = ???
+ + def dtypes: Array[(String, String)] = ??? + + def columns: Array[String] = ??? + + def printSchema(): Unit = ??? + + def explain(extended: Boolean): Unit = ??? + + ///////////////////////////////////////////////////////////////////// + // Window specification + ///////////////////////////////////////////////////////////////////// + + def window(window: WindowSpec): StreamFrame = ??? + + ///////////////////////////////////////////////////////////////////// + // Pipelined operations: + // - works only within a bounded dataset. + // - throws runtime exception if called on an unbounded dataset. + ///////////////////////////////////////////////////////////////////// + + def select(cols: Column*): StreamFrame = ??? + + def filter(condition: Column): StreamFrame = ??? + + def drop(col: Column): StreamFrame = ??? + + def withColumn(colName: String, col: Column): StreamFrame = ??? + + def withColumnRenamed(existingName: String, newName: String): StreamFrame = ??? + + def join(right: DataFrame): StreamFrame = ??? + + def write: StreamFrameWriter = ??? + + ///////////////////////////////////////////////////////////////////// + // Blocking operations: works only within a window + ///////////////////////////////////////////////////////////////////// + + def agg(exprs: Column*): StreamFrame = ??? + + def groupby(cols: Column*): GroupedStreamFrame = ??? + + def cube(cols: Column*): GroupedStreamFrame = ??? + + def rollup(cols: Column*): GroupedStreamFrame = ??? + + def sort(sortExprs: Column*): StreamFrame = ??? + + def dropDuplicates(colNames: Seq[String]): StreamFrame = ??? + + def distinct(): StreamFrame = ??? +} + + +class GroupedStreamFrame { + + def agg(exprs: Column*): StreamFrame = ??? + + def avg(colNames: String*): StreamFrame = ??? + + // ... 
+} + + +class StreamFrameWriter { + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streamv1/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/streamv1/WindowSpec.scala new file mode 100644 index 0000000000000..83ed5723c82f7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streamv1/WindowSpec.scala @@ -0,0 +1,28 @@ +package org.apache.spark.sql.streamv1 + +trait WindowSpec + +class TimeBasedWindow private() extends WindowSpec { + def over(length: Long): TimeBasedWindow = ??? + def every(interval: Long): TimeBasedWindow = ??? +} + +object TimeBasedWindow { + def over(length: Long): TimeBasedWindow = { + new TimeBasedWindow().over(length) + } + + def every(interval: Long): TimeBasedWindow = { + new TimeBasedWindow().every(interval) + } +} + + +class GlobalWindow private (interval: Long) extends WindowSpec + +object GlobalWindow { + def every(interval: Long): GlobalWindow = { + new GlobalWindow(interval) + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streamv2/StreamFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/streamv2/StreamFrame.scala new file mode 100644 index 0000000000000..0231414466abf --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streamv2/StreamFrame.scala @@ -0,0 +1,95 @@ +package org.apache.spark.sql.streamv2 + +import org.apache.spark.sql.streamv1.WindowSpec +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Column} + + +/** + * Version B: A StreamFrame, and a WindowedStreamFrame, which can be created by StreamFrame.window. + * + * Blocking operations are only available on WindowedStreamFrame. + */ +class StreamFrame { + ///////////////////////////////////////////////////////////////////// + // Meta operations + ///////////////////////////////////////////////////////////////////// + + def schema: StructType = ??? + + def dtypes: Array[(String, String)] = ??? + + def columns: Array[String] = ??? + + def printSchema(): Unit = ??? 
+ + def explain(extended: Boolean): Unit = ??? + + ///////////////////////////////////////////////////////////////////// + // Window specification + ///////////////////////////////////////////////////////////////////// + + def window(window: WindowSpec): WindowedStreamFrame = ??? + + ///////////////////////////////////////////////////////////////////// + // Pipelined operations: + // - works only within a bounded dataset. + // - throws runtime exception if called on an unbounded dataset. + ///////////////////////////////////////////////////////////////////// + + def select(cols: Column*): StreamFrame = ??? + + def filter(condition: Column): StreamFrame = ??? + + def drop(col: Column): StreamFrame = ??? + + def withColumn(colName: String, col: Column): StreamFrame = ??? + + def withColumnRenamed(existingName: String, newName: String): StreamFrame = ??? + + def join(right: DataFrame): StreamFrame = ??? + + def write: StreamFrameWriter = ??? + +} + + +/** + * A WindowedStreamFrame can run all the operations available on StreamFrame, and also blocking + * operations. + */ +class WindowedStreamFrame extends StreamFrame { + + ///////////////////////////////////////////////////////////////////// + // Blocking operations: works only within a window + ///////////////////////////////////////////////////////////////////// + + def agg(exprs: Column*): StreamFrame = ??? + + def groupby(cols: Column*): GroupedStreamFrame = ??? + + def cube(cols: Column*): GroupedStreamFrame = ??? + + def rollup(cols: Column*): GroupedStreamFrame = ??? + + def sort(sortExprs: Column*): StreamFrame = ??? + + def dropDuplicates(colNames: Seq[String]): StreamFrame = ??? + + def distinct(): StreamFrame = ??? +} + + +class GroupedStreamFrame { + + def agg(exprs: Column*): StreamFrame = ??? + + def avg(colNames: String*): StreamFrame = ??? + + // ... +} + + +class StreamFrameWriter { + +}