In [0]:
# Echo the session object to confirm it is available
# (presumably the SparkSession that the Databricks runtime pre-defines).
spark

In [0]:
# Catalog implementation used by this Spark session.
# This reads the `spark.sql.catalogImplementation` setting, i.e. the
# implementation backing the catalog, not the name of a default catalog.

spark.conf.get("spark.sql.catalogImplementation")

In [0]:
%sql
-- List the databases (schemas) visible to this session.
SHOW DATABASES;

In [0]:
# Load the source Parquet file into a DataFrame.
# NOTE(review): the file is named users.parquet although the frame is called
# df_sales -- confirm this is the intended dataset.
df_sales = spark.read.format("parquet").load("/Volumes/dev/default/file_ingestion/users.parquet")

# Plain-text preview first, then the notebook's rich table rendering.
df_sales.show()
display(df_sales)

In [0]:
# Render df_sales with the notebook's rich table display.
display(df_sales)

In [0]:
# Persist df_sales as a Parquet-format table registered in the Hive metastore.
# The explicit "path" option stores the data files at that DBFS location;
# "overwrite" replaces whatever the table held before.
(
    df_sales.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "dbfs:/data/output/sales_parquet_1")
    .saveAsTable("hive_metastore.default.sales_parquet")
)

In [0]:
# List the files at the location the Parquet table was written to.
dbutils.fs.ls("dbfs:/data/output/sales_parquet_1/")

In [0]:
%sql
-- Tables registered in the `default` schema of the Hive metastore.
SHOW TABLES IN hive_metastore.default;

In [0]:
%sql
-- Columns plus extended metadata for the Parquet table.
DESCRIBE TABLE EXTENDED hive_metastore.default.sales_parquet;

In [0]:
%sql
select * from hive_metastore.default.sales_parquet;

-- Expected to fail: sales_parquet is stored as plain Parquet, and row-level
-- UPDATE is only supported for Delta tables (not a limitation of "Hive" as such).
update hive_metastore.default.sales_parquet set salary = 0 where id = 1

In [0]:
# Persist the same DataFrame as a Delta-format table in the Hive metastore,
# with its data files (and transaction log) at the given DBFS path.
(
    df_sales.write
    .format("delta")
    .mode("overwrite")
    .option("path", "dbfs:/data/output/sales_delta_1")
    .saveAsTable("hive_metastore.default.sales_delta")
)

In [0]:
%sql
-- Re-list the schema's tables; sales_delta should now appear as well.
SHOW TABLES IN hive_metastore.default;


In [0]:
%sql
-- Columns plus extended metadata for the Delta table.
DESCRIBE TABLE EXTENDED hive_metastore.default.sales_delta;

In [0]:
# View the files in the Delta table's `_delta_log` directory
# (the transaction log kept alongside the data files).

display(dbutils.fs.ls("dbfs:/data/output/sales_delta_1/_delta_log/"))

In [0]:
# Show the beginning of the log entry numbered 0 in the transaction log.
display(dbutils.fs.head("dbfs:/data/output/sales_delta_1/_delta_log/00000000000000000000.json"))

In [0]:
%sql
-- Commit history (one row per table version) of the Delta table.
DESCRIBE HISTORY hive_metastore.default.sales_delta;

In [0]:
%sql
-- Row-level update against the Delta table; this creates a new table version.
UPDATE hive_metastore.default.sales_delta
SET salary = 0
WHERE id = 1

In [0]:
# Read the table through the PySpark API (this loads the latest version).
# Path-based alternatives, including time travel to a particular version:
# df_sales_delta = spark.read.format("delta").option("versionAsOf", 0).load("dbfs:/data/output/sales_delta_1")
# df_sales_delta = spark.read.format("delta").load("dbfs:/data/output/sales_delta_1/")

df_sales_delta = spark.table("hive_metastore.default.sales_delta")
display(df_sales_delta)

In [0]:
# Time travel: the `@v0` suffix selects version 0 of the table.

df_sales_delta = spark.table("hive_metastore.default.sales_delta@v0")
display(df_sales_delta.filter("id = 1"))

In [0]:
%sql
-- Time travel in SQL: read version 0 of the table via the @v0 suffix.
SELECT *
FROM hive_metastore.default.sales_delta@v0;

Schema Evolution in Delta Lake

In [0]:
%sql
-- Version-0 row for id = 1, extended with an extra `time_now` column.
SELECT *, current_timestamp() AS time_now
FROM hive_metastore.default.sales_delta@v0
WHERE id = 1;

In [0]:
# Build a DataFrame holding the version-0 row for id = 1 plus an additional
# `time_now` column, so its schema is wider than the table's.
version0_row_query = "select *, current_timestamp() as time_now from hive_metastore.default.sales_delta@v0 where id = 1;"
df_new = spark.sql(version0_row_query)
display(df_new)

In [0]:
# Append df_new to the existing Delta table.
# The mergeSchema option is set so the write can add df_new's extra
# `time_now` column to the table schema.
# Note: every run of this cell appends the row again.

(
    df_new.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .option("path", "dbfs:/data/output/sales_delta_1")
    .saveAsTable("hive_metastore.default.sales_delta")
)

In [0]:
%sql
-- History after the append; the schema-merging write shows up as a new version.
DESCRIBE HISTORY hive_metastore.default.sales_delta;

In [0]:
%sql
-- Current rows for id = 1, including the one appended with `time_now`.
SELECT *
FROM hive_metastore.default.sales_delta
WHERE id = 1;

Convert Parquet to Delta

In [0]:
# DeltaTable is the Python API handle for a Delta table; later cells in this
# notebook reuse this import.
from delta import DeltaTable

# Bind to the Delta table by its storage path (not by its metastore name).
deltaTable = DeltaTable.forPath(spark, "dbfs:/data/output/sales_delta_1")

# Print the table's commit history as plain text.
deltaTable.history().show()

In [0]:
# Converting a Parquet into Delta - Check and convert

# Checking: a notebook cell only echoes its LAST expression, so two bare
# isDeltaTable(...) calls would silently drop the first result. Print both
# results explicitly so the Parquet and Delta locations can be compared.
for table_path in (
    "dbfs:/data/output/sales_parquet_1",
    "dbfs:/data/output/sales_delta_1",
):
    print(table_path, "->", DeltaTable.isDeltaTable(spark, table_path))

In [0]:
# Converting: turn the Parquet directory into a Delta table in place.
DeltaTable.convertToDelta(spark, "parquet.`dbfs:/data/output/sales_parquet_1`")

# Validating: print the result explicitly -- as a bare expression in the
# middle of the cell it would be discarded, because only the last expression
# of a cell is echoed.
print("is Delta table:", DeltaTable.isDeltaTable(spark, "dbfs:/data/output/sales_parquet_1"))

# List the location's files after the conversion.
display(dbutils.fs.ls("dbfs:/data/output/sales_parquet_1"))

In [0]:
%sql
-- Inspect the metastore entry for the table whose files were just converted by path.
DESCRIBE TABLE EXTENDED hive_metastore.default.sales_parquet

In [0]:
%sql
-- Convert the table by name so the metastore metadata becomes Delta as well.
CONVERT TO DELTA hive_metastore.default.sales_parquet;
    

In [0]:
%sql
-- Review the available versions before restoring.
DESCRIBE HISTORY hive_metastore.default.sales_delta;

In [0]:
%sql
-- Roll the table back to the state it had at version 1.
RESTORE TABLE hive_metastore.default.sales_delta
TO VERSION AS OF 1;

In [0]:
# Bind to the Delta table by its metastore name.
dt = DeltaTable.forName(spark, "hive_metastore.default.sales_delta")
# Vacuum with a 0-hour retention. NOTE(review): this call is expected to be
# rejected while Delta's retention-duration safety check is enabled; the next
# cell turns that check off before retrying.
dt.vacuum(0)

In [0]:
# VACUUM with a 0-hour retention is rejected while Delta's retention-duration
# safety check is on. Switch the check off only for this one call, then put
# the previous setting back instead of leaving it disabled session-wide.
# WARNING: vacuum(0) deletes data files that the current table version no
# longer references, so time travel to older versions can stop working.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"
previous_retention_check = spark.conf.get(retention_check_key, "true")

spark.conf.set(retention_check_key, "false")
try:
    dt.vacuum(0)
finally:
    # Restore the safety check even if the vacuum itself fails.
    spark.conf.set(retention_check_key, previous_retention_check)

In [0]:
%sql
-- Catalogs: all of them, then only those matching the pattern.
SHOW CATALOGS;

SHOW CATALOGS LIKE 'dev*';

-- Schemas in the `dev` catalog, unfiltered and filtered by pattern.
SHOW SCHEMAS IN dev;
SHOW SCHEMAS IN dev LIKE 'in*';

-- Tables in dev.bronze, unfiltered and filtered by pattern.
SHOW TABLES IN dev.bronze;
SHOW TABLES IN dev.bronze LIKE 'sale*';

In [0]:
# Check if a table exists using the PySpark catalog API; the cell echoes the
# result for the fully qualified name below.

spark.catalog.tableExists("dev.bronze.sale_external")

In [0]:
%sql
-- IF NOT EXISTS clause: both statements are safe to re-run.
-- The schema is created first: a table cannot be created inside a schema
-- that does not exist yet, so on a fresh workspace the original order
-- (table, then schema) fails on the first statement.
CREATE SCHEMA IF NOT EXISTS dev.bronze;

CREATE TABLE IF NOT EXISTS dev.bronze.emp (
  emp_id INT,
  emp_name STRING,
  dept_code STRING,
  salary DOUBLE
);

In [0]:
%sql
-- Load the sample employees with ONE multi-row INSERT: a single commit
-- instead of four separate single-row commits.
-- Note: INSERT INTO appends, so re-running this cell adds the rows again.
INSERT INTO dev.bronze.emp VALUES
  (1001, 'John', 'D101', 100000),
  (1002, 'Mary', 'D102', 120000),
  (1003, 'Jane', 'D103', 150000),
  (1004, 'Bob', 'D104', 180000);
    

    
-- Create Temporary & Permanent Views

In [0]:
%sql
-- Temporary view over the D101 employees.
CREATE OR REPLACE TEMP VIEW emp_temp_view AS
SELECT *
FROM dev.bronze.emp
WHERE dept_code = 'D101';

-- select * from emp_temp_view;

In [0]:
%sql
-- Permanent view, stored in dev.bronze, over the D102 employees.
CREATE OR REPLACE VIEW dev.bronze.emp_view AS
SELECT *
FROM dev.bronze.emp
WHERE dept_code = 'D102';

In [0]:
%sql
-- The permanent view is addressed by its fully qualified name.
-- The temp view was created without a catalog/schema qualifier, so the
-- qualified query below is left disabled:
-- select * from dev.bronze.emp_temp_view;

SELECT *
FROM dev.bronze.emp_view;

In [0]:
%sql
-- CTAS (CREATE TABLE AS SELECT): build emp_ctas from a full copy of emp.
CREATE OR REPLACE TABLE dev.bronze.emp_ctas AS
SELECT *
FROM dev.bronze.emp;

In [0]:
%sql
-- Columns plus extended metadata for the CTAS table.
DESCRIBE TABLE EXTENDED dev.bronze.emp_ctas;

In [0]:
%sql
-- Commit history of the CTAS table.
DESCRIBE HISTORY dev.bronze.emp_ctas;

In [0]:
%sql
-- DEEP CLONE in delta table - metadata & data
-- The CREATE used to be commented out, so the DESCRIBE statements only worked
-- if emp_dc was left over from an earlier session. IF NOT EXISTS makes the
-- cell self-contained and still safe to re-run.
CREATE TABLE IF NOT EXISTS dev.bronze.emp_dc DEEP CLONE dev.bronze.emp;

DESCRIBE TABLE EXTENDED dev.bronze.emp_dc;

DESCRIBE HISTORY dev.bronze.emp_dc;

In [0]:
%sql
-- SHALLOW CLONE in delta table - metadata only
-- The CREATE used to be commented out, so the statements below only worked
-- if emp_sc was left over from an earlier session. IF NOT EXISTS makes the
-- cell self-contained and still safe to re-run.
CREATE TABLE IF NOT EXISTS dev.bronze.emp_sc SHALLOW CLONE dev.bronze.emp;

DESCRIBE TABLE EXTENDED dev.bronze.emp_sc;

DESCRIBE HISTORY dev.bronze.emp_sc;

SELECT *
FROM dev.bronze.emp_sc;