### Read & Write en Delta Lake
1. Escribir datos en Delta Lake (Managed Table)
2. Escribir datos en Delta Lake (External Table)
3. Leer datos desde Delta Lake (Table)
4. Leer datos desde Delta Lake (File)

In [0]:
%sql
-- Create the demo schema if it does not exist yet, rooted at the mounted demo folder.
-- The DROP below is left commented out (presumably for manually resetting the demo).
--DROP SCHEMA movie_demo cascade
CREATE SCHEMA IF NOT EXISTS movie_demo
LOCATION '/mnt/moviehistoryadilmor/demo'

In [0]:
from pyspark.sql.types import *

In [0]:
# Explicit schema for movie.csv, built field by field.
# Only movieId is declared non-nullable; every other column accepts nulls.
movie_schema = (
    StructType()
    .add("movieId", IntegerType(), False)
    .add("title", StringType(), True)
    .add("budget", DoubleType(), True)
    .add("homepage", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", DoubleType(), True)
    .add("yearReleaseDate", IntegerType(), True)
    .add("releaseDate", DateType(), True)
    .add("revenue", DoubleType(), True)
    .add("durationTime", IntegerType(), True)
    .add("movieStatus", StringType(), True)
    .add("tagline", StringType(), True)
    .add("voteAverage", DoubleType(), True)
    .add("voteCount", IntegerType(), True)
)

In [0]:
# Read the bronze movie CSV with the explicit schema; the first row is a header.
movie_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistoryadilmor/bronze/2024-12-30/movie.csv")
)

In [0]:
# Save the DataFrame as the Delta table movie_demo.movie_manager,
# replacing any existing contents.
(
    movie_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("movie_demo.movie_manager")
)

In [0]:
%sql
SELECT * FROM movie_demo.movie_manager
--drop table movie_demo.movie_manager

In [0]:
# Write the DataFrame as Delta files to an explicit storage path (no table is
# registered here). The folder name "movies_enternal" is spelled this way in
# every cell that uses this path, so it must stay in sync with them.
(
    movie_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/moviehistoryadilmor/demo/movies_enternal")
)

In [0]:
%sql
-- Register the Delta files written to the path below as the table
-- movie_demo.movies_external, without copying any data.
-- IF NOT EXISTS keeps this cell re-runnable, like the other CREATE statements
-- in this notebook; without it a second run fails because the table exists.
CREATE TABLE IF NOT EXISTS movie_demo.movies_external
USING DELTA
LOCATION "/mnt/moviehistoryadilmor/demo/movies_enternal"

In [0]:
%sql
-- Delta Lake only allows DROP COLUMN on tables with column mapping enabled,
-- so enable it (together with the reader/writer protocol versions it needs)
-- before dropping the column. Note: the protocol upgrade cannot be undone.
ALTER TABLE movie_demo.movies_external SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5');
-- Remove the movieStatus column from the table.
ALTER TABLE movie_demo.movies_external DROP COLUMN movieStatus;

In [0]:
%sql
-- Drop the table definition for movie_demo.movies_external.
-- NOTE(review): the table was created with an explicit LOCATION, so the Delta
-- files are presumably left in storage (a later cell loads that path directly) — confirm.
DROP TABLE movie_demo.movies_external

In [0]:
%sql
-- Show the extended metadata of movie_demo.movies_external.
-- NOTE(review): when the notebook is run top to bottom this executes after the
-- DROP TABLE cell, so the table no longer exists at this point — confirm intent.
--SELECT * FROM movie_demo.movies_external
DESC EXTENDED movie_demo.movies_external

In [0]:
# Read the Delta files directly from their storage path (no table name involved).
movies_external_df = spark.read.format("delta").load("/mnt/moviehistoryadilmor/demo/movies_enternal")

In [0]:
# Save the DataFrame as the Delta table movie_demo.movies_partitioned,
# partitioned by release year and replacing any existing contents.
(
    movie_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("yearReleaseDate")
    .saveAsTable("movie_demo.movies_partitioned")
)

In [0]:
%sql
--DESCRIBE EXTENDED movie_demo.movies_partitioned
SHOW PARTITIONS movie_demo.movies_partitioned

### Update & Delete en Delta Lake
1. Update desde Delta Lake
2. Delete desde Delta Lake

In [0]:
%sql
-- SQL flavour of a Delta update: set durationTime to 60 for every movie released in 2012.
UPDATE movie_demo.movie_manager
  SET durationTime = 60
WHERE yearReleaseDate = 2012;

In [0]:
%sql
SELECT * FROM movie_demo.movie_manager
WHERE yearReleaseDate = 2012

In [0]:
%python
#from delta.tables import *

# Check if the table is a Delta table
#if not DeltaTable.isDeltaTable(spark, '/mnt/moviehistoryadilmor/demo/movies_managed'):
#    # Convert the Parquet table to a Delta table
#    df = spark.read.parquet('/mnt/moviehistoryadilmor/demo/movies_managed')
#    df.write.format("delta").mode("overwrite").save('/mnt/moviehistoryadilmor/demo/movies_managed')

# Load the Delta table
deltaTable = DeltaTable.forPath(spark, '/mnt/moviehistoryadilmor/demo/movie_manager')

# Update the records
deltaTable.update(
    condition="yearReleaseDate = 2013",
    set={"durationTime": "100"}
)

In [0]:
%sql
SELECT * FROM movie_demo.movie_manager
WHERE yearReleaseDate = 2013

In [0]:
%sql
-- SQL flavour of a Delta delete: remove every movie released in 2014.
DELETE FROM movie_demo.movie_manager
WHERE yearReleaseDate = 2014;

In [0]:
# Import only the name this cell needs instead of a wildcard import.
from delta.tables import DeltaTable

# Load the Delta table by storage path.
deltaTable = DeltaTable.forPath(spark, '/mnt/moviehistoryadilmor/demo/movie_manager')

# Python flavour of a Delta delete: remove every movie released in 2015.
deltaTable.delete("yearReleaseDate = 2015")

In [0]:
%sql
SELECT * FROM movie_demo.movie_manager
WHERE yearReleaseDate = 2015;

### Merge / UpSert en Delta Lake

In [0]:
from pyspark.sql.types import *

In [0]:
# Explicit schema for movie.csv, re-declared with the same fields as the schema
# in the Read & Write section (presumably so this section can run on its own).
# Only movieId is declared non-nullable.
movie_schema = StructType(fields=[
    StructField("movieId", IntegerType(), False),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homepage", StringType(), True),    
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True), 
    StructField("yearReleaseDate", IntegerType(), True),  
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True)
])

In [0]:
# Day 1 batch: movies released before 2000, reduced to the columns used by the merge.
movie_day1_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistoryadilmor/bronze/2024-12-30/movie.csv")
    .filter("yearReleaseDate < 2000")
    .select(
        "movieId",
        "title",
        "yearReleaseDate",
        "releaseDate",
        "durationTime",
    )
)

In [0]:
# Expose the day-1 DataFrame to SQL cells as the temp view movie_day1
# (it is the source of the Day 1 MERGE).
movie_day1_df.createOrReplaceTempView("movie_day1")

In [0]:
from pyspark.sql.functions import upper

# Day 2 batch: movies released between 1998 and 2005, with the title upper-cased.
# The 1998-1999 range overlaps the day-1 batch (yearReleaseDate < 2000).
movie_day2_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistoryadilmor/bronze/2024-12-30/movie.csv")
    .filter("yearReleaseDate BETWEEN 1998 AND 2005")
    .select(
        "movieId",
        upper("title").alias("title"),
        "yearReleaseDate",
        "releaseDate",
        "durationTime",
    )
)

In [0]:
# Expose the day-2 DataFrame to SQL cells as the temp view movie_day2
# (it is the source of the Day 2 MERGE).
movie_day2_df.createOrReplaceTempView("movie_day2")

In [0]:
# Day 3 batch: movies released in 1983-1998 or 2006-2010, with the title upper-cased.
movie_day3_df = (
    spark.read
    .option("header", True)
    .schema(movie_schema)
    .csv("/mnt/moviehistoryadilmor/bronze/2024-12-30/movie.csv")
    .filter("(yearReleaseDate BETWEEN 1983 AND 1998 ) OR (yearReleaseDate BETWEEN 2006 AND 2010)")
    .select(
        "movieId",
        upper("title").alias("title"),
        "yearReleaseDate",
        "releaseDate",
        "durationTime",
    )
)

In [0]:
# No temp view is registered for day 3; the Day 3 merge reads movie_day3_df
# directly. Drop any movie_day3 view that may be left in the session.
spark.catalog.dropTempView("movie_day3")

In [0]:
%sql
-- Target table for the merge/upsert demo. Besides the movie columns it carries
-- createdDate and updatedDate, which the MERGE statements fill in.
CREATE TABLE IF NOT EXISTS movie_demo.movie_merge(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

#### Day 1

In [0]:
%sql
-- Upsert the day-1 batch into movie_merge, matching rows on movieId.
MERGE INTO movie_demo.movie_merge AS tgt
USING movie_day1 AS src
ON tgt.movieId = src.movieId
-- Existing movie: refresh its attributes and stamp updatedDate with today.
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_date()
-- New movie: insert it with createdDate = today; updatedDate is not in the column list.
WHEN NOT MATCHED THEN
  INSERT (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
  VALUES (src.movieId, 
          src.title, 
          src.yearReleaseDate, 
          src.releaseDate, 
          src.durationTime, 
          current_date()
          )

In [0]:
%sql
SELECT * FROM movie_demo.movie_merge

#### Day2

In [0]:
%sql
-- Upsert the day-2 batch into movie_merge, matching rows on movieId.
MERGE INTO movie_demo.movie_merge AS tgt
USING movie_day2 AS src
ON tgt.movieId = src.movieId
-- Existing movie: refresh its attributes and stamp updatedDate with today.
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_date()
-- New movie: insert it with createdDate = today; updatedDate is not in the column list.
WHEN NOT MATCHED THEN
  INSERT (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
  VALUES (src.movieId, 
          src.title, 
          src.yearReleaseDate, 
          src.releaseDate, 
          src.durationTime, 
          current_date()
          )

In [0]:
%sql
SELECT * FROM movie_demo.movie_merge

#### Day3 com PySpark

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date

# Target Delta table, loaded by storage path
# (by-name alternative: DeltaTable.forName(spark, "movie_demo.movie_merge")).
delta_target = DeltaTable.forPath(spark, "/mnt/moviehistoryadilmor/demo/movie_merge")

# Source rows: the day-3 DataFrame is used directly
# (temp-view alternative: spark.table("movie_day3")).
source_df = movie_day3_df

# Rows are matched on movieId.
merge_condition = "tgt.movieId = src.movieId"

# Existing movie: refresh its attributes and stamp updatedDate with today.
# The values are SQL expression strings evaluated by Delta.
matched_updates = {
    "title": "src.title",
    "yearReleaseDate": "src.yearReleaseDate",
    "releaseDate": "src.releaseDate",
    "durationTime": "src.durationTime",
    "updatedDate": "current_date()",
}

# New movie: insert it with createdDate = today; updatedDate is not set.
new_row_values = {
    "movieId": "src.movieId",
    "title": "src.title",
    "yearReleaseDate": "src.yearReleaseDate",
    "releaseDate": "src.releaseDate",
    "durationTime": "src.durationTime",
    "createdDate": "current_date()",
}

# Run the MERGE (same logic as the Day 1 / Day 2 SQL statements).
(
    delta_target.alias("tgt")
    .merge(source_df.alias("src"), merge_condition)
    .whenMatchedUpdate(set=matched_updates)
    .whenNotMatchedInsert(values=new_row_values)
    .execute()
)


In [0]:
%sql
SELECT * FROM movie_demo.movie_merge

### History, Time Travel y Vacuum

1. Historia y control de versiones
2. Viaje en el Tiempo
3. Vacío (Vacuum)

In [0]:
%sql
DESC HISTORY movie_demo.movie_merge

In [0]:
%sql
-- Inspect the table contents after the first insert of records (version 1).
SELECT * FROM movie_demo.movie_merge VERSION AS OF 1;

In [0]:
%sql
-- Inspect the table contents as of a specific point in time (per the original
-- note, this timestamp corresponds to version 1).
-- NOTE(review): the timestamp is hard-coded for one particular run; take the
-- value from DESC HISTORY when re-running the notebook.
SELECT * FROM movie_demo.movie_merge TIMESTAMP AS OF '2025-05-11T15:26:26.000+00:00';

In [0]:
# Build a DataFrame from the table state as of the given timestamp
# (per the original note, equivalent to version 1).
df = spark.read.format("delta").option("timestampAsOf", "2025-05-11T15:26:26.000+00:00").load("/mnt/moviehistoryadilmor/demo/movie_merge")

In [0]:
df.display()

In [0]:
%sql
-- Permanently remove data files that the current table version no longer references.
-- A plain `VACUUM movie_demo.movie_merge` keeps files for the default 7 days.
--
-- RETAIN 0 HOURS is rejected unless the retention safety check is disabled,
-- so the check is switched off for the VACUUM and switched back on right
-- afterwards; otherwise it would stay disabled for the rest of the session.
-- WARNING: older versions whose files are removed can no longer be queried.
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM movie_demo.movie_merge RETAIN 0 HOURS;
SET spark.databricks.delta.retentionDurationCheck.enabled = true

In [0]:
%sql
-- Query the table as of the same point in time as before the VACUUM.
-- NOTE(review): this runs after VACUUM ... RETAIN 0 HOURS, so it presumably
-- demonstrates that the old version's files are gone — confirm the intent.
SELECT * FROM movie_demo.movie_merge TIMESTAMP AS OF '2025-05-11T15:26:26.000+00:00';

In [0]:
%sql
DESC HISTORY movie_demo.movie_merge

In [0]:
%sql
-- Delete the movies released in 2004; a later cell restores them from an older version.
DELETE FROM movie_demo.movie_merge
WHERE yearReleaseDate = 2004;

In [0]:
%sql
-- Read the 2004 movies from table version 9.
-- NOTE(review): version 9 is hard-coded and presumably is the version just
-- before the DELETE in the original run — check DESC HISTORY when re-running.
SELECT * FROM movie_demo.movie_merge VERSION AS OF 9
WHERE yearReleaseDate = 2004;

In [0]:
%sql
-- Restore the deleted records by merging an older table version back in.
-- Only rows missing from the current table are inserted. The previous
-- `WHEN MATCHED THEN UPDATE SET *` clause is removed on purpose: it rewrote
-- every current row with its old-snapshot values, which would silently revert
-- any change made after that version.
-- NOTE(review): version 9 is hard-coded; check DESC HISTORY for the version
-- just before the DELETE when re-running.
MERGE INTO movie_demo.movie_merge tgt
USING movie_demo.movie_merge VERSION AS OF 9 src
ON tgt.movieId = src.movieId
WHEN NOT MATCHED THEN INSERT *

### Transaction Log en Delta Lake

##### Por padrão, o Delta Lake mantém o histórico por 30 dias, mas esse valor pode ser configurado.
##### `delta.logRetentionDuration` – controla por quanto tempo o log de transações (Delta Log) é mantido.
##### `delta.deletedFileRetentionDuration` – controla por quanto tempo os arquivos de dados marcados como excluídos são mantidos antes da limpeza (vacuum).


In [0]:
%sql
-- Table used for the transaction-log demo; it has the same columns as movie_demo.movie_merge.
CREATE TABLE IF NOT EXISTS movie_demo.movie_log(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
DESC HISTORY movie_demo.movie_log;

In [0]:
%sql
-- Copy one movie from movie_merge into movie_log. SELECT * works here because
-- both tables are declared with the same columns in the same order.
INSERT INTO movie_demo.movie_log
SELECT * FROM movie_demo.movie_merge
WHERE movieId = 125537

In [0]:
%sql DESC HISTORY movie_demo.movie_log

In [0]:
%sql
DELETE FROM movie_demo.movie_log
WHERE movieId = 125537

In [0]:
%sql DESC HISTORY movie_demo.movie_log

In [0]:
# Per the original note, this generates several Parquet files in storage and
# one entry in the log folder per version: each INSERT below is its own statement.
List = [118452, 124606, 125052, 12523, 125623]
for movieId in List:
  # The ids are hard-coded integers, so interpolating them into the SQL text is safe here.
  spark.sql(f"""INSERT INTO movie_demo.movie_log
            SELECT * FROM movie_demo.movie_merge 
            WHERE movieId = {movieId}""")

In [0]:
%sql
-- Per the original note, this single INSERT generates just one Parquet file in
-- storage and one entry in the log folder.
INSERT INTO movie_demo.movie_log
SELECT * FROM movie_demo.movie_merge

### Convertir formato "Parquet" a "Delta"

In [0]:
%sql
-- Per the original note: when a Parquet table is created, Databricks only
-- creates an empty folder in storage, unlike a Delta table.
CREATE TABLE IF NOT EXISTS movie_demo.movie_convert_to_delta(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING PARQUET

In [0]:
%sql
-- This statement writes the Parquet files to storage, without version control.
INSERT INTO movie_demo.movie_convert_to_delta
SELECT * FROM movie_demo.movie_merge

In [0]:
%sql
-- NOTE(review): _sqldf is presumably the implicit Databricks variable holding
-- the result of the previous SQL cell — confirm.
select * from _sqldf ;

In [0]:
%sql
-- Convert the Parquet table in place; per the original note this creates the
-- delta_log folder in storage, which enables version control.
CONVERT TO DELTA movie_demo.movie_convert_to_delta;

In [0]:
df = spark.table("movie_demo.movie_convert_to_delta")
df.display()

In [0]:
# Write the table contents as plain Parquet files to a new folder, which the
# next cell converts to Delta by path.
# NOTE(review): no write mode is given, so re-running presumably fails once the folder exists — confirm.
df.write.format("parquet").save("/mnt/moviehistoryadilmor/demo/movie_convert_to_delta_new")

In [0]:
%sql
-- Convert a folder of Parquet files to Delta by path rather than by table name.
CONVERT TO DELTA parquet.`/mnt/moviehistoryadilmor/demo/movie_convert_to_delta_new`