# Databricks Demo - Comprehensive Guide
### SQL, PySpark, and Spark-Scala Examples for All Operations

## 1. DBFS Operations

#### SQL - Query Delta Table

In [0]:
%sql
-- Query the Delta table stored at this path using the delta.`<path>` syntax.
SELECT *
FROM delta.`/databricks-datasets/nyctaxi-with-zipcodes/subsampled`

#### PySpark - List Files

In [0]:
# List the contents of the dataset directory with dbutils and render the result.
files = dbutils.fs.ls("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")
display(files)

#### Scala - List Files

In [0]:
%scala
// List the contents of the dataset directory with dbutils and render the result.
val files = dbutils.fs.ls("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")
display(files)

## 2. CREATE TABLE Operations

#### SQL - Create Table

In [0]:
%sql
-- Create the students table only when it does not already exist.
CREATE TABLE IF NOT EXISTS students (
  id    INT,
  name  STRING,
  value DOUBLE
)

#### PySpark - Create Table

In [0]:
from pyspark.sql.types import *

# Column layout of the students table: id INT, name STRING, value DOUBLE.
schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("value", DoubleType()),
    ]
)

# A zero-row DataFrame carrying the explicit schema above.
empty_df = spark.createDataFrame([], schema)

# Save mode "ignore" skips the write when the table already exists.
empty_df.write.format("delta").mode("ignore").saveAsTable("students")

#### Scala - Create Table

In [0]:
%scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Column layout of the students table: id INT, name STRING, value DOUBLE.
val schema = StructType(Array(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("value", DoubleType)
))

// An empty RDD[Row] combined with the explicit schema gives a zero-row DataFrame.
// Row is used as the type argument here, so it is imported at the top of the cell.
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

// Save mode "ignore" skips the write when the table already exists.
emptyDF.write.format("delta").mode("ignore").saveAsTable("students")

## 3. INSERT Operations

#### SQL - Insert Records

In [0]:
%sql
-- Add three rows to students in a single statement.
INSERT INTO students VALUES
  (1, "Yve", 1.0),
  (2, "Omar", 2.5),
  (3, "Elia", 3.3)

#### PySpark - Insert Records

In [0]:
from pyspark.sql import Row

# Two more students to append to the existing table.
new_data = [
    Row(id=4, name="Ted", value=4.7),
    Row(id=5, name="Tiffany", value=5.5),
]

# Pass the table's column types explicitly: without a schema, Python ints are
# inferred as bigint, which does not match the INT id column of students.
new_df = spark.createDataFrame(new_data, schema="id INT, name STRING, value DOUBLE")
new_df.write.format("delta").mode("append").saveAsTable("students")

#### Scala - Insert Records

In [0]:
%scala
import spark.implicits._

// Build a two-row DataFrame from tuples (toDF comes from spark.implicits._)
// and append it to the students table.
val newData = Seq(
  (6, "Vini", 6.3),
  (7, "Alice", 7.8)
).toDF("id", "name", "value")

newData.write
  .format("delta")
  .mode("append")
  .saveAsTable("students")

## 4. SELECT Operations

#### SQL - Select Records

In [0]:
%sql
-- Return every row and column of students.
SELECT *
FROM students

#### PySpark - Select Records

In [0]:
# Load the students table as a DataFrame and render it.
df = spark.table("students")
display(df)

#### Scala - Select Records

In [0]:
%scala
// Load the students table as a DataFrame and render it.
val df = spark.table("students")
display(df)

## 5. UPDATE Operations

#### SQL - Update Records

In [0]:
%sql
-- Increase value by 1 for every student whose name starts with "T".
UPDATE students
SET value = value + 1
WHERE name LIKE "T%"

#### PySpark - Update Records

In [0]:
from delta.tables import DeltaTable

# Update through the DeltaTable API: the condition and the new value are
# both given as SQL expression strings.
deltaTable = DeltaTable.forName(spark, "students")
deltaTable.update(
    condition="name LIKE 'T%'",
    set={"value": "value + 1"},
)

#### Scala - Update Records

In [0]:
%scala
import io.delta.tables._
import org.apache.spark.sql.functions._

// Update through the DeltaTable API: rows matching the condition get
// value replaced by value + 1.
val deltaTable = DeltaTable.forName(spark, "students")

deltaTable.update(
  expr("name LIKE 'T%'"),
  Map("value" -> expr("value + 1"))
)

## 6. DELETE Operations

#### SQL - Delete Records

In [0]:
%sql
-- Remove every row whose value is greater than 6.
DELETE FROM students
WHERE value > 6

#### PySpark - Delete Records

In [0]:
from delta.tables import DeltaTable

# Delete through the DeltaTable API; the condition is a SQL expression string.
deltaTable = DeltaTable.forName(spark, "students")
deltaTable.delete("value > 6")

#### Scala - Delete Records

In [0]:
%scala
import io.delta.tables._
import org.apache.spark.sql.functions.expr

// expr(...) lives in org.apache.spark.sql.functions; importing it here keeps this
// cell runnable without depending on an import made by an earlier cell.
val deltaTable = DeltaTable.forName(spark, "students")

// Delete every row whose value is greater than 6.
deltaTable.delete(expr("value > 6"))

## 7. MERGE Operations (UPSERT)

#### SQL - Create Updates and Merge

In [0]:
%sql
-- Staging rows for the MERGE; the type column says what to do with each row.
CREATE OR REPLACE TEMP VIEW updates(id, name, value, type) AS
VALUES
  (2, "Omar", 15.2, "update"),
  (3, "", null, "delete"),
  (7, "Blue", 7.7, "insert")

In [0]:
%sql
-- Upsert: pair rows on id, then update, delete, or insert depending on u.type.
MERGE INTO students b
USING updates u
  ON b.id = u.id
WHEN MATCHED AND u.type = "update" THEN
  UPDATE SET *
WHEN MATCHED AND u.type = "delete" THEN
  DELETE
WHEN NOT MATCHED AND u.type = "insert" THEN
  INSERT *

#### PySpark - Merge Operation

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import Row

# Source rows for the merge. The "insert" row mirrors the SQL updates view;
# without it the whenNotMatchedInsert clause below never has anything to insert.
updates_data = [
    Row(id=2, name="Omar", value=15.2, type="update"),
    Row(id=3, name="", value=None, type="delete"),
    Row(id=7, name="Blue", value=7.7, type="insert"),
]

# Explicit schema so the column types match students and the None value
# does not depend on type inference.
updates_df = spark.createDataFrame(
    updates_data,
    schema="id INT, name STRING, value DOUBLE, type STRING",
)

deltaTable = DeltaTable.forName(spark, "students")

# Pair rows on id, then update, delete, or insert depending on u.type.
(
    deltaTable.alias("b")
    .merge(updates_df.alias("u"), "b.id = u.id")
    .whenMatchedUpdate(
        condition="u.type = 'update'",
        set={"name": "u.name", "value": "u.value"},
    )
    .whenMatchedDelete(condition="u.type = 'delete'")
    .whenNotMatchedInsert(
        condition="u.type = 'insert'",
        values={"id": "u.id", "name": "u.name", "value": "u.value"},
    )
    .execute()
)

#### Scala - Merge Operation

In [0]:
%scala
import io.delta.tables._
import org.apache.spark.sql.functions._
import spark.implicits._

// Source rows for the merge, mirroring the SQL updates view: the "delete" row
// carries a NULL value (None) and the "insert" row feeds the whenNotMatched clause.
val updatesDF = Seq[(Int, String, Option[Double], String)](
  (2, "Omar", Some(15.2), "update"),
  (3, "", None, "delete"),
  (7, "Blue", Some(7.7), "insert")
).toDF("id", "name", "value", "type")

val deltaTable = DeltaTable.forName(spark, "students")

// Pair rows on id, then update, delete, or insert depending on u.type.
deltaTable.as("b")
  .merge(updatesDF.as("u"), "b.id = u.id")
  .whenMatched(expr("u.type = 'update'"))
  .updateExpr(Map("name" -> "u.name", "value" -> "u.value"))
  .whenMatched(expr("u.type = 'delete'"))
  .delete()
  .whenNotMatched(expr("u.type = 'insert'"))
  .insertExpr(Map("id" -> "u.id", "name" -> "u.name", "value" -> "u.value"))
  .execute()

## 8. Table Introspection

#### SQL - DESCRIBE Operations

In [0]:
%sql
-- Show the table's columns together with extended table metadata.
DESCRIBE EXTENDED students

In [0]:
%sql
-- Show Delta-level details about the students table.
DESCRIBE DETAIL students

In [0]:
%sql
-- Show the operation history of the students table.
DESCRIBE HISTORY students

#### PySpark - Table Information

In [0]:
from delta.tables import DeltaTable

# Look the table up by name, fetch its history as a DataFrame, and render it.
deltaTable = DeltaTable.forName(spark, "students")
history_df = deltaTable.history()
display(history_df)

#### Scala - Table Information

In [0]:
%scala
import io.delta.tables._

// Look the table up by name, fetch its history as a DataFrame, and render it.
val deltaTable = DeltaTable.forName(spark, "students")
val historyDF = deltaTable.history()
display(historyDF)

## 9. Time Travel

#### SQL - Time Travel by Version

In [0]:
%sql
-- Read students as of table version 3.
SELECT *
FROM students VERSION AS OF 3

#### PySpark - Time Travel

In [0]:
# Read students as of table version 3 via the versionAsOf reader option.
df_version = (
    spark.read.format("delta")
    .option("versionAsOf", 3)
    .table("students")
)
display(df_version)

#### Scala - Time Travel

In [0]:
%scala
// Read students as of table version 3 via the versionAsOf reader option.
val dfVersion = spark.read
  .format("delta")
  .option("versionAsOf", 3)
  .table("students")

display(dfVersion)

## 10. VACUUM Operations

#### SQL - VACUUM

In [0]:
%sql
-- DRY RUN reports the files that would be removed without deleting them.
-- The retention threshold is 168 hours (7 days).
VACUUM students RETAIN 168 HOURS DRY RUN

#### PySpark - VACUUM

In [0]:
from delta.tables import DeltaTable

# NOTE(review): the SQL VACUUM cell uses DRY RUN, but this call is not a dry run —
# confirm that actually removing files is intended here.
# The argument is the retention threshold in hours (168 hours = 7 days).
deltaTable = DeltaTable.forName(spark, "students")
deltaTable.vacuum(168)

#### Scala - VACUUM

In [0]:
%scala
import io.delta.tables._

// NOTE(review): the SQL VACUUM cell uses DRY RUN, but this call is not a dry run —
// confirm that actually removing files is intended here.
// The argument is the retention threshold in hours (168 hours = 7 days).
val deltaTable = DeltaTable.forName(spark, "students")
deltaTable.vacuum(168)

## 11. Data Ingestion

#### SQL - Read JSON

In [0]:
%sql
-- Query the JSON file in place using the json.`<path>` syntax.
SELECT *
FROM json.`/Volumes/workspace/default/input/data.json`

#### PySpark - Read JSON/CSV

In [0]:
# Load a JSON file into a DataFrame and render it.
json_df = (
    spark.read.format("json")
    .load("/Volumes/workspace/default/input/data.json")
)
display(json_df)

# Load a CSV file, treating the first line as a header and inferring column types.
csv_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("s3://bucket/file.csv")
)
display(csv_df)

#### Scala - Read JSON/CSV

In [0]:
%scala
// Load a JSON file into a DataFrame and render it.
val jsonDF = spark.read
  .format("json")
  .load("/Volumes/workspace/default/input/data.json")
display(jsonDF)

// Load a CSV file, treating the first line as a header and inferring column types.
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("s3://bucket/file.csv")
display(csvDF)

## 12. JDBC Connections

#### SQL - JDBC Table

In [0]:
%sql
-- Define a table that reads from a PostgreSQL relation over JDBC.
-- NOTE(review): the user and password are inline placeholder values; confirm how
-- real credentials should be supplied before reusing this statement.
CREATE TABLE postgres_table
USING JDBC
OPTIONS (
  url = "jdbc:postgresql://host:5432/db",
  dbtable = "schema.table",
  user = "user",
  password = "pass"
)

#### PySpark - Read JDBC

In [0]:
# Read a PostgreSQL relation over JDBC into a DataFrame and render it.
# NOTE(review): the user and password are inline placeholder values; confirm how
# real credentials should be supplied before reusing this cell.
jdbc_df = (
    spark.read.format("jdbc")
    .options(
        url="jdbc:postgresql://host:5432/db",
        dbtable="schema.table",
        user="user",
        password="pass",
        driver="org.postgresql.Driver",
    )
    .load()
)
display(jdbc_df)

#### Scala - Read JDBC

In [0]:
%scala
// Read a PostgreSQL relation over JDBC into a DataFrame and render it.
// NOTE(review): the user and password are inline placeholder values; confirm how
// real credentials should be supplied before reusing this cell.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("dbtable", "schema.table")
  .option("user", "user")
  .option("password", "pass")
  .option("driver", "org.postgresql.Driver")
  .load()

display(jdbcDF)

## 13. Table Cloning

#### SQL - Deep and Shallow Clone

In [0]:
%sql
-- Create (or replace) a deep clone of students.
CREATE OR REPLACE TABLE students_deep_clone
DEEP CLONE students

In [0]:
%sql
-- Create (or replace) a shallow clone of students.
CREATE OR REPLACE TABLE students_shallow_clone
SHALLOW CLONE students

#### PySpark - Clone Tables

In [0]:
# Cloning is issued as SQL text through spark.sql: first the deep clone, then the shallow clone.
spark.sql("CREATE OR REPLACE TABLE students_deep_clone DEEP CLONE students")
spark.sql("CREATE OR REPLACE TABLE students_shallow_clone SHALLOW CLONE students")

#### Scala - Clone Tables

In [0]:
%scala
// Cloning is issued as SQL text through spark.sql: first the deep clone, then the shallow clone.
spark.sql("CREATE OR REPLACE TABLE students_deep_clone DEEP CLONE students")
spark.sql("CREATE OR REPLACE TABLE students_shallow_clone SHALLOW CLONE students")

## 14. User-Defined Functions

#### SQL - Create UDF

In [0]:
%sql
-- SQL UDF that maps a food name to a canned reply.
-- OR REPLACE lets the cell be re-run without failing because the function already exists.
CREATE OR REPLACE FUNCTION foods_i_like(food STRING) RETURNS STRING
RETURN CASE
 WHEN food = "beans" THEN "I love beans"
 WHEN food = "potatoes" THEN "My favorite vegetable is potatoes"
 ELSE concat("Do you have recipes for ", food, "?")
END

#### PySpark - Create UDF

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def foods_i_like(food):
    """Return a canned reply for "beans" and "potatoes", or a recipe question otherwise."""
    if food == "beans":
        return "I love beans"
    if food == "potatoes":
        return "My favorite vegetable is potatoes"
    return f"Do you have recipes for {food}?"


# Wrap the function for use on DataFrame columns.
foods_udf = udf(foods_i_like, StringType())

# Register the function under the name foods_i_like for use in SQL.
spark.udf.register("foods_i_like", foods_i_like, StringType())

#### Scala - Create UDF

In [0]:
%scala
import org.apache.spark.sql.functions.udf

// Maps a food name to a canned reply; anything other than "beans" or
// "potatoes" gets the recipe question.
val foodsILike: String => String = {
  case "beans"    => "I love beans"
  case "potatoes" => "My favorite vegetable is potatoes"
  case food       => s"Do you have recipes for $food?"
}

// Wrap the function for use on DataFrame columns.
val foodsUDF = udf(foodsILike)

// Register the function under the name foods_i_like for use in SQL.
spark.udf.register("foods_i_like", foodsILike)

## 15. Streaming Data (Auto Loader)

#### PySpark - Auto Loader

In [0]:
# Source: Auto Loader (cloudFiles) reading CSV files with a header row.
raw_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "/Volumes/workspace/default/schema")
    .option("header", "true")
    .load("/Volumes/workspace/default/data")
)

# Sink: write to the Delta table streaming_table using the availableNow trigger.
(
    raw_stream.writeStream.format("delta")
    .option("checkpointLocation", "/Volumes/workspace/default/checkpoint")
    .trigger(availableNow=True)
    .toTable("streaming_table")
)

#### SQL - read_files()

In [0]:
%sql
-- Streaming table populated from CSV files (with a header row) under the data directory.
CREATE OR REFRESH STREAMING TABLE streaming_table AS
SELECT *
FROM STREAM read_files(
  '/Volumes/workspace/default/data',
  format => 'csv',
  header => true
)

#### Scala - Auto Loader

In [0]:
%scala
// Source: Auto Loader (cloudFiles) reading CSV files with a header row.
val rawStream = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", "/Volumes/workspace/default/schema")
  .option("header", "true")
  .load("/Volumes/workspace/default/data")

// Sink: write to the Delta table streaming_table using the AvailableNow trigger.
rawStream.writeStream
  .format("delta")
  .option("checkpointLocation", "/Volumes/workspace/default/checkpoint")
  .trigger(org.apache.spark.sql.streaming.Trigger.AvailableNow())
  .toTable("streaming_table")

## 16. Medallion Architecture (Bronze-Silver-Gold)

### Bronze Layer (Raw Data)

#### SQL - Bronze Table

In [0]:
%sql
-- Bronze layer: raw policy rows.
-- OR REPLACE keeps the cell re-runnable; a bare CREATE TABLE fails once the table exists.
CREATE OR REPLACE TABLE bronze_insurance (
  policy_id   INT,
  name        STRING,
  policy_type STRING,
  premium     DECIMAL(10,2),
  claim       DECIMAL(10,2),
  created_at  TIMESTAMP
);
INSERT INTO bronze_insurance VALUES
  (101, 'Alice', 'Health', 500.00, NULL, CURRENT_TIMESTAMP),
  (102, 'Bob', 'Auto', 300.00, 1500.00, CURRENT_TIMESTAMP)

#### PySpark - Bronze Layer

In [0]:
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp

# Same two rows as the SQL bronze cell. Decimal values are used because the
# premium and claim columns are DECIMAL(10,2).
bronze_data = [
    Row(policy_id=101, name="Alice", policy_type="Health", premium=Decimal("500.00"), claim=None),
    Row(policy_id=102, name="Bob", policy_type="Auto", premium=Decimal("300.00"), claim=Decimal("1500.00")),
]

# An explicit schema is required: a column whose only values are None cannot be
# type-inferred, and inferred int/float types would not match the SQL definition.
bronze_schema = (
    "policy_id INT, name STRING, policy_type STRING, "
    "premium DECIMAL(10,2), claim DECIMAL(10,2)"
)

# Stamp each row with the load time, then overwrite the bronze table.
bronze_df = spark.createDataFrame(bronze_data, schema=bronze_schema).withColumn(
    "created_at", current_timestamp()
)
bronze_df.write.format("delta").mode("overwrite").saveAsTable("bronze_insurance")

### Silver Layer (Cleaned Data)

#### SQL - Silver Table

In [0]:
%sql
-- Silver layer: keep the policy columns and drop rows without a premium.
-- OR REPLACE keeps the cell re-runnable and matches the PySpark cell's overwrite mode.
CREATE OR REPLACE TABLE silver_insurance AS
SELECT policy_id, name, policy_type, premium, claim
FROM bronze_insurance
WHERE premium IS NOT NULL

#### PySpark - Silver Layer

In [0]:
# Silver layer: keep the policy columns and drop rows without a premium.
bronze_df = spark.table("bronze_insurance")
silver_df = (
    bronze_df
    .select("policy_id", "name", "policy_type", "premium", "claim")
    .filter("premium IS NOT NULL")
)

# Overwrite the silver table with the cleaned rows.
(
    silver_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("silver_insurance")
)

### Gold Layer (Aggregated Metrics)

#### SQL - Gold Table

In [0]:
%sql
-- Gold layer: per-policy-type totals; a NULL claim counts as 0.
-- OR REPLACE keeps the cell re-runnable and matches the PySpark cell's overwrite mode.
CREATE OR REPLACE TABLE gold_insurance AS
SELECT
  policy_type,
  COUNT(*) AS total_policies,
  SUM(premium) AS total_premium,
  SUM(COALESCE(claim, 0)) AS total_claims
FROM silver_insurance
GROUP BY policy_type

#### PySpark - Gold Layer

In [0]:
# Import the functions module under an alias instead of importing `sum` by name,
# which would shadow Python's built-in sum() for the rest of the notebook.
from pyspark.sql import functions as F

silver_df = spark.table("silver_insurance")

# Gold layer: per-policy-type totals; a NULL claim counts as 0.
gold_df = silver_df.groupBy("policy_type").agg(
    F.count("*").alias("total_policies"),
    F.sum("premium").alias("total_premium"),
    F.sum(F.coalesce(F.col("claim"), F.lit(0))).alias("total_claims"),
)

# Overwrite the gold table with the aggregated rows.
gold_df.write.format("delta").mode("overwrite").saveAsTable("gold_insurance")

## Summary

This notebook demonstrates common Databricks operations in SQL, PySpark, and Spark-Scala (the Medallion Architecture section covers SQL and PySpark only):

1. DBFS Operations
2. CREATE TABLE
3. INSERT
4. SELECT
5. UPDATE
6. DELETE
7. MERGE (UPSERT)
8. Table Introspection (DESCRIBE)
9. Time Travel
10. VACUUM
11. Data Ingestion (JSON/CSV)
12. JDBC Connections
13. Table Cloning
14. User-Defined Functions
15. Streaming (Auto Loader)
16. Medallion Architecture (Bronze-Silver-Gold)