**Employees along with their department names**

In [0]:
-- List every employee together with the name of the department they belong to.
SELECT
    emp_row.EMPNO,
    emp_row.ENAME,
    dept_row.DNAME
FROM EMP emp_row
INNER JOIN DEPT dept_row
    ON dept_row.DEPTNO = emp_row.DEPTNO;


In [0]:
# Inner join EMP to DEPT on DEPTNO and keep employee number, employee name and department name.
emp_dept = (
    emp.join(dept, on=emp.DEPTNO == dept.DEPTNO, how="inner")
    .select(emp.EMPNO, emp.ENAME, dept.DNAME)
)
emp_dept.show()


**Employees who do not belong to any department**

In [0]:
-- Employees with no matching DEPT row: after the left join the DEPT side is NULL for them.
SELECT
    emp_row.EMPNO,
    emp_row.ENAME
FROM EMP emp_row
LEFT OUTER JOIN DEPT dept_row
    ON dept_row.DEPTNO = emp_row.DEPTNO
WHERE dept_row.DEPTNO IS NULL;




In [0]:
# Left-anti join: keep only the EMP rows for which no DEPT row shares the DEPTNO.
emp_no_dept = (
    emp.join(dept, on=emp.DEPTNO == dept.DEPTNO, how="left_anti")
    .select("EMPNO", "ENAME")
)
emp_no_dept.show()


**Employees who earn more than their manager**

In [0]:
-- Self-join EMP: each employee row is paired with its manager's row (MGR -> EMPNO),
-- then only the pairs where the employee out-earns the manager are kept.
SELECT
    worker.ENAME AS Employee,
    worker.SAL   AS Emp_Sal,
    boss.ENAME   AS Manager,
    boss.SAL     AS Mgr_Sal
FROM EMP worker
INNER JOIN EMP boss
    ON boss.EMPNO = worker.MGR
WHERE worker.SAL > boss.SAL;

In [0]:
# Self-join EMP (alias "e" = employee side, alias "m" = manager side) and keep
# the employees whose salary is higher than their manager's.
emp_mgr = (
    emp.alias("e")
    .join(emp.alias("m"), on=col("e.MGR") == col("m.EMPNO"))
    .where(col("e.SAL") > col("m.SAL"))
    .select(
        col("e.ENAME").alias("Employee"),
        col("e.SAL").alias("Emp_Sal"),
        col("m.ENAME").alias("Manager"),
        col("m.SAL").alias("Mgr_Sal"),
    )
)
emp_mgr.show()


**Departments without employees**

In [0]:
-- Departments with no employees: after the left join the EMP side is NULL for them.
SELECT
    dept_row.DEPTNO,
    dept_row.DNAME
FROM DEPT dept_row
LEFT OUTER JOIN EMP emp_row
    ON emp_row.DEPTNO = dept_row.DEPTNO
WHERE emp_row.EMPNO IS NULL;


In [0]:
# Left-anti join: keep only the DEPT rows for which no EMP row shares the DEPTNO.
dept_no_emp = (
    dept.join(emp, on=dept.DEPTNO == emp.DEPTNO, how="left_anti")
    .select("DEPTNO", "DNAME")
)
dept_no_emp.show()


**Employees with salary > company average salary**

In [0]:
-- Employees paid more than the company-wide average salary.
SELECT
    EMPNO,
    ENAME,
    SAL
FROM EMP
WHERE SAL > (
    SELECT AVG(SAL)
    FROM EMP
);


In [0]:
# Pull the company-wide average salary back to the driver as a plain number,
# then filter EMP against that scalar.
company_avg = emp.agg({"SAL": "avg"}).first()[0]
above_avg = emp.where(col("SAL") > company_avg)
above_avg.show()


**Employees with salary > department average salary**

In [0]:
-- Employees paid more than the average salary of their own department.
-- The subquery is correlated: it is evaluated for the DEPTNO of each outer row.
SELECT
    outer_emp.EMPNO,
    outer_emp.ENAME,
    outer_emp.SAL,
    outer_emp.DEPTNO
FROM EMP outer_emp
WHERE outer_emp.SAL > (
    SELECT AVG(SAL)
    FROM EMP
    WHERE DEPTNO = outer_emp.DEPTNO
)

In [0]:
from pyspark.sql.functions import avg

# Per-department average salary, joined back onto EMP so that every employee row
# can be compared with the average of its own department.
dept_avg = emp.groupBy("DEPTNO").agg(avg("SAL").alias("avg_sal"))
above_dept_avg = (
    emp.join(dept_avg, on="DEPTNO")
    .where(col("SAL") > col("avg_sal"))
    .select("EMPNO", "ENAME", "SAL", "DEPTNO")
)
above_dept_avg.show()

**Employees working in same department as "SCOTT"**

In [0]:
-- Everyone in SCOTT's department; the scalar subquery looks up SCOTT's DEPTNO.
SELECT
    ENAME,
    DEPTNO
FROM EMP
WHERE DEPTNO = (
    SELECT DEPTNO
    FROM EMP
    WHERE ENAME = 'SCOTT'
);


In [0]:
# Look up SCOTT's department number on the driver. take(1) returns an empty list
# when there is no SCOTT row, so the lookup cannot raise IndexError the way
# collect()[0][0] does.
scott_rows = emp.filter(col("ENAME") == "SCOTT").select("DEPTNO").take(1)
scott_dept = scott_rows[0][0] if scott_rows else None

# When scott_dept is None the comparison evaluates to NULL for every row, so the
# result is empty, which is what the SQL version returns for an empty subquery.
same_dept = emp.filter(col("DEPTNO") == scott_dept).select("ENAME", "DEPTNO")
same_dept.show()


**Employees whose salary is in the top 3 of their department**

In [0]:
-- Rank employees inside each department by salary (highest first), then keep
-- every employee whose dense rank is 1, 2 or 3. Tied salaries share a rank.
WITH ranked AS (
    SELECT
        EMPNO,
        ENAME,
        SAL,
        DEPTNO,
        DENSE_RANK() OVER (PARTITION BY DEPTNO ORDER BY SAL DESC) AS rnk
    FROM EMP
)
SELECT EMPNO, ENAME, SAL, DEPTNO
FROM ranked
WHERE rnk <= 3;

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# Rank salaries within each department, highest first. dense_rank gives tied
# salaries the same rank and leaves no gaps in the numbering.
w = Window.partitionBy("DEPTNO").orderBy(col("SAL").desc())
top3 = emp.withColumn("rnk", dense_rank().over(w)).filter(col("rnk") <= 3)
top3.select("EMPNO", "ENAME", "SAL", "DEPTNO").show()

# Variant restricted to a single department. Python comments start with `#`
# (`--` is SQL syntax and is a SyntaxError here), the column must be wrapped in
# col(), and each comparison needs its own parentheses because `&` binds tighter
# than `<=` and `==`:
# top3 = emp.withColumn("rnk", dense_rank().over(w)).filter((col("rnk") <= 3) & (col("DEPTNO") == "20"))


**Department with the maximum number of employees**

In [0]:
-- Head count per department, largest first; only the first row is returned.
SELECT
    DEPTNO,
    COUNT(*) AS emp_count
FROM EMP
GROUP BY
    DEPTNO
ORDER BY
    emp_count DESC
FETCH FIRST 1 ROWS ONLY;


In [0]:
# Count employees per department, sort the counts descending and keep the first row.
dept_max = (
    emp.groupBy("DEPTNO")
    .count()
    .sort(col("count").desc())
    .limit(1)
)
dept_max.show()


**Employees who joined before their manager**

In [0]:
-- Self-join EMP to pair each employee with their manager and keep the pairs
-- where the employee's hire date is earlier than the manager's.
SELECT
    worker.ENAME    AS Employee,
    worker.HIREDATE AS Emp_Hire,
    boss.ENAME      AS Manager,
    boss.HIREDATE   AS Mgr_Hire
FROM EMP worker
INNER JOIN EMP boss
    ON boss.EMPNO = worker.MGR
WHERE worker.HIREDATE < boss.HIREDATE;


In [0]:
# Self-join EMP (alias "e" = employee side, alias "m" = manager side) and keep
# the employees whose HIREDATE is earlier than their manager's.
emp_mgr_hire = (
    emp.alias("e")
    .join(emp.alias("m"), on=col("e.MGR") == col("m.EMPNO"))
    .where(col("e.HIREDATE") < col("m.HIREDATE"))
    .select(
        col("e.ENAME").alias("Employee"),
        col("e.HIREDATE").alias("Emp_Hire"),
        col("m.ENAME").alias("Manager"),
        col("m.HIREDATE").alias("Mgr_Hire"),
    )
)
emp_mgr_hire.show()


**Find the maximum salary in each department**

In [0]:
-- Highest salary in each department.
SELECT
    DEPTNO,
    MAX(SAL) AS Max_Sal
FROM EMP
GROUP BY
    DEPTNO;


In [0]:
from pyspark.sql.functions import max

# Highest salary in each department.
# NOTE: the import above rebinds the name `max` to the Spark aggregate, so the
# Python builtin `max` is hidden in every cell that runs after this one.
max_sal = (
    emp.groupBy("DEPTNO")
    .agg(max("SAL").alias("Max_Sal"))
)
max_sal.show()


**Find the minimum salary in each job role**

In [0]:
-- Lowest salary in each job role.
SELECT
    JOB,
    MIN(SAL) AS Min_Sal
FROM EMP
GROUP BY
    JOB;

In [0]:
from pyspark.sql.functions import min

# Lowest salary in each job role.
# NOTE: the import above rebinds the name `min` to the Spark aggregate, so the
# Python builtin `min` is hidden in every cell that runs after this one.
min_sal = (
    emp.groupBy("JOB")
    .agg(min("SAL").alias("Min_Sal"))
)
min_sal.show()

**Find the average salary per department**

In [0]:
-- Average salary in each department.
SELECT
    DEPTNO,
    AVG(SAL) AS Avg_Sal
FROM EMP
GROUP BY
    DEPTNO;


In [0]:
from pyspark.sql.functions import avg

# Average salary in each department.
avg_sal = (
    emp.groupBy("DEPTNO")
    .agg(avg("SAL").alias("Avg_Sal"))
)
avg_sal.show()


**Count employees per department**

In [0]:
-- Number of employees in each department.
SELECT
    DEPTNO,
    COUNT(*) AS Emp_Count
FROM EMP
GROUP BY
    DEPTNO;


In [0]:
# groupBy().count() names its result column "count"; rename it to match the SQL alias.
emp_count = (
    emp.groupBy("DEPTNO")
    .count()
    .withColumnRenamed("count", "Emp_Count")
)
emp_count.show()


**Count employees per department per job role**

In [0]:
-- Number of employees for each (department, job role) combination.
SELECT
    DEPTNO,
    JOB,
    COUNT(*) AS Emp_Count
FROM EMP
GROUP BY
    DEPTNO,
    JOB;


In [0]:
# One row per (DEPTNO, JOB) pair with the number of employees in it.
emp_count_job = (
    emp.groupBy("DEPTNO", "JOB")
    .count()
    .withColumnRenamed("count", "Emp_Count")
)
emp_count_job.show()

**Find total salary paid by each department**

In [0]:
-- Total salary paid by each department.
SELECT
    DEPTNO,
    SUM(SAL) AS Total_Sal
FROM EMP
GROUP BY
    DEPTNO;


In [0]:
from pyspark.sql.functions import sum

# Total salary paid by each department.
# NOTE: the import above rebinds the name `sum` to the Spark aggregate, so the
# Python builtin `sum` is hidden in every cell that runs after this one.
total_sal = (
    emp.groupBy("DEPTNO")
    .agg(sum("SAL").alias("Total_Sal"))
)
total_sal.show()


**Find department with the highest average salary**

In [0]:
-- Average salary per department, highest first; only the first row is returned.
SELECT
    DEPTNO,
    AVG(SAL) AS Avg_Sal
FROM EMP
GROUP BY
    DEPTNO
ORDER BY
    Avg_Sal DESC
FETCH FIRST 1 ROWS ONLY;


In [0]:
# Average salary per department, sorted descending, keeping only the first row.
highest_avg = (
    emp.groupBy("DEPTNO")
    .agg(avg("SAL").alias("Avg_Sal"))
    .sort(col("Avg_Sal").desc())
    .limit(1)
)
highest_avg.show()


**Find employees earning above department average salary**

In [0]:
-- Compute each department's average salary once, then join it back to EMP and
-- keep the employees paid more than their department's average.
WITH dept_stats AS (
    SELECT
        DEPTNO,
        AVG(SAL) AS dept_avg
    FROM EMP
    GROUP BY DEPTNO
)
SELECT
    emp_row.EMPNO,
    emp_row.ENAME,
    emp_row.SAL,
    emp_row.DEPTNO
FROM EMP emp_row
INNER JOIN dept_stats
    ON dept_stats.DEPTNO = emp_row.DEPTNO
WHERE emp_row.SAL > dept_stats.dept_avg;


In [0]:
# Department averages joined back onto EMP; keep the rows paid above their department's average.
dept_avg = emp.groupBy("DEPTNO").agg(avg("SAL").alias("dept_avg"))
above_avg = (
    emp.join(dept_avg, on="DEPTNO")
    .where(col("SAL") > col("dept_avg"))
)
above_avg.select("EMPNO", "ENAME", "SAL", "DEPTNO").show()


**Find salary difference between max & min per department**

In [0]:
-- Gap between the highest and the lowest salary in each department.
SELECT
    DEPTNO,
    MAX(SAL) - MIN(SAL) AS Sal_Diff
FROM EMP
GROUP BY
    DEPTNO;


In [0]:
from pyspark.sql import functions as F

# Gap between the highest and the lowest salary in each department.
# The aggregates are referenced through the functions module so that this cell
# does not depend on earlier cells having rebound the builtin names `max` and
# `min`; with the builtins, max("SAL") - min("SAL") would fail on plain strings.
sal_diff = emp.groupBy("DEPTNO").agg(
    (F.max("SAL") - F.min("SAL")).alias("Sal_Diff")
)
sal_diff.show()


**Find total employees with commission > 0**

In [0]:
-- Number of employees whose commission is strictly positive.
SELECT
    COUNT(*) AS Emp_With_Comm
FROM EMP
WHERE
    COMM > 0;


In [0]:
# Count the rows with a strictly positive COMM. Rows whose COMM is NULL do not
# satisfy the comparison and are therefore not counted.
emp_with_comm = emp.where(col("COMM") > 0).count()
print("Employees with commission > 0:", emp_with_comm)


**Find total employees with commission**

In [0]:
-- "With commission" means a commission value is recorded at all, so the test is
-- IS NOT NULL: a recorded commission of 0 still counts here, unlike COMM > 0.
SELECT COUNT(*) AS Emp_With_Comm
FROM EMP
WHERE COMM IS NOT NULL;


In [0]:
# "With commission" means a commission value is recorded at all, so test for
# NOT NULL: a recorded commission of 0 still counts here, unlike COMM > 0.
emp_with_comm = emp.filter(col("COMM").isNotNull()).count()
print("Employees with commission:", emp_with_comm)

**Find the top 3 salaries across the company**

In [0]:
-- Dense-rank every employee by salary across the whole company (highest first)
-- and keep those in the top three salary levels.
WITH ranked AS (
    SELECT
        EMPNO,
        ENAME,
        SAL,
        DENSE_RANK() OVER (ORDER BY SAL DESC) AS rnk
    FROM EMP
)
SELECT EMPNO, ENAME, SAL
FROM ranked
WHERE rnk <= 3;


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# No partitionBy: the window spans the whole table, so every employee is ranked
# against the entire company.
w = Window.orderBy(emp["SAL"].desc())
top3 = (
    emp.withColumn("rnk", dense_rank().over(w))
    .where("rnk <= 3")
)
top3.show()


**Find the highest paid employee per department**

In [0]:
-- Rank employees inside each department by salary (highest first) and keep
-- rank 1. Employees tied for the top salary are all returned.
WITH ranked AS (
    SELECT
        EMPNO,
        ENAME,
        DEPTNO,
        SAL,
        RANK() OVER (PARTITION BY DEPTNO ORDER BY SAL DESC) AS rnk
    FROM EMP
)
SELECT EMPNO, ENAME, DEPTNO, SAL
FROM ranked
WHERE rnk = 1;

In [0]:
from pyspark.sql.functions import rank

# rank() = 1 marks the top earner(s) of each department; ties all receive rank 1.
w = Window.partitionBy("DEPTNO").orderBy(emp["SAL"].desc())
highest_paid = (
    emp.withColumn("rnk", rank().over(w))
    .where("rnk = 1")
)
highest_paid.show()

**Find running total of salaries ordered by hire date**


In [0]:
-- Running salary total in hire-date order, accumulated over every row up to and
-- including the current HIREDATE.
SELECT
    EMPNO,
    ENAME,
    HIREDATE,
    SAL,
    SUM(SAL) OVER (ORDER BY HIREDATE) AS Running_Total
FROM EMP;


In [0]:
from pyspark.sql.functions import sum

# The SQL version has ORDER BY without an explicit frame, which means
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: rows that share a HIREDATE
# all get the same running total. rangeBetween reproduces that behaviour.
# A rowsBetween frame would instead give tied rows different totals, in whatever
# order the ties happen to be processed.
w = Window.orderBy("HIREDATE").rangeBetween(Window.unboundedPreceding, Window.currentRow)
running_total = emp.withColumn("Running_Total", sum("SAL").over(w))
running_total.show()


**Find employees with rank based on salary (per department)**


In [0]:
-- Salary rank of every employee within their own department (1 = highest paid).
SELECT
    EMPNO,
    ENAME,
    DEPTNO,
    SAL,
    RANK() OVER (
        PARTITION BY DEPTNO
        ORDER BY SAL DESC
    ) AS sal_rank
FROM EMP;


In [0]:
# Salary rank of every employee within their own department (1 = highest paid).
w = (
    Window.partitionBy("DEPTNO")
    .orderBy(emp["SAL"].desc())
)
dept_rank = emp.withColumn("sal_rank", rank().over(w))
dept_rank.show()


**Find dense rank of employees across company by salary**


In [0]:
-- Dense rank of every employee by salary across the whole company (1 = highest paid).
SELECT
    EMPNO,
    ENAME,
    SAL,
    DENSE_RANK() OVER (ORDER BY SAL DESC) AS dense_rnk
FROM EMP;

In [0]:
# Company-wide window (no partitioning), salaries ordered highest first.
w = Window.orderBy(emp["SAL"].desc())
dense_rnk = emp.withColumn(
    "dense_rnk",
    dense_rank().over(w),
)
dense_rnk.show()


**Find employees earning above department median**

In [0]:
-- Employees paid more than the median salary of their own department.
-- Every selected column is qualified with e. because DEPTNO exists both in EMP
-- and in the derived table d, so an unqualified DEPTNO would be ambiguous.
SELECT e.EMPNO, e.ENAME, e.SAL, e.DEPTNO
FROM EMP e
JOIN (
    SELECT DEPTNO,
           PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SAL) AS dept_median
    FROM EMP
    GROUP BY DEPTNO
) d ON e.DEPTNO = d.DEPTNO
WHERE e.SAL > d.dept_median;


In [0]:
from pyspark.sql.functions import expr

# percentile() is Spark's exact percentile and interpolates between the two
# middle values, which is what PERCENTILE_CONT(0.5) does in the SQL version.
# percentile_approx() only ever returns a value that is present in the column,
# so it can disagree when a department has an even number of rows.
dept_median = emp.groupBy("DEPTNO").agg(expr("percentile(SAL, 0.5)").alias("dept_median"))
above_median = emp.join(dept_median, "DEPTNO").filter(emp["SAL"] > dept_median["dept_median"])
above_median.show()


**Find the Nth highest salary per department (say N=2)**

In [0]:
-- Dense-rank salaries inside each department (highest first) and keep the
-- employees on the second salary level.
WITH ranked AS (
    SELECT
        EMPNO,
        ENAME,
        DEPTNO,
        SAL,
        DENSE_RANK() OVER (PARTITION BY DEPTNO ORDER BY SAL DESC) AS rnk
    FROM EMP
)
SELECT EMPNO, ENAME, DEPTNO, SAL
FROM ranked
WHERE rnk = 2;


In [0]:
# N selects which distinct salary level to return per department (2 = second highest).
N = 2
w = Window.partitionBy("DEPTNO").orderBy(emp["SAL"].desc())
nth_highest = (
    emp.withColumn("rnk", dense_rank().over(w))
    .where(f"rnk = {N}")
)
nth_highest.show()


**Find cumulative distribution of salaries across company**

In [0]:
-- Cumulative distribution of each employee's salary, ordered ascending across the company.
SELECT
    EMPNO,
    ENAME,
    SAL,
    CUME_DIST() OVER (ORDER BY SAL) AS cum_dist
FROM EMP;


In [0]:
from pyspark.sql.functions import cume_dist

# Company-wide window ordered by ascending salary.
w = Window.orderBy("SAL")
cumdist = emp.withColumn(
    "cum_dist",
    cume_dist().over(w),
)
cumdist.show()


**Find the previous (LAG) and next (LEAD) salary for each employee when employees are ordered by salary**

In [0]:
-- For each employee, the salary one row before (prev_sal) and one row after
-- (next_sal) when all employees are ordered by ascending salary.
SELECT
    EMPNO,
    ENAME,
    SAL,
    LAG(SAL, 1)  OVER (ORDER BY SAL) AS prev_sal,
    LEAD(SAL, 1) OVER (ORDER BY SAL) AS next_sal
FROM EMP;


In [0]:
from pyspark.sql.functions import lag, lead

# Company-wide window ordered by ascending salary; prev_sal / next_sal are the
# salaries one row before and one row after the current row in that order.
w = Window.orderBy("SAL")
lag_lead = (
    emp.withColumn("prev_sal", lag("SAL", 1).over(w))
    .withColumn("next_sal", lead("SAL", 1).over(w))
)
lag_lead.show()


**Implement SCD Type 1 for CUSTOMER (overwrite changed attributes)**

In [0]:
-- SCD Type 1 upsert of STG_CUSTOMER into CUSTOMER, matched on CUST_ID.
-- No history is kept: changed attributes are overwritten in place.
MERGE INTO CUSTOMER tgt
USING STG_CUSTOMER src
  ON tgt.CUST_ID = src.CUST_ID
-- Update only when at least one tracked attribute differs. COALESCE(...,'')
-- makes the comparison work when a side is NULL; as a consequence NULL and the
-- empty string are treated as equal.
WHEN MATCHED AND (
     COALESCE(tgt.NAME,'')       <> COALESCE(src.NAME,'')
  OR COALESCE(tgt.ADDRESS,'')    <> COALESCE(src.ADDRESS,'')
  OR COALESCE(tgt.PHONE,'')      <> COALESCE(src.PHONE,'')
) THEN UPDATE SET
  NAME    = src.NAME,
  ADDRESS = src.ADDRESS,
  PHONE   = src.PHONE,
  UPDATED_AT = CURRENT_TIMESTAMP
-- Customers that are not in the target yet are inserted with both timestamps set to now.
WHEN NOT MATCHED THEN
  INSERT (CUST_ID, NAME, ADDRESS, PHONE, CREATED_AT, UPDATED_AT)
  VALUES (src.CUST_ID, src.NAME, src.ADDRESS, src.PHONE, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);


In [0]:
from delta.tables import DeltaTable

# SCD Type 1 upsert of STG_CUSTOMER into the CUSTOMER Delta table, matched on CUST_ID.
tgt = DeltaTable.forName(spark, "CUSTOMER")
src = spark.table("STG_CUSTOMER")

# "t" is the target alias and "s" the source alias used inside the SQL strings below.
(tgt.alias("t").merge(src.alias("s"), "t.CUST_ID = s.CUST_ID")
 # Overwrite the attributes only when at least one of them differs; coalesce(...,'')
 # keeps the comparison from evaluating to NULL when one side is NULL.
 .whenMatchedUpdate(
     condition="""coalesce(t.NAME,'')<>coalesce(s.NAME,'')
                  OR coalesce(t.ADDRESS,'')<>coalesce(s.ADDRESS,'')
                  OR coalesce(t.PHONE,'')<>coalesce(s.PHONE,'')""",
     set={"NAME":"s.NAME","ADDRESS":"s.ADDRESS","PHONE":"s.PHONE","UPDATED_AT":"current_timestamp()"}
 )
 # Source rows with no matching CUST_ID in the target are inserted as new customers.
 .whenNotMatchedInsert(values={
     "CUST_ID":"s.CUST_ID","NAME":"s.NAME","ADDRESS":"s.ADDRESS","PHONE":"s.PHONE",
     "CREATED_AT":"current_timestamp()","UPDATED_AT":"current_timestamp()"
 })
 .execute())


**Implement SCD Type 2 for CUSTOMER (track history)**

**Assumptions:** target columns = CUST_ID, NAME, ADDRESS, PHONE, START_DATE, END_DATE, IS_CURRENT.


In [0]:
-- SCD Type 2 load in two statements. They must run in this order: step 2
-- relies on step 1 having already closed the current row of every changed customer.

-- 1) Close out current rows that changed: set END_DATE to today and clear
--    IS_CURRENT. Only the current version (IS_CURRENT = 1) of a customer is matched.
MERGE INTO CUSTOMER tgt
USING STG_CUSTOMER src
  ON tgt.CUST_ID = src.CUST_ID AND tgt.IS_CURRENT = 1
WHEN MATCHED AND (
     COALESCE(tgt.NAME,'')    <> COALESCE(src.NAME,'')
  OR COALESCE(tgt.ADDRESS,'') <> COALESCE(src.ADDRESS,'')
  OR COALESCE(tgt.PHONE,'')   <> COALESCE(src.PHONE,'')
) THEN UPDATE SET
  END_DATE   = CURRENT_DATE,
  IS_CURRENT = 0;

-- 2) Insert brand-new and changed-current versions. After step 1 a changed
--    customer no longer has a row with IS_CURRENT = 1, so it falls into
--    NOT MATCHED just like a brand-new customer and gets a fresh current row.
MERGE INTO CUSTOMER tgt
USING STG_CUSTOMER src
  ON tgt.CUST_ID = src.CUST_ID AND tgt.IS_CURRENT = 1
WHEN NOT MATCHED THEN
  INSERT (CUST_ID, NAME, ADDRESS, PHONE, START_DATE, END_DATE, IS_CURRENT)
  VALUES (src.CUST_ID, src.NAME, src.ADDRESS, src.PHONE, CURRENT_DATE, NULL, 1);


In [0]:
# SCD Type 2 load of STG_CUSTOMER into the CUSTOMER Delta table, done as two
# merges. They must run in this order: step 2 relies on step 1 having already
# closed the current row of every changed customer.
tgt = DeltaTable.forName(spark, "CUSTOMER")
src = spark.table("STG_CUSTOMER")

# 1) Close current records that changed: only the IS_CURRENT = 1 version of a
#    customer is matched, and it is end-dated when any tracked attribute differs.
(tgt.alias("t").merge(src.alias("s"), "t.CUST_ID = s.CUST_ID AND t.IS_CURRENT = 1")
 .whenMatchedUpdate(
   condition="""coalesce(t.NAME,'')<>coalesce(s.NAME,'')
                OR coalesce(t.ADDRESS,'')<>coalesce(s.ADDRESS,'')
                OR coalesce(t.PHONE,'')<>coalesce(s.PHONE,'')""",
   set={"END_DATE":"current_date()","IS_CURRENT":"0"}
 ).execute())

# 2) Insert new current versions (new or changed): after step 1 a changed
#    customer has no IS_CURRENT = 1 row left, so it is "not matched" here just
#    like a brand-new customer.
(tgt.alias("t").merge(src.alias("s"), "t.CUST_ID = s.CUST_ID AND t.IS_CURRENT = 1")
 .whenNotMatchedInsert(values={
   "CUST_ID":"s.CUST_ID","NAME":"s.NAME","ADDRESS":"s.ADDRESS","PHONE":"s.PHONE",
   "START_DATE":"current_date()","END_DATE":"null","IS_CURRENT":"1"
 }).execute())


**Identify slowly changing attributes in EMP**

In [0]:
-- Compare EMP with STG_EMP on EMPNO and emit one row, labelled with the column
-- name, for every employee whose value in that column differs.
-- COALESCE substitutes '' (text) or 0 (numbers) so NULLs can be compared.
SELECT 'JOB' AS column_name
FROM EMP cur
INNER JOIN STG_EMP stg
    ON cur.EMPNO = stg.EMPNO
WHERE COALESCE(cur.JOB, '') <> COALESCE(stg.JOB, '')

UNION ALL

SELECT 'SAL'
FROM EMP cur
INNER JOIN STG_EMP stg
    ON cur.EMPNO = stg.EMPNO
WHERE COALESCE(cur.SAL, 0) <> COALESCE(stg.SAL, 0)

UNION ALL

SELECT 'MGR'
FROM EMP cur
INNER JOIN STG_EMP stg
    ON cur.EMPNO = stg.EMPNO
WHERE COALESCE(cur.MGR, 0) <> COALESCE(stg.MGR, 0)

UNION ALL

SELECT 'DEPTNO'
FROM EMP cur
INNER JOIN STG_EMP stg
    ON cur.EMPNO = stg.EMPNO
WHERE COALESCE(cur.DEPTNO, 0) <> COALESCE(stg.DEPTNO, 0);


In [0]:
from pyspark.sql import functions as F

# Current and staged employee rows, paired on EMPNO.
e = spark.table("EMP")
s = spark.table("STG_EMP")
j = e.alias("e").join(s.alias("s"), "EMPNO")


def _differs(column, null_default):
    """Return a condition that is true when `column` differs between EMP and STG_EMP.

    A plain `!=` evaluates to NULL when either side is NULL, and filter() drops
    such rows, so a change from or to NULL would go unnoticed. Both sides are
    therefore coalesced to `null_default` first, exactly as the SQL version does.
    """
    current_value = F.coalesce(j[f"e.{column}"], F.lit(null_default))
    staged_value = F.coalesce(j[f"s.{column}"], F.lit(null_default))
    return current_value != staged_value


# Same sentinels as the SQL version: '' for the text column, 0 for the numeric ones.
changes = {
  "JOB":    _differs("JOB", ""),
  "SAL":    _differs("SAL", 0),
  "MGR":    _differs("MGR", 0),
  "DEPTNO": _differs("DEPTNO", 0),
}
# Print, per column, whether at least one employee has a different value in staging.
for colname, cond in changes.items():
    print(colname, j.filter(cond).limit(1).count() > 0)


**Incremental load: insert new + update changed employees from STG_EMP (SCD1)**


In [0]:
-- Incremental (SCD Type 1) load of STG_EMP into EMP, matched on EMPNO.
MERGE INTO EMP tgt
USING STG_EMP src
  ON tgt.EMPNO = src.EMPNO
-- Update an existing employee only when at least one tracked attribute differs.
-- COALESCE substitutes '' (text) or 0 (numbers) so NULLs can be compared.
WHEN MATCHED AND (
     COALESCE(tgt.ENAME,'')<>COALESCE(src.ENAME,'')
  OR COALESCE(tgt.JOB,'')  <>COALESCE(src.JOB,'')
  OR COALESCE(tgt.SAL,0)   <>COALESCE(src.SAL,0)
  OR COALESCE(tgt.MGR,0)   <>COALESCE(src.MGR,0)
  OR COALESCE(tgt.DEPTNO,0)<>COALESCE(src.DEPTNO,0)
) THEN UPDATE SET
  ENAME=src.ENAME, JOB=src.JOB, SAL=src.SAL, MGR=src.MGR, DEPTNO=src.DEPTNO, UPDATED_AT=CURRENT_TIMESTAMP
-- Employees that are not in the target yet are inserted with both timestamps set to now.
WHEN NOT MATCHED THEN
  INSERT (EMPNO, ENAME, JOB, SAL, MGR, DEPTNO, CREATED_AT, UPDATED_AT)
  VALUES (src.EMPNO, src.ENAME, src.JOB, src.SAL, src.MGR, src.DEPTNO, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);


In [0]:
from delta.tables import DeltaTable
# Incremental (SCD Type 1) load of STG_EMP into the EMP Delta table, matched on EMPNO.
tgt = DeltaTable.forName(spark, "EMP")
src = spark.table("STG_EMP")

# "t" is the target alias and "s" the source alias used inside the SQL strings below.
(tgt.alias("t").merge(src.alias("s"), "t.EMPNO = s.EMPNO")
 # Update an existing employee only when at least one tracked attribute differs;
 # coalesce substitutes '' (text) or 0 (numbers) so NULLs can be compared.
 .whenMatchedUpdate(
   condition="""coalesce(t.ENAME,'')<>coalesce(s.ENAME,'')
             OR coalesce(t.JOB,'')<>coalesce(s.JOB,'')
             OR coalesce(t.SAL,0)<>coalesce(s.SAL,0)
             OR coalesce(t.MGR,0)<>coalesce(s.MGR,0)
             OR coalesce(t.DEPTNO,0)<>coalesce(s.DEPTNO,0)""",
   set={"ENAME":"s.ENAME","JOB":"s.JOB","SAL":"s.SAL","MGR":"s.MGR","DEPTNO":"s.DEPTNO","UPDATED_AT":"current_timestamp()"}
 )
 # Source rows with no matching EMPNO in the target are inserted as new employees.
 .whenNotMatchedInsert(values={
   "EMPNO":"s.EMPNO","ENAME":"s.ENAME","JOB":"s.JOB","SAL":"s.SAL","MGR":"s.MGR","DEPTNO":"s.DEPTNO",
   "CREATED_AT":"current_timestamp()","UPDATED_AT":"current_timestamp()"
 })
 .execute())


**Partition EMP by DEPTNO for faster queries**

In [0]:
-- Create a Delta copy of EMP that is partitioned by DEPTNO.
CREATE TABLE EMP_PARTITIONED
    USING DELTA
    PARTITIONED BY (DEPTNO)
AS
SELECT *
FROM EMP;


In [0]:
# Write EMP out as the Delta table EMP_PARTITIONED, partitioned by DEPTNO.
# mode("overwrite") replaces the table contents if it already exists.
(
    spark.table("EMP")
    .write.format("delta")
    .mode("overwrite")
    .partitionBy("DEPTNO")
    .saveAsTable("EMP_PARTITIONED")
)


**Optimize queries on EMP table with indexes**

In [0]:
-- One single-column index each on DEPTNO, MGR and JOB of EMP.
-- NOTE(review): CREATE INDEX is relational-database syntax and this notebook has
-- no Spark counterpart for this cell; confirm the target engine supports it.
CREATE INDEX idx_emp_deptno
    ON EMP (DEPTNO);

CREATE INDEX idx_emp_mgr
    ON EMP (MGR);

CREATE INDEX idx_emp_job
    ON EMP (JOB);


**Use Z-Ordering in Databricks on EMP by DEPTNO**

In [0]:
-- Z-order the EMP table on DEPTNO using the OPTIMIZE command.
OPTIMIZE EMP ZORDER BY (DEPTNO);


In [0]:
# Issue the same OPTIMIZE ... ZORDER BY statement as the SQL cell, through spark.sql().
spark.sql("OPTIMIZE EMP ZORDER BY (DEPTNO)")


**Demonstrate rollback in case of failed transaction**

In [0]:
-- Rollback demo: both statements run inside one explicit transaction, so the
-- salary UPDATE is undone together with the failing INSERT.
BEGIN TRANSACTION;
-- 10% raise for department 10; not permanent until COMMIT.
UPDATE EMP SET SAL = SAL * 1.1 WHERE DEPTNO = 10;

-- Suppose this fails (e.g., FK violation)
INSERT INTO EMP (EMPNO, ENAME, DEPTNO) VALUES (1, NULL, 99);

-- On error:
ROLLBACK;

-- Otherwise:
-- COMMIT;


In [0]:
-- Same rollback demo using the short BEGIN form to open the transaction.
BEGIN;
-- 10% raise for department 10; not permanent until COMMIT.
UPDATE EMP SET SAL = SAL * 1.1 WHERE DEPTNO = 10;

-- Simulate error or validation failure, then:
ROLLBACK;

-- If all good, use:
-- COMMIT;


In [0]:
from delta.tables import DeltaTable

# Delta Lake commits every write statement atomically on its own, and BEGIN /
# COMMIT / ROLLBACK are not statements spark.sql() can run (NOTE(review): confirm
# for the runtime in use). A "rollback" is therefore done by restoring the table
# to the version that was current before the change.
emp_table = DeltaTable.forName(spark, "EMP")
version_before = emp_table.history(1).select("version").collect()[0][0]

try:
    spark.sql("UPDATE EMP SET SAL = SAL * 1.1 WHERE DEPTNO = 10")
    # simulate failure
    raise Exception("validation failed")
except Exception as exc:
    # The UPDATE above is already committed; undo it by going back to the
    # version recorded before it ran. Catching Exception (not a bare except)
    # lets KeyboardInterrupt and SystemExit pass through.
    emp_table.restoreToVersion(version_before)
    print(f"Rolled back EMP to version {version_before}: {exc}")


**Show how schema evolution works in Delta Lake (add column automatically)**

In [0]:
-- Explicit schema change: add a DOUBLE column named BONUS to EMP.
ALTER TABLE EMP ADD COLUMNS (BONUS DOUBLE);


In [0]:
from pyspark.sql.functions import lit

# EMP plus a BONUS column of type double that is NULL on every row.
df = spark.table("EMP").withColumn("BONUS", lit(None).cast("double"))

# Overwrite EMP with the widened DataFrame. mergeSchema lets the write add
# columns that the stored table schema does not have yet.
writer = (
    df.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
writer.saveAsTable("EMP")


In [0]:
# Turn on automatic schema merging for this session so the merge below may add
# columns that exist in the source but not in the target.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

tgt = DeltaTable.forName(spark, "EMP")
src = spark.table("STG_EMP_WITH_BONUS")  # source carries the extra BONUS column

# Upsert on EMPNO: matched rows take every source column, unmatched rows are inserted.
merge_builder = tgt.alias("t").merge(src.alias("s"), "t.EMPNO = s.EMPNO")
merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
