Merge pull request #29 from VirtusLab/named-columns
Replace tuples of columns with `Columns(...)` idiom
prolativ committed May 21, 2023
2 parents 39792d2 + d97b52b commit dc10a93
Showing 18 changed files with 249 additions and 210 deletions.
53 changes: 29 additions & 24 deletions USAGE-DEV.md
@@ -3,6 +3,8 @@
:warning: This library is in an early stage of development - the syntax and type hierarchy might still change,
the coverage of Spark's API is far from complete, and more tests are needed.

### First steps

1) Add Iskra as a dependency to your project, e.g.

* in a file compiled with Scala CLI:
@@ -20,8 +22,8 @@ the coverage of Spark's API is far from being complete and more tests are needed
libraryDependencies += "org.virtuslab" %% "iskra" % "0.0.4-SNAPSHOT"
```
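For the Scala CLI variant above (its snippet is collapsed in this diff), a directive with the same coordinates would look roughly like this - a sketch, and the exact `using` directive syntax depends on your Scala CLI version:
```scala
//> using dep "org.virtuslab::iskra:0.0.4-SNAPSHOT"
```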

Iskra is built with Scala 3.1.3 so it's compatible with Scala 3.1.x and newer minor releases (starting from 3.2.0 you'll get code completions for names of columns in REPL and Metals!).
Iskra transitively depends on Spark 3.2.0.
Iskra is built with Scala 3.3.0 so it's compatible with Scala 3.3.x (LTS) and newer minor releases.
Iskra transitively depends on Spark 3.2.0.

2) Import the basic definitions from the API
```scala
import org.virtuslab.iskra.api.*
```

@@ -39,41 +41,44 @@

4) Create a typed data frame in either of the two ways:
a) by using `toDF` extension method on a `Seq` of case classes, e.g.
* by using `.toDF` extension method on a `Seq` of case classes, e.g.
```scala
Seq(Foo(1, "abc"), Foo(2, "xyz")).toDF
```
Note that this variant of `toDF` comes from `org.virtuslab.iskra.api` rather than from `spark.implicits`.
Note that this variant of `.toDF` comes from `org.virtuslab.iskra.api` rather than from `spark.implicits`.
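(Here `Foo` is assumed to be a plain case class, e.g. `case class Foo(a: Int, b: String)`, presumably defined in one of the steps collapsed in this diff.)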

b) by taking a good old (untyped) data frame and calling `typed` extension method on it with a type parameter representing a case class, e.g.
* by taking a good old (untyped) data frame and calling `typed` extension method on it with a type parameter representing a case class, e.g.
```scala
df.typed[Foo]
```
In case you needed to get back to the unsafe world of untyped data frames for some reason, just call `.untyped` on a typed data frame.

5) Data frames created as above are called `ClassDataFrame`s as their compile-time schema is represented by a case class. To perform most of operations (e.g. selections, joins, etc.) on such a data frame you'll need to turn it into a structural data frame (`StructDataFrame`) first by calling `.asStruct` on it. (This restriction might be loosened in the future.)
5) Follow your intuition of a Spark developer :wink: This library is intended to resemble the original API of Spark as much as possible (e.g. by using the same names of methods, etc.), while trying to make the code feel more like regular Scala, without unnecessary boilerplate but with some other syntactic improvements.

6) On the other hand, some operations (like `.collect()`), can be performed only on a `ClassDataFrame` because they require a specific case class model to be known at compile time. To transform a `StructDataFrame` back to a `ClassDataFrame` with a given case class `A` as model use `.asClass[A]`.
6) Look at the [examples](src/test/example/).

7) From now on, just try to follow your intuition of a Spark developer :wink:
### Key concepts and differences to untyped Spark

This library is intended to maximally resemble the original API of Spark (e.g. by using the same names of methods, etc.) where possible, although trying to make the code feel more like regular Scala without unnecessary boilerplate and adding some other syntactic improvements.
Data frames created with `.toDF` or `.typed` like above are called `ClassDataFrame`s, as their compile-time schema is represented by a case class. As an alternative to case classes, the schema of a data frame can be expressed structurally; in that case the data frame is called a `StructDataFrame`. Some operations are available only for `ClassDataFrame`s or only for `StructDataFrame`s, while others are available for both. This depends on the semantics of each operation and on implementation restrictions (which might get lifted in the future). To turn a `ClassDataFrame` into a `StructDataFrame`, or vice versa, use the `.asStruct` or `.asClass[A]` method respectively.
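For example, a minimal round trip between the two representations - a sketch assuming the `Foo` model from the snippets above and a `given` `SparkSession` in scope:
```scala
import org.virtuslab.iskra.api.*

case class Foo(a: Int, b: String)

val classDf   = Seq(Foo(1, "abc"), Foo(2, "xyz")).toDF // ClassDataFrame[Foo]
val structDf  = classDf.asStruct                       // StructDataFrame with a structural schema
val roundTrip = structDf.asClass[Foo]                  // back to a ClassDataFrame[Foo]
val plainDf   = roundTrip.untyped                      // escape hatch to an untyped Spark DataFrame
```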

Most important differences:
* Refer to columns (also with prefixes specifying the alias for a dataframe in case of ambiguities) simply with `$.foo.bar` instead of `$"foo.bar"` or `col("foo.bar")`. Use backticks when necessary, e.g. ``$.`column with spaces` ``.
* From inside of `.select(...)` or `.select{...}` you should return something that is a named column or a tuple of named columns. Because of how Scala syntax works you can write simply `.select($.x, $.y)` instead of `select(($.x, $.y))`. With braces you can compute intermediate values like
```scala
.select {
val sum = ($.x + $.y).as("sum")
($.x, $.y, sum)
}
```

* Syntax for joins looks slightly more like SQL, but with dots and parentheses as for usual method calls, e.g.
```scala
foos.innerJoin(bars).on($.foos.barId === $.bars.id).select(...)
```
When operating on a data frame, `$` represents the schema of this frame, from which columns can be selected like ordinary class members. So to refer to a column called `foo`, instead of writing `col("foo")` or `$"foo"`, write `$.foo`. If the name of a column is not a valid Scala identifier, you can use backticks, e.g. ``$.`column with spaces` ``. Similarly, the syntax `$.foo.bar` can be used to refer to a column originating from a specific data frame to avoid ambiguities. This corresponds to `col("foo.bar")` or `$"foo.bar"` in vanilla Spark.
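A short sketch of this correspondence, assuming `df` is a data frame whose schema contains columns named `foo` and `bar` (and one with spaces):
```scala
df.select($.foo, $.bar)           // vanilla Spark: df.select(col("foo"), col("bar"))
df.select($.`column with spaces`) // backticks for names that are not valid identifiers
```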

* As you might have noticed above, the aliases for `foos` and `bars` were automatically inferred

8) For reference look at the [examples](src/test/example/) and the [API docs](https://virtuslab.github.io/iskra/)
Some operations, like `.select(...)` or `.agg(...)`, accept multiple columns as arguments. You can pass individual columns separately, like `.select($.foo, $.bar)`, or you can wrap them in `Columns(...)`, i.e. `.select(Columns($.foo, $.bar))`. `Columns` will eventually get flattened, so these two syntaxes are semantically equivalent. However, the `Columns(...)` syntax comes in handy when you need to embed a block of code as an argument to `.select { ... }`, e.g.
```scala
.select {
val sum = ($.x + $.y).as("sum")
Columns($.x, $.y, sum)
}
```

The syntax for joins looks slightly more like SQL, but with dots and parentheses as for usual method calls, e.g.
```scala
foos.innerJoin(bars).on($.foos.barId === $.bars.id).select(...)
```

As you might have noticed above, the aliases for `foos` and `bars` were automatically inferred, so you don't have to write
```scala
foos.as("foos").innerJoin(bars.as("bars"))
```
35 changes: 33 additions & 2 deletions src/main/Column.scala
@@ -1,8 +1,37 @@
package org.virtuslab.iskra

import org.apache.spark.sql.{ Column => UntypedColumn}
import scala.quoted.*

import org.apache.spark.sql.{Column => UntypedColumn}
import types.DataType

sealed trait NamedColumns[Schema](val underlyingColumns: Seq[UntypedColumn])

object Columns:
transparent inline def apply(inline columns: NamedColumns[?]*): NamedColumns[?] = ${ applyImpl('columns) }

private def applyImpl(columns: Expr[Seq[NamedColumns[?]]])(using Quotes): Expr[NamedColumns[?]] =
import quotes.reflect.*

val columnValuesWithTypes = columns match
case Varargs(colExprs) =>
colExprs.map { arg =>
arg match
case '{ $value: NamedColumns[schema] } => ('{ ${ value }.underlyingColumns }, Type.of[schema])
}

val columnsValues = columnValuesWithTypes.map(_._1)
val columnsTypes = columnValuesWithTypes.map(_._2)

val schemaTpe = FrameSchema.schemaTypeFromColumnsTypes(columnsTypes)

schemaTpe match
case '[s] =>
'{
val cols = ${ Expr.ofSeq(columnsValues) }.flatten
new NamedColumns[s](cols) {}
}

class Column[+T <: DataType](val untyped: UntypedColumn):

inline def name(using v: ValueOf[Name]): Name = v.value
@@ -30,7 +59,9 @@ object Column:
inline def ||[T2 <: DataType](col2: Column[T2])(using op: ColumnOp.Or[T1, T2]): Column[op.Out] = op(col1, col2)

@annotation.showAsInfix
class :=[L <: LabeledColumn.Label, T <: DataType](untyped: UntypedColumn) extends Column[T](untyped)
class :=[L <: LabeledColumn.Label, T <: DataType](untyped: UntypedColumn)
extends Column[T](untyped)
with NamedColumns[(L := T) *: EmptyTuple](Seq(untyped))

@annotation.showAsInfix
trait /[+Prefix <: Name, +Suffix <: Name]
1 change: 0 additions & 1 deletion src/main/DataFrame.scala
@@ -14,7 +14,6 @@ object DataFrame:
export Join.dataFrameJoinOps
export GroupBy.dataFrameGroupByOps
export Where.dataFrameWhereOps
export WithColumn.dataFrameWithColumnOps
export WithColumns.dataFrameWithColumnsOps

given dataFrameOps: {} with
8 changes: 4 additions & 4 deletions src/main/DataFrameBuilders.scala
@@ -25,7 +25,7 @@ object DataFrameBuilders:
val encodeFun: Expr[A => sql.Row] = '{ (value: A) => sql.Row(${ encoder }.encode(value)) }
(schema, encodeFun)

'{
val rowRDD = ${ spark }.sparkContext.parallelize(${ seq }.map(${ encodeFun }))
ClassDataFrame[A](${ spark }.createDataFrame(rowRDD, ${ schema }))
}
'{
val rowRDD = ${ spark }.sparkContext.parallelize(${ seq }.map(${ encodeFun }))
ClassDataFrame[A](${ spark }.createDataFrame(rowRDD, ${ schema }))
}
9 changes: 8 additions & 1 deletion src/main/FrameSchema.scala
@@ -2,7 +2,7 @@ package org.virtuslab.iskra

import scala.quoted.*
import scala.deriving.Mirror
import types.DataType
import types.{DataType, Encoder, StructEncoder}
import MacroHelpers.TupleSubtype

object FrameSchema:
@@ -40,3 +40,10 @@ object FrameSchema:
case '[(label := column) *: tail] => isValidType(Type.of[tail])
case '[label := column] => true
case _ => false

def schemaTypeFromColumnsTypes(colTypes: Seq[Type[?]])(using Quotes): Type[? <: Tuple] =
colTypes match
case Nil => Type.of[EmptyTuple]
case '[TupleSubtype[headTpes]] :: tail =>
schemaTypeFromColumnsTypes(tail) match
case '[TupleSubtype[tailTpes]] => Type.of[Tuple.Concat[headTpes, tailTpes]]
88 changes: 45 additions & 43 deletions src/main/Grouping.scala
@@ -13,49 +13,46 @@ object GroupBy:

given groupByOps: {} with
extension [View <: SchemaView](groupBy: GroupBy[View])
transparent inline def apply[GroupingColumns](groupingColumns: View ?=> GroupingColumns) = ${ applyImpl[View, GroupingColumns]('groupBy, 'groupingColumns) }
transparent inline def apply(inline groupingColumns: View ?=> NamedColumns[?]*) = ${ applyImpl[View]('groupBy, 'groupingColumns) }

def groupByImpl[S : Type](df: Expr[StructDataFrame[S]])(using Quotes): Expr[GroupBy[?]] =
import quotes.reflect.asTerm
val viewExpr = SchemaView.schemaViewExpr[StructDataFrame[S]]
val viewExpr = StructSchemaView.schemaViewExpr[StructDataFrame[S]]
viewExpr.asTerm.tpe.asType match
case '[SchemaView.Subtype[v]] =>
'{ GroupBy[v](${ viewExpr }.asInstanceOf[v], ${ df }.untyped) }

def applyImpl[View <: SchemaView : Type, GroupingColumns : Type](groupBy: Expr[GroupBy[View]], groupingColumns: Expr[View ?=> GroupingColumns])(using Quotes): Expr[GroupedDataFrame[View]] =
def applyImpl[View <: SchemaView : Type](groupBy: Expr[GroupBy[View]], groupingColumns: Expr[Seq[View ?=> NamedColumns[?]]])(using Quotes): Expr[GroupedDataFrame[View]] =
import quotes.reflect.*
Type.of[GroupingColumns] match
case '[name := colType] =>
val groupedViewExpr = SchemaView.schemaViewExpr[StructDataFrame[name := colType]]
groupedViewExpr.asTerm.tpe.asType match
case '[SchemaView.Subtype[gv]] =>
'{
new GroupedDataFrame[View]:
type GroupingKeys = (name := colType) *: EmptyTuple
type GroupedView = gv
def underlying = ${ groupBy }.underlying.groupBy(${ groupingColumns }(using ${ groupBy }.view).asInstanceOf[name := colType].untyped)
def fullView = ${ groupBy }.view
def groupedView = ${ groupedViewExpr }.asInstanceOf[GroupedView]
}

case '[TupleSubtype[s]] if FrameSchema.isValidType(Type.of[s]) =>
val groupedViewExpr = SchemaView.schemaViewExpr[StructDataFrame[s]]
val columnValuesWithTypes = groupingColumns match
case Varargs(colExprs) =>
colExprs.map { arg =>
val reduced = Term.betaReduce('{$arg(using ${ groupBy }.view)}.asTerm).get
reduced.asExpr match
case '{ $value: NamedColumns[schema] } => ('{ ${ value }.underlyingColumns }, Type.of[schema])
}

val columnsValues = columnValuesWithTypes.map(_._1)
val columnsTypes = columnValuesWithTypes.map(_._2)

val groupedSchemaTpe = FrameSchema.schemaTypeFromColumnsTypes(columnsTypes)
groupedSchemaTpe match
case '[TupleSubtype[groupingKeys]] =>
val groupedViewExpr = StructSchemaView.schemaViewExpr[StructDataFrame[groupingKeys]]

groupedViewExpr.asTerm.tpe.asType match
case '[SchemaView.Subtype[gv]] =>
case '[SchemaView.Subtype[groupedView]] =>
'{
val groupingCols = ${ groupingColumns }(using ${ groupBy }.view).asInstanceOf[s].toList.map(_.asInstanceOf[Column[DataType]].untyped)
val groupingCols = ${ Expr.ofSeq(columnsValues) }.flatten
new GroupedDataFrame[View]:
type GroupingKeys = s
type GroupedView = gv
type GroupingKeys = groupingKeys
type GroupedView = groupedView
def underlying = ${ groupBy }.underlying.groupBy(groupingCols*)
def fullView = ${ groupBy }.view
def groupedView = ${ groupedViewExpr }.asInstanceOf[GroupedView]
}

case '[t] =>
val errorMsg = s"""The parameter of `groupBy` should be a named column (e.g. of type: "foo" := StringType) or a tuple of named columns but it has type: ${Type.show[t]}"""
report.errorAndAbort(errorMsg, MacroHelpers.callPosition(groupBy))

// TODO: Rename to RelationalGroupedDataset and handle other aggregations: cube, rollup (and pivot?)
trait GroupedDataFrame[FullView <: SchemaView]:
import GroupedDataFrame.*
@@ -69,38 +66,43 @@ trait GroupedDataFrame[FullView <: SchemaView]:
object GroupedDataFrame:
given groupedDataFrameOps: {} with
extension [FullView <: SchemaView, GroupKeys <: Tuple, GroupView <: SchemaView](gdf: GroupedDataFrame[FullView]{ type GroupedView = GroupView; type GroupingKeys = GroupKeys })
transparent inline def agg[AggregatedColumns](columns: Agg { type View = FullView } ?=> GroupView ?=> AggregatedColumns): StructDataFrame[?] =
${ aggImpl[FullView, GroupKeys, GroupView, AggregatedColumns]('gdf, 'columns) }
transparent inline def agg(inline columns: (Agg { type View = FullView }, GroupView) ?=> NamedColumns[?]*): StructDataFrame[?] =
${ aggImpl[FullView, GroupKeys, GroupView]('gdf, 'columns) }


def aggImpl[FullView <: SchemaView : Type, GroupingKeys <: Tuple : Type, GroupView <: SchemaView : Type, AggregatedColumns : Type](
def aggImpl[FullView <: SchemaView : Type, GroupingKeys <: Tuple : Type, GroupView <: SchemaView : Type](
gdf: Expr[GroupedDataFrame[FullView] { type GroupedView = GroupView }],
columns: Expr[Agg { type View = FullView } ?=> GroupView ?=> AggregatedColumns]
columns: Expr[Seq[(Agg { type View = FullView }, GroupView) ?=> NamedColumns[?]]]
)(using Quotes): Expr[StructDataFrame[?]] =
import quotes.reflect.*

val aggWrapper = '{
new Agg:
type View = FullView
val view = ${ gdf }.fullView
}
Type.of[AggregatedColumns] match
case '[name := colType] =>
'{
val col = ${ columns }(using ${ aggWrapper })(using ${ gdf }.groupedView).asInstanceOf[name := colType].untyped
StructDataFrame[FrameSchema.Merge[GroupingKeys, name := colType]](
${ gdf }.underlying.agg(col)
)

val columnValuesWithTypes = columns match
case Varargs(colExprs) =>
colExprs.map { arg =>
val reduced = Term.betaReduce('{$arg(using ${ aggWrapper }, ${ gdf }.groupedView)}.asTerm).get
reduced.asExpr match
case '{ $value: NamedColumns[schema] } => ('{ ${ value }.underlyingColumns }, Type.of[schema])
}
case '[TupleSubtype[s]] if FrameSchema.isValidType(Type.of[s]) =>

val columnsValues = columnValuesWithTypes.map(_._1)
val columnsTypes = columnValuesWithTypes.map(_._2)

val schemaTpe = FrameSchema.schemaTypeFromColumnsTypes(columnsTypes)
schemaTpe match
case '[s] =>
'{
val cols = ${ columns }(using ${ aggWrapper })(using ${ gdf }.groupedView)
.asInstanceOf[s].toList.map(_.asInstanceOf[Column[DataType]].untyped)
// TODO assert cols is not empty
val cols = ${ Expr.ofSeq(columnsValues) }.flatten
StructDataFrame[FrameSchema.Merge[GroupingKeys, s]](
${ gdf }.underlying.agg(cols.head, cols.tail*)
)
}
case '[t] =>
val errorMsg = s"""The parameter of `agg` should be a named column (e.g. of type: "foo" := StringType) or a tuple of named columns but it has type: ${Type.show[t]}"""
report.errorAndAbort(errorMsg, MacroHelpers.callPosition(gdf))

trait Agg:
type View <: SchemaView
2 changes: 1 addition & 1 deletion src/main/JoinOnCondition.scala
@@ -49,7 +49,7 @@ object JoinOnCondition:
case '[viewSchema] if FrameSchema.isValidType(Type.of[viewSchema]) =>
Type.of[MergeSchemas[s1, s2]] match
case '[joinedSchema] if FrameSchema.isValidType(Type.of[joinedSchema]) =>
val viewExpr = SchemaView.schemaViewExpr[StructDataFrame[viewSchema]]
val viewExpr = StructSchemaView.schemaViewExpr[StructDataFrame[viewSchema]]
viewExpr.asTerm.tpe.asType match
case '[SchemaView.Subtype[v]] =>
'{
(The remaining 11 changed files are not shown.)