# <font color=Blue>Databricks</font>

- Azure Databricks is a popular cloud-based data analytics service offered by Microsoft Azure
- It allows you to perform data analytics on huge amounts of data on Azure
- An Azure Databricks cluster uses the Spark Standalone cluster manager
- The control plane holds metadata and managed services such as the Databricks web app, notebooks, jobs & queries, and the cluster manager
- The compute plane holds the data, the VNet, and the clusters

![databricks](./Databricks.png)

## 1) Databricks Cluster

* A cluster is a set of computation resources and configurations on which you run your workloads
* There are 2 types of clusters
     - All purpose Cluster
     - Job Cluster

### 1.1) All Purpose Cluster

* Used to run commands interactively in your notebooks
* Multiple users can share such clusters to do collaborative interactive analysis
* You can terminate and restart these clusters, and attach or detach multiple notebooks
* You can choose:
    * Multi-node cluster: the driver and the executors run on separate machines
    * Single-node cluster: a single machine runs the driver, with no separate worker nodes

### 1.2) Job Cluster

* Used to run jobs as automated workflows
* Databricks starts a new job cluster for the run and terminates it automatically when the job is complete
* You cannot restart a job cluster

## 2) DBUtils

* Databricks provides a set of utilities to interact efficiently with your notebook. The most commonly used DBUtils are
    * File system Utilities
    * Widget Utilities
    * Notebook Utilities

dbutils.widgets.text(name='text_name', defaultValue='', label='Text Label')

res = dbutils.widgets.get('text_name')

# <font color=Blue>Delta Lake</font>

## Drawbacks of ADLS

1. No ACID properties
2. Job failures lead to inconsistent data
3. Simultaneous writes to the same folder produce incorrect results
4. No schema enforcement
5. No support for row-level changes (update & delete)
6. No support for versioning
7. Data quality issues

## 1) What is Delta Lake

* Open source storage framework that brings reliability to data lakes
* Brings **transactional** capabilities to data lakes
* Runs on top of your existing data lake and stores its data in **parquet** format
* Enables the **LakeHouse** architecture: with **Delta Lake** we can implement a LakeHouse on top of the data lake

abfss://container@storage_account.dfs.core.windows.net/folder
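The path above follows a fixed pattern, so it can be assembled from its parts. A minimal sketch, assuming a hypothetical helper name `abfss_uri` (it is not part of any Databricks API):

```python
def abfss_uri(container: str, storage_account: str, path: str = "") -> str:
    """Build an abfss:// URI for an ADLS Gen2 location (illustrative helper)."""
    base = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    # Strip leading slashes so there is exactly one slash between host and folder
    return f"{base}/{path.lstrip('/')}" if path else base


print(abfss_uri("container", "storage_account", "folder"))
```

Building the URI in one place avoids typos when the same container is used for reads and writes.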

### Read CSV from ADLS 

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

schema_1 = StructType([StructField("emp_name", StringType()),
                       StructField("emp_id", IntegerType()),
                       StructField("gender", StringType())
                    ])

# Read every CSV file in the folder using the explicit schema
df = (spark.read.format("csv")
                .option("header", "true")
                .schema(schema_1)
                .load("abfss://container@adlsstorage.dfs.core.windows.net/folder/*.csv"))

### Write to parquet

df.write.format("parquet") \
        .mode("overwrite") \
        .save("abfss://container@storage.dfs.core.windows.net/folder/")

### Reading parquet file

df_1 = spark.read.format("parquet") \
                .load("abfss://container@storage.dfs.core.windows.net/folder/")

## 2) Create Delta Lake

df.write.format("delta") \
        .mode("overwrite") \
        .save("abfss://container@storage.dfs.core.windows.net/folder/")

* When we write data in delta format, the target folder contains two kinds of entries:
    1. the _delta_log folder
    2. one or more snappy.parquet data files
* The **_delta_log** folder is what makes the folder a Delta table: it holds the transaction log. It contains:
    1. a _tmp_path_dir folder
    2. .crc checksum files
    3. .json commit files

### Reading Delta file

In [None]:
df = spark.read.format("delta") \
                .load("abfss://container@storage.dfs.core.windows.net/folder/")

### Create Delta Table

In [None]:
df.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable("`schema_name`.table_name")

### Schema Enforcement

* Delta Lake uses Schema Validation on **Writes**

#### Schema enforcement Rules

* The data being written can't contain any additional columns that are not present in the target table's schema
* Its columns can't have a data type different from that of the corresponding column in the target table
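The two rules can be pictured as a check that runs before each write. This is a plain-Python sketch of the idea only, not Delta Lake's implementation; `check_schema` and the dictionary-based schemas are assumptions made for illustration:

```python
def check_schema(target: dict, incoming: dict) -> list:
    """Return a list of rule violations; an empty list means the write is accepted."""
    problems = []
    for col, dtype in incoming.items():
        if col not in target:
            # Rule 1: no columns that the target table does not have
            problems.append(f"extra column: {col}")
        elif target[col] != dtype:
            # Rule 2: data types must match the target table
            problems.append(f"type mismatch on {col}: {dtype} != {target[col]}")
    return problems


target_schema = {"emp_name": "string", "emp_id": "int", "gender": "string"}
good_write = {"emp_name": "string", "emp_id": "int", "gender": "string"}
bad_write = {"emp_name": "string", "emp_id": "string", "dept": "string"}

print(check_schema(target_schema, good_write))
print(check_schema(target_schema, bad_write))
```

The first call reports no problems; the second reports both a type mismatch and an extra column, which is the kind of write schema enforcement rejects.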

### Schema Evolution

* With `mergeSchema`, an append can add new columns to the table schema

In [None]:
df.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable("`schema_name`.table_name")

* With `overwriteSchema`, an overwrite can replace the schema altogether, including changed column data types

In [None]:
df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("`schema_name`.table_name")


### Versioning

### By using **versionAsOf**

In [None]:
df = spark.read.format("delta") \
                .option("versionAsOf", "1") \
                .load("dbfs:/user/hive/warehouse/db_name.db/table_name/")

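### By using **timestampAsOf**

In [None]:
# timestampAsOf expects a timestamp or date string; the value below is only a placeholder
df = spark.read.format("delta") \
                .option("timestampAsOf", "2024-01-01 00:00:00") \
                .load("dbfs:/user/hive/warehouse/db_name.db/table_name/")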
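Reading an older version works because the transaction log records which data files each commit added and removed. The sketch below is a heavily simplified model of that replay, assuming a made-up in-memory `commits` list; real commit files carry more action types than `add` and `remove`:

```python
def snapshot(commits, version):
    """Replay commits 0..version and return the set of live data files."""
    if not 0 <= version < len(commits):
        raise ValueError(f"version {version} does not exist")
    files = set()
    for commit in commits[: version + 1]:
        files -= set(commit.get("remove", []))  # files dropped by this commit
        files |= set(commit.get("add", []))     # files written by this commit
    return files


commits = [
    {"add": ["part-0.parquet"]},                       # version 0: initial write
    {"add": ["part-1.parquet"]},                       # version 1: append
    {"add": ["part-2.parquet"],                        # version 2: overwrite
     "remove": ["part-0.parquet", "part-1.parquet"]},
]

print(sorted(snapshot(commits, 1)))
print(sorted(snapshot(commits, 2)))
```

Version 1 still sees both original files, while version 2 sees only the file written by the overwrite. The old files stay on storage until they are cleaned up, which is what makes the earlier versions readable.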

## Upsert in Delta Lake

### Upsert using Merge

In [None]:
MERGE INTO `db_name`.Dest_Table_name AS Dest
USING Source_Table_Name AS Source
    ON Dest.key_col_name = Source.key_col_name
  WHEN MATCHED
    THEN UPDATE SET
      Dest.col_1 = Source.col_1,
      Dest.col_2 = Source.col_2,
      -- ... remaining columns ...
      Dest.last_col = Source.last_col
  WHEN NOT MATCHED
    THEN INSERT
       (col_1, col_2, col_3, col_4, ..., last_col)
        VALUES (Source.col_1, Source.col_2, Source.col_3, ..., Source.last_col)
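The effect of the two MERGE branches can be shown on small in-memory rows. This is a plain-Python sketch of the upsert semantics, not how Delta Lake executes MERGE; `merge_upsert` and the sample rows are invented for illustration, and the sketch assumes each key appears at most once in the source:

```python
def merge_upsert(dest, source, key):
    """Apply source rows to dest: update on key match, insert otherwise."""
    merged = {row[key]: dict(row) for row in dest}
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)   # WHEN MATCHED THEN UPDATE
        else:
            merged[row[key]] = dict(row)   # WHEN NOT MATCHED THEN INSERT
    return list(merged.values())


dest = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
source = [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cy"}]

result = merge_upsert(dest, source, "id")
print(result)
```

Row 1 is untouched, row 2 is updated from the source, and row 3 is inserted.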

# <font color=Blue>Unity Catalog</font>

* A centralized place to manage users, governance, auditing, metadata, etc.
* Unity Catalog provides a unified governance layer for data & AI with a single permission model, and it also supports data sharing
* Key capabilities: access control, lineage, discovery, monitoring, auditing, sharing, metadata management

### Metastore

* The metastore is the top-level container in Unity Catalog. Within a metastore, Unity Catalog provides a three-level namespace for organizing data
* The three levels are Catalog, Schema (database), Table, written as `catalog.schema.table`
* Only one metastore is allowed per region
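A fully qualified table name therefore always has three parts. The helper below is a small sketch of splitting such a name; `parse_table_name`, `TableName`, and the example name `main.sales.orders` are hypothetical, and backtick-quoted identifiers that contain dots are not handled:

```python
from typing import NamedTuple


class TableName(NamedTuple):
    catalog: str
    schema: str
    table: str


def parse_table_name(full_name: str) -> TableName:
    """Split 'catalog.schema.table' into its three levels."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected catalog.schema.table, got {full_name!r}")
    return TableName(*parts)


name = parse_table_name("main.sales.orders")
print(name.catalog, name.schema, name.table)
```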

# <font color=Blue>Spark Structured Streaming</font>

* A schema must be specified when reading a file stream

In [None]:
df = spark.readStream.format("csv") \
                    .option("header", "true") \
                    .schema(schema) \
                    .load("abfss://folder/path/")

* Most actions (for example `show()` or `collect()`) do not work on a streaming DataFrame
* You can't read a single file directly. Always read from a folder

Sources:

* File source
* Kafka source
* Table source
* Socket source (UTF-8)
* Rate source

Sinks:

* File sink
* Kafka sink
* Table sink
* Foreach sink
* Console sink

## Checkpoint

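* Used to build fault-tolerant and resilient Spark applications
* It maintains the intermediate state and progress of the query, so a restarted query can resume where it stopped
* The checkpoint location must be unique to each streaming query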

In [None]:
write = df.writeStream.option("checkpointLocation", "checkpoint/path/") \
            .outputMode("append") \
            .queryName("appendQuery") \
            .toTable("schema_name.tablename")
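The idea behind a checkpoint can be sketched without Spark: progress is persisted after each unit of work, so a restart skips what was already processed. This is only a conceptual model; the file name `offsets.json` and the `process` function are assumptions, and Spark's real checkpoint directory holds more than a single offset:

```python
import json
import os
import tempfile


def process(records, checkpoint_path):
    """Process records not seen before, saving progress after each one."""
    start = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            start = json.load(f)["next_offset"]   # resume from saved progress
    processed = []
    for offset in range(start, len(records)):
        processed.append(records[offset].upper())
        with open(checkpoint_path, "w") as f:
            json.dump({"next_offset": offset + 1}, f)
    return processed


checkpoint = os.path.join(tempfile.mkdtemp(), "offsets.json")
first_run = process(["a", "b"], checkpoint)
second_run = process(["a", "b", "c"], checkpoint)   # restart: only "c" is new
print(first_run, second_run)
```

If two queries shared the same checkpoint file, each would overwrite the other's progress, which is why the location has to be unique per query.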

## outputMode

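* append: only rows added to the result since the last trigger are written
* complete: the whole result table is written after every trigger
* update: only rows that changed since the last trigger are written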
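The difference between the modes is easiest to see on a running count over two micro-batches. The simulation below is plain Python, not Spark; `emit` is a made-up function, and the append branch shows un-aggregated rows because append mode on an aggregation additionally needs a watermark:

```python
from collections import Counter


def emit(batches, mode):
    """Return what would be written after each micro-batch for a given mode."""
    totals = Counter()
    written = []
    for batch in batches:
        totals.update(batch)                       # running count per word
        if mode == "append":
            written.append(list(batch))            # only the new rows
        elif mode == "complete":
            written.append(dict(totals))           # the whole result table
        elif mode == "update":
            written.append({w: totals[w] for w in set(batch)})  # changed rows only
        else:
            raise ValueError(f"unknown output mode: {mode}")
    return written


batches = [["a", "b"], ["a"]]
for mode in ("append", "complete", "update"):
    print(mode, emit(batches, mode))
```

After the second batch, complete rewrites the counts for both words, while update writes only the count for `a`, the one key that changed.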

## trigger

In [2]:
# Run a micro-batch every 2 minutes
write = df.writeStream.option("checkpointLocation", "checkpoint/path/") \
            .outputMode("append") \
            .trigger(processingTime="2 minutes") \
            .queryName("appendQuery") \
            .toTable("schema_name.tablename")

In [None]:
# Process all data available now, then stop
write = df.writeStream.option("checkpointLocation", "checkpoint/path/") \
            .outputMode("append") \
            .trigger(availableNow=True) \
            .queryName("appendQuery") \
            .toTable("schema_name.tablename")

# <font color=Blue>Auto Loader</font>

In [None]:
df = spark.readStream.format("cloudFiles") \
                    .option("cloudFiles.format", "csv") \
                    .option("cloudFiles.schemaLocation", "path") \
                    .option("cloudFiles.inferColumnTypes", "true") \
                    .option("header", "true") \
                    .load("source_path")