# Spark Parser


### Catalyst Architecture

![Spark Architecture](https://raw.githubusercontent.com/dharmeshkakadia/dharmeshkakadia.github.io/master/images/spark-arch.png) 

### Need for a formally defined grammar and parser

A grammar together with a parser answers the question: does a given statement comply with the rules of the language?
For example, is `select * from sample` a valid Spark statement or not?



### Why we need to understand the parser

* Will a given statement fail to parse, and if so, why?
* Is a given word a keyword in Spark or not?
* Adding a new feature (say, support for a MERGE statement)
* Building new tools, say our own editor for Spark
* Generating automated tests from the grammar

### ANTLR

* ANother Tool for Language Recognition. 
* Parser generator for reading, processing, executing, or translating structured text.
* ANTLR generates a parser that can build and walk parse trees.
* Toolkit for building languages
* Used widely by many projects (Groovy, Cassandra, Hive, …)


### ANTLR grammar 

* Spark's grammar is an LL grammar.
* LL stands for scanning the input Left to right while building a Leftmost derivation.


* A grammar G = (N, T, P, S), with nonterminals N, terminals T, productions P and start symbol S, is said to be strong LL(k) for some fixed natural number k if, for every nonterminal A and any two distinct A-productions A → α and A → β, `FIRST_k(α FOLLOW_k(A)) ∩ FIRST_k(β FOLLOW_k(A)) = ∅`. In words: the next k input tokens, together with what may follow A anywhere in the grammar, always identify which production of A to use.
* The strong LL(k) grammars are a subset of the LL(k) grammars that can be parsed without knowledge of the left context of the parse. That is, each parsing decision is based only on the next k tokens of the input for the nonterminal currently being expanded. In ANTLR terms, parsers that ignore the parser call stack when predicting are called Strong LL (SLL) parsers.


* **The Spark parser first tries to parse a statement in SLL (Strong LL) mode, which is faster. If that fails, it rewinds and parses the statement again in full LL mode.**
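The two-stage strategy is a general pattern: try a fast, stricter parse first and, only when it gives up, rewind and retry with the slower, complete one. Spark's parse driver (`ParseDriver.scala`) does this with ANTLR by catching a `ParseCancellationException` from the SLL attempt, calling `seek(0)` on the token stream and `reset()` on the parser, and switching to `PredictionMode.LL`. The plain-Java sketch below shows only that control flow; `TwoStageParse`, `parse` and `Outcome` are hypothetical names, and the two `Function`s merely stand in for the SLL and LL parses.

```java
import java.util.function.Function;

// Hypothetical helper (not Spark or ANTLR API): fast parse first, full parse on failure.
final class TwoStageParse {

    /** The parse result plus a flag recording whether the fallback stage was needed. */
    static final class Outcome<T> {
        final T value;
        final boolean usedFallback;

        Outcome(T value, boolean usedFallback) {
            this.value = value;
            this.usedFallback = usedFallback;
        }
    }

    /**
     * Runs fastParse; if it bails out with IllegalStateException (standing in for
     * ANTLR's ParseCancellationException), runs fullParse on the same input.
     * An exception thrown by fullParse propagates: that is the genuine syntax error.
     */
    static <I, T> Outcome<T> parse(I input, Function<I, T> fastParse, Function<I, T> fullParse) {
        try {
            return new Outcome<>(fastParse.apply(input), false);   // stage 1, e.g. SLL
        } catch (IllegalStateException bail) {
            // A real parser must rewind the token stream and reset itself here.
            return new Outcome<>(fullParse.apply(input), true);    // stage 2, e.g. LL
        }
    }

    public static void main(String[] args) {
        // Toy fast stage: only decides single-word statements and bails out otherwise.
        Function<String, String> fast = s -> {
            if (s.trim().split("\\s+").length != 1) {
                throw new IllegalStateException("fast stage cannot decide");
            }
            return "fast: " + s;
        };
        Function<String, String> full = s -> "full: " + s;

        System.out.println(parse("RESET", fast, full).value);        // fast: RESET
        System.out.println(parse("CLEAR CACHE", fast, full).value);  // full: CLEAR CACHE
    }
}
```

Since most inputs succeed in the first stage, the cost of the slower prediction is only paid for the few that need it; in this sketch an input that fails both stages surfaces the exception from the second one.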


### ANTLR operators

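```
|   alternatives
.   any single character (in lexer rules) or any single token (in parser rules)
?   repeated zero or one time, i.e. optional
+   repeated one or more times
*   repeated zero or more times
*?  non-greedy zero or more, as in .*? (matches as little as possible)
~   not: matches anything except the listed characters or tokens
```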
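These operators behave much like their regular-expression counterparts. As a rough analogy only (ANTLR parser rules match tokens, not characters, and ANTLR does not work this way internally), the hypothetical class below approximates three patterns from Spark's `statement` rule with Java regular expressions: the optional parts of `DROP TABLE (IF EXISTS)? tableIdentifier PURGE?`, the alternatives in `(DESC | DESCRIBE)`, and the repetition in `partitionSpec (',' partitionSpec)*`. Identifiers are simplified to `\w+`.

```java
import java.util.regex.Pattern;

// Analogy only: regexes over characters, whereas ANTLR parser rules match tokens.
final class OperatorAnalogy {
    // DROP TABLE (IF EXISTS)? tableIdentifier PURGE?
    //   '?' marks the optional parts; tableIdentifier is (db '.')? table
    static final Pattern DROP_TABLE =
        Pattern.compile("DROP TABLE( IF EXISTS)? (\\w+\\.)?\\w+( PURGE)?");

    // (DESC | DESCRIBE) ... : '|' separates alternatives
    static final Pattern DESCRIBE = Pattern.compile("(DESC|DESCRIBE) \\w+");

    // partitionSpec (',' partitionSpec)* : '*' allows zero or more repetitions,
    // and '+' in \w+ means one or more characters
    static final Pattern NAME_LIST = Pattern.compile("\\w+(, ?\\w+)*");

    static boolean accepts(Pattern rule, String text) {
        return rule.matcher(text).matches();   // must consume the whole input
    }

    public static void main(String[] args) {
        System.out.println(accepts(DROP_TABLE, "DROP TABLE IF EXISTS db.t PURGE")); // true
        System.out.println(accepts(DROP_TABLE, "DROP TABLE t"));                    // true
        System.out.println(accepts(DROP_TABLE, "DROP TABLE IF t"));                 // false
        System.out.println(accepts(NAME_LIST, "p1, p2,p3"));                        // true
    }
}
```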

### Simple ANTLR end to end example

A very simple codebase that implements the following [grammar](https://github.com/dharmeshkakadia/hello-antlr/blob/master/src/main/antlr4/Hello.g4):

```antlr-java
grammar Hello;
msg   : 'Hello' ID;
ID  : [a-z]+ ;
WS  : [ \t\r\n]+ -> skip ;
```
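To make it concrete what the generated lexer and parser do for this grammar, here is a hand-written equivalent in plain Java: a lexer that skips whitespace and emits `'Hello'` and `ID` tokens, and a recursive-descent method for `msg`. The class and method names are hypothetical; ANTLR's generated `HelloLexer` and `HelloParser` are table-driven and much more general.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hand-written lexer + parser for:  msg : 'Hello' ID ;  ID : [a-z]+ ;  WS -> skip
final class HandWrittenHello {
    enum Kind { HELLO, ID }

    static final class Token {
        final Kind kind;
        final String text;

        Token(Kind kind, String text) {
            this.kind = kind;
            this.text = text;
        }
    }

    /** Lexer: skips WS, emits the literal 'Hello' and ID tokens, rejects anything else. */
    static List<Token> lex(String input) {
        List<Token> tokens = new ArrayList<>();
        int i = 0;
        while (i < input.length()) {
            char c = input.charAt(i);
            if (c == ' ' || c == '\t' || c == '\r' || c == '\n') {
                i++;                                            // WS : [ \t\r\n]+ -> skip
            } else if (input.startsWith("Hello", i)) {
                tokens.add(new Token(Kind.HELLO, "Hello"));     // the 'Hello' literal
                i += "Hello".length();
            } else if (c >= 'a' && c <= 'z') {
                int start = i;                                  // ID : [a-z]+
                while (i < input.length() && input.charAt(i) >= 'a' && input.charAt(i) <= 'z') {
                    i++;
                }
                tokens.add(new Token(Kind.ID, input.substring(start, i)));
            } else {
                throw new IllegalArgumentException("unexpected character '" + c + "' at " + i);
            }
        }
        return tokens;
    }

    /** Parser for  msg : 'Hello' ID ;  returns the text of the ID token. */
    static String parseMsg(String input) {
        List<Token> tokens = lex(input);
        if (tokens.isEmpty() || tokens.get(0).kind != Kind.HELLO) {
            throw new IllegalArgumentException("expected 'Hello'");
        }
        if (tokens.size() < 2 || tokens.get(1).kind != Kind.ID) {
            throw new IllegalArgumentException("expected ID after 'Hello'");
        }
        if (tokens.size() > 2) {        // stricter than `msg` itself, which has no EOF
            throw new IllegalArgumentException("extra input after msg");
        }
        return tokens.get(1).text;
    }

    public static void main(String[] args) {
        System.out.println(parseMsg("Hello world"));            // world
    }
}
```

One difference is deliberate: `parseMsg` rejects trailing tokens. Because `msg` does not end with `EOF`, the generated `parser.msg()` stops after the `ID` without complaining about further input; ending the rule with `EOF` makes ANTLR report such input as an error.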

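Next, we implement a listener, by extending the generated `HelloBaseListener`, to decide what to do when the tree walker enters or exits a node of the parse tree. (ANTLR can also generate a visitor interface, but this example uses the listener.)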

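```java
// Listener callbacks are fired by ParseTreeWalker while it traverses the parse tree.
public class HelloVister extends HelloBaseListener {
    // Called when the walker enters a `msg` node, before visiting its children.
    @Override
    public void enterMsg(HelloParser.MsgContext ctx) {
        // ctx.ID() is the ID token matched by `msg : 'Hello' ID;`
        System.out.println("Entering Msg : " + ctx.ID().getText());
    }

    // Called after all children of the `msg` node have been walked.
    @Override
    public void exitMsg(HelloParser.MsgContext ctx) {
        System.out.println("Exiting Msg");
    }
}
```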

Finally, we initialize the lexer, the token stream and the parser, and walk the resulting parse tree with our listener.

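```java
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

public class Hello {
    public static void main(String[] args) {
        // Characters -> tokens. ANTLRInputStream is deprecated since ANTLR 4.7;
        // CharStreams.fromString("Hello world") is the newer equivalent.
        HelloLexer lexer = new HelloLexer(new ANTLRInputStream("Hello world"));
        // Tokens -> parse tree; parser.msg() parses starting from the `msg` rule.
        HelloParser parser = new HelloParser(new CommonTokenStream(lexer));
        // Walk the tree depth-first, firing enterMsg/exitMsg on our listener.
        ParseTreeWalker walker = new ParseTreeWalker();
        walker.walk(new HelloVister(), parser.msg());
    }
}
```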
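`ParseTreeWalker.walk` traverses the tree depth-first: for each rule node it calls the listener's `enter…` method, walks the children (terminal tokens go to `visitTerminal`), and then calls `exit…`. The plain-Java sketch below reproduces that call order on a hand-built tree shaped like the one for `Hello world`; `Node`, `TreeListener` and `WalkOrderDemo` are illustrative names, not ANTLR classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mini tree + walker mimicking the enter / children / exit order.
final class WalkOrderDemo {
    static final class Node {
        final String label;
        final List<Node> children;

        Node(String label, Node... children) {
            this.label = label;
            this.children = Arrays.asList(children);
        }

        boolean isTerminal() { return children.isEmpty(); }
    }

    interface TreeListener {
        void enter(Node n);
        void exit(Node n);
        void visitTerminal(Node n);
    }

    /** Depth-first walk: enter a rule node, walk its children, then exit it. */
    static void walk(TreeListener listener, Node n) {
        if (n.isTerminal()) {
            listener.visitTerminal(n);
            return;
        }
        listener.enter(n);
        for (Node child : n.children) {
            walk(listener, child);
        }
        listener.exit(n);
    }

    /** Records every callback so the order can be inspected. */
    static List<String> trace(Node root) {
        final List<String> events = new ArrayList<>();
        walk(new TreeListener() {
            public void enter(Node n) { events.add("enter " + n.label); }
            public void exit(Node n) { events.add("exit " + n.label); }
            public void visitTerminal(Node n) { events.add("token " + n.label); }
        }, root);
        return events;
    }

    public static void main(String[] args) {
        // Shape of the parse tree for "Hello world": msg -> 'Hello' ID
        Node msg = new Node("msg", new Node("Hello"), new Node("world"));
        System.out.println(trace(msg)); // [enter msg, token Hello, token world, exit msg]
    }
}
```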


The project uses Maven's ANTLR plugin, which also takes care of generating the lexer and parser code from the grammar. To compile:

```
mvn compile
```

To run it, use

```
mvn exec:java -q
```

which would print
```
Entering Msg : world
Exiting Msg

```

### Spark Grammar

* The grammar that Spark Catalyst parses is defined in [SqlBase.g4](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4)

* Spark uses the ANTLR v4 parser generator.


### Statement in Spark

```antlr-java
statement
    : query                                                            #statementDefault
    | USE db=identifier                                                #use
    | CREATE DATABASE (IF NOT EXISTS)? identifier
        (COMMENT comment=STRING)? locationSpec?
        (WITH DBPROPERTIES tablePropertyList)?                         #createDatabase
    | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     #setDatabaseProperties
    | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      #dropDatabase
    | createTableHeader ('(' colTypeList ')')? tableProvider
        (OPTIONS options=tablePropertyList)?
        (PARTITIONED BY partitionColumnNames=identifierList)?
        bucketSpec? locationSpec?
        (COMMENT comment=STRING)?
        (TBLPROPERTIES tableProps=tablePropertyList)?
        (AS? query)?                                                   #createTable
    | createTableHeader ('(' columns=colTypeList ')')?
        (COMMENT comment=STRING)?
        (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
        bucketSpec? skewSpec?
        rowFormat?  createFileFormat? locationSpec?
        (TBLPROPERTIES tablePropertyList)?
        (AS? query)?                                                   #createHiveTable
    | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
        LIKE source=tableIdentifier locationSpec?                      #createTableLike
    | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
        (identifier | FOR COLUMNS identifierSeq)?                      #analyze
    | ALTER TABLE tableIdentifier
        ADD COLUMNS '(' columns=colTypeList ')'                        #addTableColumns
    | ALTER (TABLE | VIEW) from=tableIdentifier
        RENAME TO to=tableIdentifier                                   #renameTable
    | ALTER (TABLE | VIEW) tableIdentifier
        SET TBLPROPERTIES tablePropertyList                            #setTableProperties
    | ALTER (TABLE | VIEW) tableIdentifier
        UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList             #unsetTableProperties
    | ALTER TABLE tableIdentifier partitionSpec?
        CHANGE COLUMN? identifier colType colPosition?                 #changeColumn
    | ALTER TABLE tableIdentifier (partitionSpec)?
        SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)?     #setTableSerDe
    | ALTER TABLE tableIdentifier (partitionSpec)?
        SET SERDEPROPERTIES tablePropertyList                          #setTableSerDe
    | ALTER TABLE tableIdentifier ADD (IF NOT EXISTS)?
        partitionSpecLocation+                                         #addTablePartition
    | ALTER VIEW tableIdentifier ADD (IF NOT EXISTS)?
        partitionSpec+                                                 #addTablePartition
    | ALTER TABLE tableIdentifier
        from=partitionSpec RENAME TO to=partitionSpec                  #renameTablePartition
    | ALTER TABLE tableIdentifier
        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE?    #dropTablePartitions
    | ALTER VIEW tableIdentifier
        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
    | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec      #setTableLocation
    | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   #recoverPartitions
    | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   #dropTable
    | DROP VIEW (IF EXISTS)? tableIdentifier                           #dropTable
    | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
        VIEW (IF NOT EXISTS)? tableIdentifier
        identifierCommentList? (COMMENT STRING)?
        (PARTITIONED ON identifierList)?
        (TBLPROPERTIES tablePropertyList)? AS query                    #createView
    | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
        tableIdentifier ('(' colTypeList ')')? tableProvider
        (OPTIONS tablePropertyList)?                                   #createTempViewUsing
    | ALTER VIEW tableIdentifier AS? query                             #alterViewQuery
    | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
        qualifiedName AS className=STRING
        (USING resource (',' resource)*)?                              #createFunction
    | DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName              #dropFunction
    | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
        statement                                                      #explain
    | SHOW TABLES ((FROM | IN) db=identifier)?
        (LIKE? pattern=STRING)?                                        #showTables
    | SHOW TABLE EXTENDED ((FROM | IN) db=identifier)?
        LIKE pattern=STRING partitionSpec?                             #showTable
    | SHOW DATABASES (LIKE pattern=STRING)?                            #showDatabases
    | SHOW TBLPROPERTIES table=tableIdentifier
        ('(' key=tablePropertyKey ')')?                                #showTblProperties
    | SHOW COLUMNS (FROM | IN) tableIdentifier
        ((FROM | IN) db=identifier)?                                   #showColumns
    | SHOW PARTITIONS tableIdentifier partitionSpec?                   #showPartitions
    | SHOW identifier? FUNCTIONS
        (LIKE? (qualifiedName | pattern=STRING))?                      #showFunctions
    | SHOW CREATE TABLE tableIdentifier                                #showCreateTable
    | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName            #describeFunction
    | (DESC | DESCRIBE) DATABASE EXTENDED? identifier                  #describeDatabase
    | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
        tableIdentifier partitionSpec? describeColName?                #describeTable
    | REFRESH TABLE tableIdentifier                                    #refreshTable
    | REFRESH (STRING | .*?)                                           #refreshResource
    | CACHE LAZY? TABLE tableIdentifier (AS? query)?                   #cacheTable
    | UNCACHE TABLE (IF EXISTS)? tableIdentifier                       #uncacheTable
    | CLEAR CACHE                                                      #clearCache
    | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
        tableIdentifier partitionSpec?                                 #loadData
    | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
    | MSCK REPAIR TABLE tableIdentifier                                #repairTable
    | op=(ADD | LIST) identifier .*?                                   #manageResource
    | SET ROLE .*?                                                     #failNativeCommand
    | SET .*?                                                          #setConfiguration
    | RESET                                                            #resetConfiguration
    | unsupportedHiveNativeCommands .*?                                #failNativeCommand
    ;
```
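Many alternatives of `statement` can be told apart from their first one or two tokens, which is exactly the kind of decision an LL parser makes. The hypothetical sketch below hand-codes that prediction for a few of the alternatives above and returns the label of the predicted alternative; it does not validate the rest of the statement and is not how the generated `SqlBaseParser` is structured. Keywords are compared case-insensitively, as Spark SQL keywords are. Other alternatives need far more lookahead: the `ALTER TABLE tableIdentifier partitionSpec? …` forms share a prefix of unbounded length (a partition spec can list any number of values), so no fixed k separates them, which is one reason ANTLR 4 uses adaptive lookahead rather than a fixed k.

```java
import java.util.Locale;

// Hypothetical illustration of LL-style prediction for a few `statement` alternatives.
final class StatementPredictor {
    /** Returns the label of the predicted alternative, "error", or "unknown" if not covered. */
    static String predict(String sql) {
        String[] t = sql.trim().split("\\s+");
        // Keywords are case-insensitive, so compare upper-cased tokens.
        String t0 = t[0].toUpperCase(Locale.ROOT);
        String t1 = t.length > 1 ? t[1].toUpperCase(Locale.ROOT) : "";
        switch (t0) {
            case "USE":                                   // USE db=identifier
                return "#use";
            case "RESET":                                 // RESET
                return "#resetConfiguration";
            case "CLEAR":                                 // CLEAR CACHE
                return t1.equals("CACHE") ? "#clearCache" : "error";
            case "REFRESH":                               // 2nd token picks one of two alternatives
                return t1.equals("TABLE") ? "#refreshTable" : "#refreshResource";
            case "DROP":
                switch (t1) {
                    case "TABLE":
                    case "VIEW":      return "#dropTable";      // both alternatives share a label
                    case "DATABASE":  return "#dropDatabase";
                    case "TEMPORARY":
                    case "FUNCTION":  return "#dropFunction";
                    default:          return "error";
                }
            default:
                return "unknown";                         // e.g. queries (#statementDefault)
        }
    }

    public static void main(String[] args) {
        System.out.println(predict("refresh table sales"));    // #refreshTable
        System.out.println(predict("DROP VIEW IF EXISTS v"));  // #dropTable
    }
}
```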

### Query definition in Spark

```antlr-java
querySpecification
    : (((SELECT kind=TRANSFORM '(' namedExpressionSeq ')'
        | kind=MAP namedExpressionSeq
        | kind=REDUCE namedExpressionSeq))
       inRowFormat=rowFormat?
       (RECORDWRITER recordWriter=STRING)?
       USING script=STRING
       (AS (identifierSeq | colTypeList | ('(' (identifierSeq | colTypeList) ')')))?
       outRowFormat=rowFormat?
       (RECORDREADER recordReader=STRING)?
       fromClause?
       (WHERE where=booleanExpression)?)
    | ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause?
       | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
       lateralView*
       (WHERE where=booleanExpression)?
       aggregation?
       (HAVING having=booleanExpression)?
       windows?)
    ;

```

### Constants in Spark

```antlr-java
constant
    : NULL                                                                                     #nullLiteral
    | interval                                                                                 #intervalLiteral
    | identifier STRING                                                                        #typeConstructor
    | number                                                                                   #numericLiteral
    | booleanValue                                                                             #booleanLiteral
    | STRING+                                                                                  #stringLiteral
    ;

```

### Spark grammar tokens

Some sample lexer rules.

Relational operators:
```antlr-java
EQ  : '=' | '==';
NSEQ: '<=>';
NEQ : '<>';
NEQJ: '!=';
LT  : '<';
LTE : '<=' | '!>';
GT  : '>';
GTE : '>=' | '!<';

```
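When several lexer rules could match at the same position, an ANTLR lexer takes the longest match (breaking ties by rule order). That is why `<=>` is lexed as one `NSEQ` token rather than `LTE` followed by `GT`. A minimal maximal-munch tokenizer for just these operators, as a hypothetical plain-Java sketch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical maximal-munch tokenizer for the relational-operator rules above.
final class OperatorLexer {
    private static final Map<String, String> OPS = new HashMap<>();
    static {
        OPS.put("=", "EQ");    OPS.put("==", "EQ");
        OPS.put("<=>", "NSEQ");
        OPS.put("<>", "NEQ");
        OPS.put("!=", "NEQJ");
        OPS.put("<", "LT");    OPS.put("<=", "LTE");  OPS.put("!>", "LTE");
        OPS.put(">", "GT");    OPS.put(">=", "GTE");  OPS.put("!<", "GTE");
    }

    /** Tokenizes a string of operators and spaces, always taking the longest spelling. */
    static List<String> tokenize(String s) {
        List<String> tokens = new ArrayList<>();
        int i = 0;
        while (i < s.length()) {
            if (s.charAt(i) == ' ') {
                i++;
                continue;
            }
            String best = null;
            for (String spelling : OPS.keySet()) {
                if (s.startsWith(spelling, i) && (best == null || spelling.length() > best.length())) {
                    best = spelling;
                }
            }
            if (best == null) {
                throw new IllegalArgumentException("no operator starts at index " + i);
            }
            tokens.add(OPS.get(best));
            i += best.length();
        }
        return tokens;
    }
}
```

For example, `OperatorLexer.tokenize("<=>")` gives `[NSEQ]`, while `tokenize("<= >")` gives `[LTE, GT]` because the space separates the two tokens.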

Arithmetic, bitwise and string concatenation operators:

```antlr-java
PLUS: '+';
MINUS: '-';
ASTERISK: '*';
SLASH: '/';
PERCENT: '%';
DIV: 'DIV';
TILDE: '~';
AMPERSAND: '&';
PIPE: '|';
CONCAT_PIPE: '||';
HAT: '^';

```

String literals:

```antlr-java
STRING
    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
    ;
```
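The `STRING` rule accepts single- or double-quoted text in which a backslash escapes the next character, so a quote can appear inside a literal as `\'` or `\"`. The same language can be written as a Java regular expression; the hypothetical class below only checks whether a piece of text has the shape of a `STRING` token and does not unescape it. `Pattern.DOTALL` is used so that `.` also matches line breaks, like `.` in an ANTLR lexer rule.

```java
import java.util.regex.Pattern;

// Hypothetical regex transcription of the STRING lexer rule.
final class StringLiteralRule {
    // '\'' ( ~('\''|'\\') | ('\\' .) )* '\''   ->  '(?:[^'\\]|\\.)*'
    // '"'  ( ~('"'|'\\')  | ('\\' .) )* '"'    ->  "(?:[^"\\]|\\.)*"
    static final Pattern STRING = Pattern.compile(
        "'(?:[^'\\\\]|\\\\.)*'" + "|" + "\"(?:[^\"\\\\]|\\\\.)*\"",
        Pattern.DOTALL);

    /** True if the whole text is one STRING token. */
    static boolean isStringToken(String text) {
        return STRING.matcher(text).matches();
    }

    public static void main(String[] args) {
        System.out.println(isStringToken("'it\\'s'"));      // true: escaped quote inside
        System.out.println(isStringToken("\"it's\""));      // true: other quote kind
        System.out.println(isStringToken("'unterminated")); // false
    }
}
```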



### Data types in Spark

#### How data is represented in Spark
Data of all these types is exposed to users through the [Row API](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala), which represents one row of output from a relational operator. A `Row` allows both generic access by ordinal, which incurs boxing overhead for primitives, and native primitive access.
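The difference between the two access styles can be sketched with a hypothetical row of `int` columns in plain Java. This is not Spark's `Row` (which offers, for example, `get(i)` and `getInt(i)`); it only shows why generic access hands back a boxed object while primitive access does not.

```java
// Hypothetical row of int columns illustrating generic vs primitive access.
final class IntRowSketch {
    private final int[] values;   // stored unboxed

    IntRowSketch(int... values) {
        this.values = values.clone();
    }

    int length() {
        return values.length;
    }

    /** Generic access by ordinal: the int is boxed into an Integer object. */
    Object get(int ordinal) {
        return Integer.valueOf(values[ordinal]);   // small values come from a cache, others are allocated
    }

    /** Primitive access: the value is returned as a plain int, no boxing. */
    int getInt(int ordinal) {
        return values[ordinal];
    }

    public static void main(String[] args) {
        IntRowSketch row = new IntRowSketch(1, 42);
        Object boxed = row.get(1);
        int unboxed = row.getInt(1);
        System.out.println(boxed + " " + unboxed); // 42 42
    }
}
```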


* Primitive types
    * String, Varchar, Char
    * Float, Integer, Short, Double, Long
    * Decimal
    * Byte
    * Binary
    * Boolean
    * Timestamp, Date

* Complex types
    * Array
    * Struct
    * Map
    
#### Formal definition of data types:

```antlr-java
dataType
    : complex=ARRAY '<' dataType '>'                            #complexDataType
    | complex=MAP '<' dataType ',' dataType '>'                 #complexDataType
    | complex=STRUCT ('<' complexColTypeList? '>' | NEQ)        #complexDataType
    | identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')?  #primitiveDataType
    ;

```
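The `dataType` rule is small enough to parse by hand. The sketch below is a hypothetical recursive-descent parser in plain Java that follows the four alternatives and returns a normalized string rather than Spark `DataType` objects. Two details are worth noting: `STRUCT<>` needs the separate `NEQ` alternative because the lexer turns `<>` into a single `NEQ` token, and the `name:type` field syntax inside a struct is an assumption here, since the `complexColTypeList` rule is not shown above (field comments are not supported).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical recursive-descent parser for the dataType rule; returns a normalized string.
final class DataTypeParser {
    private final List<String> toks;
    private int pos = 0;

    private DataTypeParser(String text) { this.toks = tokenize(text); }

    /** Parses a complete type string, e.g. "map<string, array<int>>". */
    static String parse(String text) {
        DataTypeParser p = new DataTypeParser(text);
        String result = p.dataType();
        if (p.pos != p.toks.size()) throw new IllegalArgumentException("trailing input: " + p.peek());
        return result;
    }

    /** Words/numbers, the two-character NEQ token "<>", and single-character symbols. */
    private static List<String> tokenize(String s) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < s.length()) {
            char c = s.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;
            } else if (s.startsWith("<>", i)) {           // longest match: NEQ, not '<' '>'
                out.add("<>");
                i += 2;
            } else if (Character.isLetterOrDigit(c) || c == '_') {
                int start = i;
                while (i < s.length() && (Character.isLetterOrDigit(s.charAt(i)) || s.charAt(i) == '_')) i++;
                out.add(s.substring(start, i));
            } else if ("<>,():".indexOf(c) >= 0) {
                out.add(String.valueOf(c));
                i++;
            } else {
                throw new IllegalArgumentException("unexpected character '" + c + "'");
            }
        }
        return out;
    }

    private String peek() { return pos < toks.size() ? toks.get(pos) : ""; }

    private String next() {
        if (pos >= toks.size()) throw new IllegalArgumentException("unexpected end of input");
        return toks.get(pos++);
    }

    private void expect(String want) {
        String got = next();
        if (!got.equals(want)) throw new IllegalArgumentException("expected '" + want + "' but got '" + got + "'");
    }

    private String integer() {
        String t = next();
        for (int k = 0; k < t.length(); k++) {
            if (!Character.isDigit(t.charAt(k))) throw new IllegalArgumentException("expected integer: " + t);
        }
        return t;
    }

    private String dataType() {
        String head = next();
        switch (head.toUpperCase(Locale.ROOT)) {
            case "ARRAY": {                                 // ARRAY '<' dataType '>'
                expect("<"); String element = dataType(); expect(">");
                return "array<" + element + ">";
            }
            case "MAP": {                                   // MAP '<' dataType ',' dataType '>'
                expect("<"); String key = dataType(); expect(","); String value = dataType(); expect(">");
                return "map<" + key + "," + value + ">";
            }
            case "STRUCT": {                                // STRUCT ('<' complexColTypeList? '>' | NEQ)
                if (peek().equals("<>")) { next(); return "struct<>"; }
                expect("<");
                StringBuilder sb = new StringBuilder("struct<");
                boolean first = true;
                while (!peek().equals(">")) {               // assumed field syntax: name ':' dataType
                    if (!first) { expect(","); sb.append(','); }
                    String name = next();
                    expect(":");
                    sb.append(name).append(':').append(dataType());
                    first = false;
                }
                expect(">");
                return sb.append('>').toString();
            }
            default: {                                      // identifier ('(' INT (',' INT)* ')')?
                char c0 = head.charAt(0);
                if (!(Character.isLetter(c0) || c0 == '_')) throw new IllegalArgumentException("bad type: " + head);
                StringBuilder sb = new StringBuilder(head.toLowerCase(Locale.ROOT));
                if (peek().equals("(")) {
                    next();
                    sb.append('(').append(integer());
                    while (peek().equals(",")) { next(); sb.append(',').append(integer()); }
                    expect(")");
                    sb.append(')');
                }
                return sb.toString();
            }
        }
    }
}
```

For instance, `DataTypeParser.parse("map<string, array<decimal(10,2)>>")` returns `"map<string,array<decimal(10,2)>>"`, and `parse("array<struct<>>")` exercises the `NEQ` path.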

#### Spark types to Java types

Here is how Spark data types map to the Java types returned by `Row`:

* BooleanType -> java.lang.Boolean
* ByteType -> java.lang.Byte
* ShortType -> java.lang.Short
* IntegerType -> java.lang.Integer
* LongType -> java.lang.Long
* FloatType -> java.lang.Float
* DoubleType -> java.lang.Double
* StringType -> String
* DecimalType -> java.math.BigDecimal
* DateType -> java.sql.Date
* TimestampType -> java.sql.Timestamp
* BinaryType -> byte array
* ArrayType -> scala.collection.Seq (use getList for java.util.List)
* MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
* StructType -> org.apache.spark.sql.Row


## Hands-on with Spark Parser

Let's build an expression tree (a logical plan) by hand and execute it. We will run a `select` statement with a condition on a temporary table, first through SQL and then by constructing the equivalent tree manually.




```scala
org.apache.spark.sql.SparkSession.setActiveSession(spark)
```

### Original query


```scala
List((1),(2),(3),(4),(5)).toDF("id").createOrReplaceGlobalTempView("data")
sql("select * from global_temp.data where id < 4").show()
```

```
+---+
| id|
+---+
|  1|
|  2|
|  3|
+---+
```

### Defining data

Let's define the data and convert each value into an `InternalRow`, the internal row representation Spark uses.

```scala
val data = List(1,2,3,4,5).map(value => org.apache.spark.sql.catalyst.InternalRow.apply(value))
```

```
data: List[org.apache.spark.sql.catalyst.InternalRow] = List([1], [2], [3], [4], [5])
```

### Defining schema

Since we are building the tree by hand, we need to specify the data types ourselves; normally the analyzer resolves them from the catalog.

```scala
val schema = org.apache.spark.sql.catalyst.expressions.AttributeReference("id", org.apache.spark.sql.types.IntegerType)()
```

```
schema: org.apache.spark.sql.catalyst.expressions.AttributeReference = id#75
```

### Attaching schema to data

Now that we have both data and schema, let's create a local relation. `LocalRelation` is a leaf logical plan that allows operations like `collect` or `take` to be executed locally, i.e. without using Spark executors.


```scala
val localRelation = org.apache.spark.sql.catalyst.plans.logical.LocalRelation(Seq(schema), data)
```

```
localRelation: org.apache.spark.sql.catalyst.plans.logical.LocalRelation =
LocalRelation [id#75]
```

### Defining filter expression

Now we add the filter `id < 4` to our plan. Let's define the filter first, using a `LessThan` expression with the literal `4`.


```scala
val filterExpression = org.apache.spark.sql.catalyst.expressions.LessThan(schema, org.apache.spark.sql.catalyst.expressions.Literal(4))
```

```
filterExpression: org.apache.spark.sql.catalyst.expressions.LessThan = (id#75 < 4)
```

### Creating logical plan/expression tree

Now let's create the logical plan by applying the filter above to the local relation we just created, and let the session's query planner turn it into physical plans.

```scala
val logicalPlan = org.apache.spark.sql.catalyst.plans.logical.Filter(filterExpression, localRelation)
val physicalPlans = spark.sessionState.planner.plan(logicalPlan)
```

```
physicalPlans: Iterator[org.apache.spark.sql.execution.SparkPlan] = non-empty iterator
```

### Executing the plan

Let's execute the first plan returned by the planner (for our simple logical plan there is only one physical plan).

```scala
val results = physicalPlans.next.execute.collect
```

```
results: Array[org.apache.spark.sql.catalyst.InternalRow] = Array([0,1], [0,2], [0,3])
```

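We now have 3 result rows with values 1, 2 and 3. You can confirm this by printing them. The rows print as `[0,1]` rather than `[1]` because the physical operators produce `UnsafeRow`s, whose string form lists their raw 8-byte words in hex: the first word is the null bitmap (`0`, no nulls) and the second holds the `id` value.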

```scala
results.foreach(println)
```

```
[0,1]
[0,2]
[0,3]
```
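Stripped of Spark, the plan built above is a small tree: a `Filter` whose condition is `LessThan(id, 4)` on top of a leaf relation holding the rows. The plain-Java sketch below mirrors that shape with hypothetical classes that only borrow Catalyst's node names, and evaluates it by interpreting the tree row by row. Spark's physical operators do the same work, but with `InternalRow`s, null handling and generated code, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini expression/plan tree; names borrowed from Catalyst, not its API.
final class MiniCatalyst {
    /** An expression evaluated against one row (rows here are plain int arrays, no nulls). */
    interface Expression { Object eval(int[] row); }

    /** Column reference by ordinal, standing in for AttributeReference. */
    static final class Attribute implements Expression {
        final String name;
        final int ordinal;
        Attribute(String name, int ordinal) { this.name = name; this.ordinal = ordinal; }
        public Object eval(int[] row) { return row[ordinal]; }
    }

    static final class Literal implements Expression {
        final int value;
        Literal(int value) { this.value = value; }
        public Object eval(int[] row) { return value; }
    }

    static final class LessThan implements Expression {
        final Expression left, right;
        LessThan(Expression left, Expression right) { this.left = left; this.right = right; }
        public Object eval(int[] row) { return (Integer) left.eval(row) < (Integer) right.eval(row); }
    }

    /** Plan nodes: a leaf holding in-memory rows, and a filter over a child plan. */
    interface Plan { List<int[]> execute(); }

    static final class LocalRelation implements Plan {
        final List<int[]> rows;
        LocalRelation(List<int[]> rows) { this.rows = rows; }
        public List<int[]> execute() { return rows; }
    }

    static final class Filter implements Plan {
        final Expression condition;
        final Plan child;
        Filter(Expression condition, Plan child) { this.condition = condition; this.child = child; }
        public List<int[]> execute() {
            List<int[]> out = new ArrayList<>();
            for (int[] row : child.execute()) {
                if ((Boolean) condition.eval(row)) out.add(row);   // keep rows where condition holds
            }
            return out;
        }
    }

    /** Builds Filter(id < 4, LocalRelation(1..5)) and returns the surviving ids. */
    static List<Integer> selectIdLessThan4() {
        List<int[]> data = new ArrayList<>();
        for (int v = 1; v <= 5; v++) data.add(new int[] {v});
        Attribute id = new Attribute("id", 0);
        Plan plan = new Filter(new LessThan(id, new Literal(4)), new LocalRelation(data));
        List<Integer> ids = new ArrayList<>();
        for (int[] row : plan.execute()) ids.add(row[0]);
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(selectIdLessThan4()); // [1, 2, 3]
    }
}
```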

## Generating test queries automatically

We can use this ability to manipulate expression trees to generate test cases automatically. Here we generate filters with different comparison operators for the schema above and execute each of them.

```scala
val data = List(1,2,3,4,5).map(value => org.apache.spark.sql.catalyst.InternalRow.apply(value))
val schema = org.apache.spark.sql.catalyst.expressions.AttributeReference("id", org.apache.spark.sql.types.IntegerType)()
val localRelation = org.apache.spark.sql.catalyst.plans.logical.LocalRelation(Seq(schema), data)

// Build, plan and execute one Filter per comparison operator.
for (expression <- List(
       org.apache.spark.sql.catalyst.expressions.LessThan,
       org.apache.spark.sql.catalyst.expressions.LessThanOrEqual,
       org.apache.spark.sql.catalyst.expressions.GreaterThan,
       org.apache.spark.sql.catalyst.expressions.EqualTo)) {
    val filterExpression = expression(schema, org.apache.spark.sql.catalyst.expressions.Literal(4))
    val logicalPlan = org.apache.spark.sql.catalyst.plans.logical.Filter(filterExpression, localRelation)

    val physicalPlans = spark.sessionState.planner.plan(logicalPlan)
    val results = physicalPlans.next.execute.collect
    println(expression)
    results.foreach(println)
}
```

```
LessThan
[0,1]
[0,2]
[0,3]
LessThanOrEqual
[0,1]
[0,2]
[0,3]
[0,4]
GreaterThan
[0,5]
EqualTo
[0,4]
```
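The same generator idea also works at the level of SQL text: enumerate operators, render each one as a query, and pair it with an independently computed expected result to compare against the output of `sql(...)`. In the hypothetical plain-Java sketch below, the table name `global_temp.data` and the values 1 to 5 come from the example above; the class, its methods and the operator table are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical generator of SQL test cases with expected results computed in Java.
final class QueryCaseGenerator {
    /** One generated test: the SQL text and the ids it should return. */
    static final class TestCase {
        final String sql;
        final List<Integer> expected;
        TestCase(String sql, List<Integer> expected) { this.sql = sql; this.expected = expected; }
    }

    static List<TestCase> generate(List<Integer> ids, int constant) {
        // SQL operator spelling -> Java predicate used as the reference "oracle".
        Map<String, BiPredicate<Integer, Integer>> ops = new LinkedHashMap<>();
        ops.put("<",  (a, b) -> a < b);
        ops.put("<=", (a, b) -> a <= b);
        ops.put(">",  (a, b) -> a > b);
        ops.put("=",  (a, b) -> a.intValue() == b.intValue());
        ops.put("<>", (a, b) -> a.intValue() != b.intValue());

        List<TestCase> cases = new ArrayList<>();
        for (Map.Entry<String, BiPredicate<Integer, Integer>> op : ops.entrySet()) {
            List<Integer> expected = new ArrayList<>();
            for (int id : ids) {
                if (op.getValue().test(id, constant)) expected.add(id);
            }
            String sql = "select * from global_temp.data where id " + op.getKey() + " " + constant;
            cases.add(new TestCase(sql, expected));
        }
        return cases;
    }

    public static void main(String[] args) {
        for (TestCase c : generate(Arrays.asList(1, 2, 3, 4, 5), 4)) {
            System.out.println(c.sql + "  -> expected " + c.expected);
        }
    }
}
```

For the constant 4 this yields expected ids `[1, 2, 3]` for `<`, `[1, 2, 3, 4]` for `<=`, `[5]` for `>` and `[4]` for `=`, matching the plan-level results above, plus `[1, 2, 3, 5]` for `<>`.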