In [1]:
import $ivy.`org.apache.spark::spark-sql:3.1.1`
import $ivy.`org.typelevel::cats-core:2.3.0`
import $ivy.`com.lihaoyi::sourcecode:0.2.6`
import $cp.`doric_2.12-0.0.1.jar`

In [2]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{functions => f}
import habla.doric._
import habla.doric.{functions => doricf}

[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.{col, lit}
[39m
[32mimport [39m[36morg.apache.spark.sql.{functions => f}
[39m
[32mimport [39m[36mhabla.doric._
[39m
[32mimport [39m[36mhabla.doric.{functions => doricf}[39m

In [3]:
val spark = org.apache.spark.sql.SparkSession.builder().appName("test").master("local").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@2ebf24f7

In [4]:
import spark.implicits._

[32mimport [39m[36mspark.implicits._[39m

Doric speeds up Spark development in three simple ways:
1. Column type validation
2. Type safety of operations between columns
3. Aggregation of multiple errors, with a precise location for each one

## 1. Column Type validation

Spark works with untyped columns: the compiler knows nothing about a column's type, and Spark only finds it out at runtime. The developer, however, knows which type to expect and can add a validation for it. Imagine that we expect a DataFrame with an `id` column of type string, but nothing in our flow validates it:

In [5]:
spark.range(1,10).withColumn("x", f.concat(col("id"), lit("jander"))) // is id really a string, or is Spark applying an implicit conversion?

[36mres4[39m: [32mDataFrame[39m = [id: bigint, x: string]

In [6]:
val df = spark.range(1,10).toDF

df.withColumn("x", doricf.concat(getString("id"), "jander".lit))

As you can see, doric treats an unexpected column type as an error. This gives doric a lot of power, and the only thing you change is the way you get a column.
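
To make the idea concrete, here is a minimal, Spark-free model of a typed column getter. It assumes the schema is a plain `Map` from column name to a hypothetical `ColType`; `ColumnLookup`, `ColType`, `Expected` and `getCol` are illustrative names, not doric's implementation. What it shows is that the type check happens when the column is requested, so a wrong type is reported before any transformation runs.

```scala
object ColumnLookup {
  // Hypothetical column types, standing in for Spark's DataTypes.
  sealed trait ColType
  case object IntCol extends ColType
  case object LongCol extends ColType
  case object StringCol extends ColType

  // Type class linking a Scala type to the column type expected for it.
  trait Expected[T] { def colType: ColType }
  object Expected {
    implicit val int: Expected[Int] = new Expected[Int] { val colType: ColType = IntCol }
    implicit val long: Expected[Long] = new Expected[Long] { val colType: ColType = LongCol }
    implicit val string: Expected[String] = new Expected[String] { val colType: ColType = StringCol }
  }

  // A column reference that carries the Scala type of its values.
  final case class TypedCol[T](name: String)

  // Look the column up and check its type before handing back a typed reference.
  def getCol[T](schema: Map[String, ColType], name: String)(implicit e: Expected[T]): Either[String, TypedCol[T]] =
    schema.get(name) match {
      case None                      => Left(s"Cannot resolve column name '$name'")
      case Some(t) if t == e.colType => Right(TypedCol[T](name))
      case Some(t)                   => Left(s"Column '$name' has type $t, but ${e.colType} was expected")
    }
}
```

With a schema like `Map("id" -> ColumnLookup.LongCol)`, asking for `getCol[String]` on `"id"` returns a `Left`, mirroring the doric error above, while `getCol[Long]` returns a typed reference.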

## 2. Type safety of operations between columns

Now that we are certain that each column has the expected type (or the program won't run), doric can prevent errors like this one at compile time:

In [7]:
val dfEq = List((1, "1"), (1, " 1"), (1, " 1 ")).toDF("int", "str")

[36mdfEq[39m: [32mDataFrame[39m = [int: int, str: string]

In [8]:
dfEq.withColumn("eq", col("int") === col("str")).show

+---+---+----+
|int|str|  eq|
+---+---+----+
|  1|  1|true|
|  1|  1|true|
|  1| 1 |true|
+---+---+----+



Spark is telling us that an integer and a string are equal? Even when the strings are different? What kind of behaviour is this?
With doric, this code wouldn't compile:
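
A minimal sketch of why such a comparison can be rejected at compile time, assuming a column type that carries a type parameter. `TypedEquality` and its `TypedCol` are hypothetical, not doric's classes: if `===` only accepts a column with the same type parameter, comparing an `Int` column with a `String` column is simply a type error.

```scala
object TypedEquality {
  // A column expression that remembers the Scala type of its values.
  final case class TypedCol[T](sql: String) {
    // `other` must have the same T, so TypedCol[Int] === TypedCol[String] does not type-check.
    def ===(other: TypedCol[T]): TypedCol[Boolean] = TypedCol[Boolean](s"($sql = ${other.sql})")
  }

  val intCol: TypedCol[Int] = TypedCol[Int]("int")
  val strCol: TypedCol[String] = TypedCol[String]("str")

  // Same type on both sides: accepted.
  val sameType: TypedCol[Boolean] = intCol === TypedCol[Int]("int2")
  // val mixed = intCol === strCol // compile error: found TypedCol[String], required TypedCol[Int]
}
```

Because `TypedCol` is invariant in `T`, the compiler refuses the mixed comparison instead of inserting a silent conversion.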

In [8]:
dfEq.withColumn("eq", getInt("int") === getString("str")).show

cmd8.sc:1: The type $ScalaType cant be casted as a literal for $T.
val res8 = dfEq.withColumn("eq", getInt("int") === getString("str")).show
                                               ^
Compilation Failed

And if we got it wrong and asked for the "int" column as a string, we would get an error at runtime:

In [9]:
dfEq.withColumn("eq", getString("int") === getString("str")).show

Learning Spark is hard. We don't want to also learn every place where invalid code is accepted as valid, or every magic conversion, so let's do it our way, with explicit casts:

In [10]:
dfEq.withColumn("eq", getInt("int").castTo[String] === getString("str")).show

+---+---+-----+
|int|str|   eq|
+---+---+-----+
|  1|  1| true|
|  1|  1|false|
|  1| 1 |false|
+---+---+-----+



In [11]:
dfEq.withColumn("eq", getInt("int") === getString("str").warningCastTo[Int]).show
// it's a *warning* cast because it can produce null when the value can't be converted

+---+---+----+
|int|str|  eq|
+---+---+----+
|  1|  1|true|
|  1|  1|true|
|  1| 1 |true|
+---+---+----+



This speeds up development: there is a clearly located point where a runtime error can happen, and if it passes, the code runs as expected. We also know which functions can be applied to which columns.
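
The difference between the two casts can be sketched without Spark, assuming plain Scala values and modelling Spark's `null` as `None`. `Casts`, `intToString` and `stringToInt` are hypothetical names. One direction always succeeds; the other may not, which is why it deserves a warning. The `trim` mirrors the output above, where `" 1"` and `" 1 "` compared equal to `1` after the cast.

```scala
import scala.util.Try

object Casts {
  // Safe direction: every Int has a String representation.
  def intToString(i: Int): String = i.toString

  // Risky direction: text that isn't a number yields None, where Spark would put null.
  def stringToInt(s: String): Option[Int] = Try(s.trim.toInt).toOption
}
```

Comparing `Casts.intToString(1)` with `" 1"` gives `false`, as in the `castTo[String]` table, while `Casts.stringToInt(" 1 ")` gives `Some(1)`.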

## 3. Aggregation of multiple errors with precise error locations

OK, we know which functions can cause a runtime error. But Spark's API fails fast, so we have to run again and again, detecting a single error each time, fixing it, and rerunning.

In [12]:
val dfadd = List((1,2),(3,4)).toDF("int1", "int2")

[36mdfadd[39m: [32mDataFrame[39m = [int1: int, int2: int]

In [13]:
dfadd.withColumn("add", col("Int_1") + col("Int_2")).show

D'oh, I misspelled a column name. Let's fix it and run it again.

In [14]:
dfadd.withColumn("add", col("int1") + col("Int_2")).show

Next error... OK, let's fix it:

In [15]:
dfadd.withColumn("add", col("int1") + col("int2")).show

+----+----+---+
|int1|int2|add|
+----+----+---+
|   1|   2|  3|
|   3|   4|  7|
+----+----+---+



Doric helps in these cases by aggregating all the errors found in a DataFrame transformation into a single exception:
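
The difference between failing fast and accumulating errors can be sketched with plain `Either`, assuming the schema of `dfadd` is just a set of column names. `Resolution`, `resolve`, `failFast` and `accumulate` are hypothetical names. The notebook also loads cats-core, whose `Validated` type is the library-grade tool for this pattern and which doric may rely on; the sketch deliberately avoids it to stay dependency-free.

```scala
object Resolution {
  val columns: Set[String] = Set("int1", "int2")

  // Resolve one column name against the known schema.
  def resolve(name: String): Either[String, String] =
    if (columns.contains(name)) Right(name)
    else Left(s"Cannot resolve column name '$name' among (${columns.toList.sorted.mkString(", ")})")

  // Fail fast: the for-comprehension stops at the first Left, so only one error is ever reported.
  def failFast(a: String, b: String): Either[String, String] =
    for {
      x <- resolve(a)
      y <- resolve(b)
    } yield s"$x + $y"

  // Accumulate: resolve every name first, then report all failures together.
  def accumulate(names: String*): Either[List[String], List[String]] = {
    val results = names.toList.map(resolve)
    val errors = results.collect { case Left(e) => e }
    if (errors.isEmpty) Right(results.collect { case Right(c) => c }) else Left(errors)
  }
}
```

`failFast("Int_1", "Int_2")` reports only `Int_1`, like plain Spark, while `accumulate("Int_1", "Int_2")` reports both names in one go.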

In [16]:
dfadd.withColumn("add", getInt("int_1") + getInt("int_2")).show

Luckily my logic is very simple and all my columns fit in the `withColumn` call, but imagine that we have to split them up to separate the logic from the execution:

In [17]:
val addColumns = col("int_1") + col("int2")

dfadd.withColumn("add", addColumns).show

I know it's easy to spot in this example, but the exception points at the `withColumn` call, while the real error is elsewhere: the moment we asked for a nonexistent column. Spark gives you a hint, but you have to dig into your code to find the exact place.

With doric this is simpler: every method that can cause an error records the place in your code where it was called, and that location is reported with the error.
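
How can a library know where a column was requested? The notebook loads `com.lihaoyi::sourcecode`, whose implicit `sourcecode.File` and `sourcecode.Line` values are filled in at compile time with the caller's file and line; doric presumably uses something along those lines. The sketch below swaps in a JDK-only technique instead, reading the stack trace at runtime, so it needs no dependency. `CallSiteDemo`, `Location`, `ColumnRef` and `getColumn` are hypothetical names.

```scala
object CallSiteDemo {
  final case class Location(file: String, line: Int)

  // Walk the current stack and return the first frame outside this helper object:
  // that frame is the user code that asked for the column.
  def capture(): Location = {
    val frames = new Exception().getStackTrace
    val caller = frames.find(f => !f.getClassName.contains("CallSiteDemo")).getOrElse(frames.last)
    Location(caller.getFileName, caller.getLineNumber)
  }

  final case class ColumnRef(name: String, location: Location)

  // Hypothetical getter that records where it was called from.
  def getColumn(name: String): ColumnRef = ColumnRef(name, capture())
}
```

Building a stack trace on every call is far costlier than a compile-time macro such as sourcecode's, which is one reason to prefer the compile-time approach in a real library.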

In [18]:
val addColumns = getInt("int_1") + getInt("int2")

dfadd.withColumn("add", addColumns).show

Let me copy the error message here again:

```
habla.doric.DoricMultiError: found 1 errors
Cannot resolve column name "int_1" among (int1, int2)
	located at . (cmd25.sc:1)
```

`cmd25` is the name of the file, and the error is on line 1. If doric could, it would fix your code for you, but it has to leave something for the rest of us. And if you develop in an IDE, the location becomes a hyperlink to the line of the error.

Let's try something "harder":

In [19]:
val col1 = getInt("int_1")
val col2 = getString("int2").warningCastTo[Int]
val addColumns = col1 + col2

dfadd.withColumn("add", addColumns).show

See? Each error is explained and located. Right on target.

OK, but you may be asking yourself: what is the dark side? Nothing, truly nothing. All your optimizations work just as with the plain DataFrame API, and you can still use your everyday DataFrame as normal if you want or need to.

## Alternatives to Doric

Doric was created to provide a better and safer API for Spark, while staying simple and not being a complete change for a Spark developer.
Spark already has a typed API in Scala, `Dataset`, but this API does not play well with all of Spark's optimizations.

We also have [Typelevel frameless](https://github.com/typelevel/frameless), which has a great API that keeps the Dataset typed and is fully compatible with Spark's optimizations.

But both `Dataset` and frameless's `TypedDataset` share a common idea: the whole schema is always kept, and in both cases it has to be modelled with a case class. For example, if we expect our Dataset to have three columns, two strings for the name and surname and an integer for the age, we would need to model it like this:

```scala
import org.apache.spark.sql.Dataset

case class User(name: String, surname: String, age: Int)

val ds: Dataset[User] = ???
```
If we want to enrich our rows with the city, we need a new structure, so we have to declare another case class:

```scala
case class UserWithCity(name: String, surname: String, age: Int, city: String)

val ds2: Dataset[UserWithCity] = ??? // derived from `ds`; the method to call depends on the API (Dataset or TypedDataset)
```
This is required in both `Dataset` and `TypedDataset`; only the method to call changes.

Creating these case classes is acceptable if we don't transform the schema of the Dataset much, but for a Spark developer that is not the usual case.

Another problem with this way of working is that we can't write functions that transform a Dataset through an interface. That is, if we have several Datasets with different schemas that share a couple of columns, we would like to transform them all with the same function, something like:

```scala
def parseTimestamp(df: DataFrame): DataFrame = 
   df.withColumn("timestampParsed", parse(df("timestamp")))
```

This can be seen as an interface:

```scala
trait WithTimestamp {
    val timestamp: String
}

trait WithTimestampParsed {
    val timestampParsed: Long
}

def parseTimestamp[T <: WithTimestamp](df: Dataset[T]): Dataset[T with WithTimestampParsed] =
   df.withColumn("timestampParsed", parse(df("timestamp")))
```

I wish this code were as simple as I wrote it, but in the real world it's very hard for the developer to do this.

And this is the reason doric was created: it strikes a balance, adding static type safety for columns while keeping the DataFrame as a dynamic data structure. Developers with previous Spark experience can jump in almost instantly, and newcomers to Spark can skip the common beginner errors.