### Spark Execution modes

1. Interactive client (spark-shell, notebooks): used for exploration
2. Submit job (spark-submit, Databricks, REST API): used on a production cluster

### Processing Model
Spark applies a master-slave model. When we submit an application to Spark, it creates a master (driver) process, and this driver then creates slave (executor) processes. The resources for the driver and executors are allocated by the cluster manager (for example, YARN).

**Spark Cluster Manager** 
1. local[n] (n is the number of threads; with n=3, one thread acts as the driver and two as executors)
2. YARN
3. Kubernetes
4. Mesos
5. Standalone

**-----------------------------------------------------------------------------------------------------------**


### Spark Programming model

**Spark Session** (is the driver)
- When we launch a shell, a session is usually created for us and is available as `spark`
- When writing a program, we need to create it ourselves

```
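from pyspark.sql import SparkSession

# Create (or reuse) a session that runs locally with 3 threads
spark = SparkSession.builder \
        .appName("Hello World") \
        .master("local[3]") \
        .getOrCreate()

spark.stop()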
```

We can configure a Spark session via the following (precedence 1 is the highest and overrides the others):
1. Environment variables (precedence 4)
2. `SPARK_HOME\conf\spark-defaults.conf` (precedence 3)
3. spark-submit command-line options (precedence 2)
4. SparkConf object (application code) (precedence 1)
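
The precedence order above can be modeled in a few lines of plain Python. This is only a sketch of the override rule, not a Spark API: `resolve_config`, the source names, and the example settings are all hypothetical and chosen for illustration.

```python
# Plain-Python model of the precedence list above (not a Spark API).
# Sources are ordered from lowest to highest precedence.
PRECEDENCE_LOW_TO_HIGH = [
    "environment",          # precedence 4
    "spark-defaults.conf",  # precedence 3
    "spark-submit",         # precedence 2
    "SparkConf",            # precedence 1, overrides everything else
]

def resolve_config(sources):
    """Merge per-source settings; higher-precedence sources override lower ones."""
    resolved = {}
    for name in PRECEDENCE_LOW_TO_HIGH:
        resolved.update(sources.get(name, {}))
    return resolved

# Hypothetical settings, for illustration only
example_sources = {
    "spark-defaults.conf": {"spark.app.name": "from-defaults", "spark.master": "yarn"},
    "spark-submit": {"spark.master": "local[2]"},
    "SparkConf": {"spark.master": "local[3]"},
}
resolved = resolve_config(example_sources)
print(resolved)
```

Here `spark.master` ends up with the value set in application code, while `spark.app.name` falls through to the defaults file because no higher-precedence source sets it.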

**Spark DataFrame**

1. Read 
```
import sys

# Read the CSV file whose path is passed as the first command-line argument
spark_df = spark.read \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .csv(sys.argv[1])
```

**Spark DataFrame Partitions**
- A partition is a small, logical, in-memory piece of the DataFrame. The underlying data is already stored in partitions on HDFS.
- Loading the partitions into the executors is handled by the resource manager.

**Spark Transformation**
- **Spark dataframes are immutable**

```
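# where() takes a SQL expression string; groupBy() needs an aggregation such as count()
filtered_df = survey_df.where("Age < 40").select("Name", "Age", "Country").groupBy("Country").count()

filtered_df.collect()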
```

**Transformation**
1. Narrow dependency: a transformation that can be performed independently on a single partition and still produce valid results (example: `where`).
2. Wide dependency: a transformation that requires data from other partitions to produce valid results (example: `groupBy`). This triggers a shuffle/sort exchange, which Spark manages internally.
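
To make the difference concrete, here is a toy model in plain Python, where a list of lists stands in for the partitions. It is a sketch under that assumption, not Spark code; the rows and the helper names `narrow_where` and `wide_group_count` are hypothetical.

```python
from collections import defaultdict

# Two hypothetical partitions of (name, age, country) rows
partitions = [
    [("Ann", 35, "US"), ("Bob", 52, "UK")],
    [("Cid", 28, "UK"), ("Dee", 31, "US")],
]

def narrow_where(partition, predicate):
    """Narrow: a partition is filtered using only its own rows."""
    return [row for row in partition if predicate(row)]

def wide_group_count(parts, key_index):
    """Wide: rows sharing a key must be brought together first (the shuffle)."""
    shuffled = defaultdict(list)
    for part in parts:
        for row in part:
            shuffled[row[key_index]].append(row)
    return {key: len(rows) for key, rows in shuffled.items()}

# Each partition is filtered on its own, with no data movement
filtered = [narrow_where(p, lambda r: r[1] < 40) for p in partitions]
# Counting per country needs rows from both partitions
counts = wide_group_count(filtered, key_index=2)
print(filtered)
print(counts)
```

The filter never looks outside its own partition, whereas the count for `"US"` is only correct once rows from both partitions have been combined.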

**Lazy evaluation**
Transformations are lazy: they only describe what should be done, and nothing is executed until an action is called.

**Spark Action**
`read`, `write`, `collect`, and `show` are examples. Actions trigger the Spark execution and are not lazy. Actions are converted to *jobs*.
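
The interplay between lazy transformations and actions can be imitated with a small plain-Python class. This is a toy stand-in, not how Spark is implemented: `LazyFrame`, the `executed` log, and the sample rows are hypothetical.

```python
executed = []  # names of the steps that have actually run

class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # pending (name, function) pairs

    def _then(self, name, func):
        # Return a new object; the original is left unchanged (immutability)
        return LazyFrame(self._rows, self._plan + ((name, func),))

    def where(self, predicate):
        return self._then("where", lambda rows: [r for r in rows if predicate(r)])

    def select(self, *indexes):
        return self._then("select", lambda rows: [tuple(r[i] for i in indexes) for r in rows])

    def collect(self):
        # Action: only now is the recorded plan executed, step by step
        rows = self._rows
        for name, func in self._plan:
            executed.append(name)
            rows = func(rows)
        return rows

survey = LazyFrame([("Ann", 35, "US"), ("Bob", 52, "UK")])
young = survey.where(lambda r: r[1] < 40).select(0, 1)
ran_before_action = list(executed)  # nothing has run yet
result = young.collect()
print(ran_before_action, executed, result)
```

Until `collect()` is called the log stays empty, which mirrors the idea that transformations only build a plan and an action triggers the work.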

### Understanding the execution plan

App code --> Jobs --> stages --> tasks 
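
As a rough illustration of this hierarchy, the sketch below assumes the usual rule of thumb that a job is cut into stages at each wide (shuffle) transformation and that a stage runs one task per partition it processes. It is a simplification in plain Python: the `WIDE` set, the helper names, and the partition counts are hypothetical, and placing the wide transformation at the start of the next stage glosses over the fact that a shuffle involves both sides.

```python
WIDE = {"groupBy", "join", "repartition"}  # transformations assumed to shuffle

def split_into_stages(transformations):
    """Start a new stage whenever a wide transformation is reached."""
    stages, current = [], []
    for name in transformations:
        if name in WIDE and current:
            stages.append(current)  # the shuffle closes the current stage
            current = []
        current.append(name)
    if current:
        stages.append(current)
    return stages

def tasks_per_stage(stages, input_partitions, shuffle_partitions):
    """First stage reads the input partitions; later stages read shuffle output."""
    return [input_partitions if i == 0 else shuffle_partitions
            for i in range(len(stages))]

stages = split_into_stages(["where", "select", "groupBy"])
tasks = tasks_per_stage(stages, input_partitions=3, shuffle_partitions=2)
print(stages, tasks)
```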

**-----------------------------------------------------------------------------------------------------------**

### Spark API

RDD APIs (core) --> Catalyst optimizer --> Spark SQL, DataFrame API, Dataset API (Scala or Java)

**Spark RDD**
RDD stands for Resilient Distributed Dataset. "Resilient" means fault tolerant. RDDs lack a row-column structure.

To work with RDDs, you need a `SparkContext`:
```
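import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf() \
        .setMaster("local[3]") \
        .setAppName("Hello")

# Option 1: create the SparkContext directly
# sc = SparkContext(conf=conf)

# Option 2: create a SparkSession and take its SparkContext
spark = SparkSession \
        .builder \
        .config(conf=conf) \
        .getOrCreate()

sc = spark.sparkContext

# Read a text file into an RDD of lines
linesRDD = sc.textFile(sys.argv[1])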
```

Spark SQL engine / Catalyst optimizer phases:
1. Analysis
2. Logical optimization
3. Physical planning (produces a set of RDD operations)
4. Code generation (generates efficient bytecode)

**-----------------------------------------------------------------------------------------------------------**

### Spark data sources and sinks

DataFrameReader API

Example:
```
spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("path", "/data/mycsvfiles/") \
    .option("mode", "FAILFAST") \
    .schema(mySchema) \
    .load()
```

Read Mode
1. PERMISSIVE -- sets all fields to null when it encounters a corrupted record
2. DROPMALFORMED -- drops the corrupted record
3. FAILFAST -- raises an exception when it encounters a corrupted record
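
The three modes can be mimicked with a tiny plain-Python parser. This is a sketch, not the Spark reader: `parse_rows` is hypothetical, and it treats a row as corrupted only when the field count is wrong, which is a much narrower check than Spark performs.

```python
def parse_rows(lines, n_fields, mode="PERMISSIVE"):
    """Parse comma-separated lines, handling bad rows according to `mode`."""
    rows = []
    for line in lines:
        fields = line.split(",")
        if len(fields) == n_fields:
            rows.append(tuple(fields))
        elif mode == "PERMISSIVE":
            rows.append((None,) * n_fields)  # keep the row, null out every field
        elif mode == "DROPMALFORMED":
            continue                         # silently drop the row
        elif mode == "FAILFAST":
            raise ValueError(f"malformed record: {line!r}")
        else:
            raise ValueError(f"unknown mode: {mode}")
    return rows

# Hypothetical input with one corrupted line in the middle
lines = ["Ann,35,US", "broken-line", "Bob,52,UK"]
permissive = parse_rows(lines, 3, "PERMISSIVE")
dropped = parse_rows(lines, 3, "DROPMALFORMED")
print(permissive)
print(dropped)
```

Calling `parse_rows(lines, 3, "FAILFAST")` on the same input raises a `ValueError` at the second line instead of returning anything.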

Schema inference doesn't work well with CSV and JSON.
*Parquet* files come with **built-in schema information**


Spark schema definition
1. Programmatically
2. Using a DDL string (column name and data type pairs, separated by commas)
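
To show the shape of a DDL schema string, the sketch below builds one from a list of column/type pairs in plain Python. The column names and the `to_ddl` helper are hypothetical; the resulting string is the kind of value that can be passed to `.schema(...)` on the reader in place of a programmatically built `StructType`.

```python
# Hypothetical columns: (column name, Spark SQL data type)
columns = [("FL_DATE", "DATE"), ("OP_CARRIER", "STRING"), ("DISTANCE", "INT")]

def to_ddl(cols):
    """Join 'name type' pairs with commas to form a DDL schema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in cols)

schema_ddl = to_ddl(columns)
print(schema_ddl)
```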

DataFrameWriter API

```
# df is an existing DataFrame; mode() takes any of the save modes listed under Save Modes
df.write \
        .format("parquet") \
        .mode("overwrite") \
        .option("path", "/data/flights/") \
        .save()
```

**Save Modes**
1. append
2. overwrite
3. errorIfExists
4. ignore
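
What each mode does when the target already exists can be sketched in plain Python, with a dictionary standing in for the file system. This is a model of the semantics only, not the DataFrameWriter: the `save` function and the `storage` dictionary are hypothetical.

```python
def save(storage, path, rows, mode="errorIfExists"):
    """Apply a save mode to `storage`, a dict mapping path -> list of rows."""
    exists = path in storage
    if mode == "append":
        storage[path] = storage.get(path, []) + list(rows)  # add to existing data
    elif mode == "overwrite":
        storage[path] = list(rows)                          # replace existing data
    elif mode == "errorIfExists":
        if exists:
            raise FileExistsError(path)                     # refuse to touch existing data
        storage[path] = list(rows)
    elif mode == "ignore":
        if not exists:
            storage[path] = list(rows)                      # do nothing if data exists
    else:
        raise ValueError(f"unknown save mode: {mode}")

storage = {"/data/flights/": [1, 2]}
save(storage, "/data/flights/", [3], mode="append")
after_append = list(storage["/data/flights/"])
save(storage, "/data/flights/", [9], mode="ignore")
after_ignore = list(storage["/data/flights/"])
save(storage, "/data/flights/", [7], mode="overwrite")
after_overwrite = list(storage["/data/flights/"])
print(after_append, after_ignore, after_overwrite)
```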

Spark File Layout
1. Number of files and file size
2. Partitions and Buckets
3. Sorted data  


- The number of files created depends on the number of DataFrame partitions
- The `maxRecordsPerFile` option caps the number of rows per file, which in turn controls the file size
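
A back-of-the-envelope estimate of the output file count follows from these two points. The sketch assumes every partition holds at least one row and writes at least one file, and that a cap of 0 means no limit; `files_written` and the row counts are hypothetical.

```python
import math

def files_written(rows_per_partition, max_records_per_file=0):
    """Estimate the file count: one file per partition, split further when a cap is set."""
    total = 0
    for n_rows in rows_per_partition:
        if max_records_per_file > 0:
            total += max(1, math.ceil(n_rows / max_records_per_file))
        else:
            total += 1  # no cap: the whole partition goes into one file
    return total

rows_per_partition = [1000, 2500, 400]  # hypothetical partition sizes
without_cap = files_written(rows_per_partition)
with_cap = files_written(rows_per_partition, max_records_per_file=1000)
print(without_cap, with_cap)
```

With a cap of 1000 rows, the 2500-row partition is split into three files while the other two partitions still produce one file each.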

We can create a database that contains:
1. Tables
2. Views

Spark allows us to create two kinds of tables: managed tables (stored inside the warehouse directory) and unmanaged tables (external tables).

```
# Add .enableHiveSupport() to the SparkSession builder first

spark.catalog.setCurrentDatabase("AIRLINE_DB")
flightdf.write.mode("overwrite").saveAsTable("flight_data_tble")
```
**-----------------------------------------------------------------------------------------------------------**

### Data transformation

