### Read & Write en Delta Lake
1. Escribir datos en Delta Lake(Managed Table)
2. Escribir datos en Delta Lake(External Table)
3. Leer datos desde Delta Lake(Table)
4. Leer datos desde Delta Lake(File)

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS movie_demo
LOCATION "/mnt/historialpeliculas/demo"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

In [0]:
movie_schema = StructType(fields=[
    StructField("movieId", IntegerType(), False),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homepage", StringType(), True  ),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("yearReleaseDate", IntegerType(), True),
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True)
])

In [0]:
movie_df = spark.read.option("header", "true").schema(movie_schema).csv("/mnt/historialpeliculas/bronze/2024-12-30/movie.csv")

In [0]:
display(movie_df)

1. https://docs.delta.io/latest/index.html
2. https://docs.databricks.com/aws/en/delta
3. https://learn.microsoft.com/en-us/azure/databricks/delta/

In [0]:
movie_df.write.format("delta").mode("overwrite").saveAsTable("movie_demo.movies_managed")

In [0]:
%sql
SELECT *
FROM movie_demo.movies_managed;

In [0]:
movie_df.write.format("delta").mode("overwrite").save("/mnt/historialpeliculas/demo/movies_external")

In [0]:
%sql
CREATE TABLE movie_demo.movies_external
USING DELTA
LOCATION "/mnt/historialpeliculas/demo/movies_external"
  

In [0]:
%sql
SELECT *
FROM movie_demo.movies_external;

In [0]:
movies_external_df = spark.read.format("delta").load("/mnt/historialpeliculas/demo/movies_external")

In [0]:
display(movies_external_df)

In [0]:
movie_df.write.format("delta").mode("overwrite").partitionBy("yearReleaseDate").saveAsTable("movie_demo.movies_partitioned")

In [0]:
%sql
SHOW PARTITIONS movie_demo.movies_partitioned;

### Update & Delete en Delta Lake
1. Update desde Delta Lake
2. Delete desde Delta Lake

https://docs.delta.io/latest/delta-update.html

In [0]:
%sql
SELECT *
FROM movie_demo.movies_managed;

In [0]:
%sql
UPDATE movie_demo.movies_managed 
SET durationTime = 60
WHERE yearReleaseDate = 2012;

In [0]:
%sql
SELECT *
FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2012;

In [0]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, '/mnt/historialpeliculas/demo/movies_managed')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "yearReleaseDate = 2013",
  set = { "durationTime": "100" }
)

In [0]:
%sql
SELECT *
FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2013;

In [0]:
%sql
DELETE 
FROM movie_demo.movies_managed 
WHERE yearReleaseDate = 2014;

In [0]:
%sql
SELECT * 
FROM movie_demo.movies_managed 
WHERE yearReleaseDate = 2014;

In [0]:
deltaTable = DeltaTable.forPath(spark, '/mnt/historialpeliculas/demo/movies_managed')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("yearReleaseDate = 2015")

In [0]:
%sql
SELECT * 
FROM movie_demo.movies_managed 
WHERE yearReleaseDate = 2015;

### Merge/Upsert en Delta Lake

In [0]:
movies_day1_df = spark.read.option("header", "true").schema(movie_schema).csv("/mnt/historialpeliculas/bronze/2024-12-30/movie.csv").filter("yearReleaseDate < 2000").select("movieId", "title", "yearReleaseDate","releaseDate","durationTime")

In [0]:
display(movies_day1_df)

In [0]:
movies_day1_df.createOrReplaceTempView("movies_day1")

In [0]:
from pyspark.sql.functions import upper

movies_day2_df = spark.read.option("header", "true").schema(movie_schema).csv("/mnt/historialpeliculas/bronze/2024-12-30/movie.csv").filter("yearReleaseDate between 1998 and 2005").select("movieId", upper("title").alias("title"), "yearReleaseDate","releaseDate","durationTime")

In [0]:
display(movies_day2_df)

In [0]:
movies_day2_df.createOrReplaceTempView("movies_day2")

In [0]:
movies_day3_df = spark.read.option("header", "true").schema(movie_schema).csv("/mnt/historialpeliculas/bronze/2024-12-30/movie.csv").filter("yearReleaseDate BETWEEN 1983 AND 1998 OR yearReleaseDate BETWEEN 2006 AND 2010").select("movieId", upper("title").alias("title"), "yearReleaseDate", "releaseDate", "durationTime")

In [0]:
display(movies_day3_df)

https://docs.delta.io/latest/delta-update.html

In [0]:
%sql
create table if not exists movie_demo.movies_merge(
  movieId int, 
  title string, 
  yearReleaseDate int, 
  releaseDate date, 
  durationTime int,
  createdDate date,
  updatedDate date
)

#### Dia 1

In [0]:
%sql
MERGE INTO movie_demo.movies_merge AS tgt
USING movies_day1 AS src
ON tgt.movieId = src.movieId
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
    VALUES (src.movieId, src.title, src.yearReleaseDate, src.releaseDate, src.durationTime, current_timestamp)

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge;

#### Dia 2

In [0]:
%sql
MERGE INTO movie_demo.movies_merge AS tgt
USING movies_day2 AS src
ON tgt.movieId = src.movieId
WHEN MATCHED THEN
  UPDATE SET
    tgt.title = src.title,
    tgt.yearReleaseDate = src.yearReleaseDate,
    tgt.releaseDate = src.releaseDate,
    tgt.durationTime = src.durationTime,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
    VALUES (src.movieId, src.title, src.yearReleaseDate, src.releaseDate, src.durationTime, current_timestamp)

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge;

#### Dia 3

In [0]:
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/mnt/historialpeliculas/demo/movies_merge')


deltaTablePeople.alias('tgt') \
  .merge(
    movies_day3_df.alias('src'),
    'tgt.movieId = src.movieId'
  ) \
  .whenMatchedUpdate(set =
    {
      "tgt.title": "src.title",
      "tgt.yearReleaseDate": "src.yearReleaseDate",
      "tgt.releaseDate": "src.releaseDate",
      "tgt.durationTime": "src.durationTime",
      "tgt.updatedDate": "current_timestamp()"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "tgt.movieId": "src.movieId",
      "tgt.title": "src.title",
      "tgt.yearReleaseDate": "src.yearReleaseDate",
      "tgt.releaseDate": "src.releaseDate",
      "tgt.durationTime": "src.durationTime",
      "tgt.createdDate": "current_timestamp()"
    }
  ) \
  .execute()

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge;

### History, Time Travel y Vacuum
1. Historia y Control de Versiones
2. Viaje en el Tiempo
3. Vacio 

In [0]:
%sql
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
select *
from movie_demo.movies_merge version as of 2;

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-07-04T16:09:08.000+00:00';

In [0]:
df = spark.read.format("delta").option("timestampAsOf",'2025-07-04T16:09:08.000+00:00').load("/mnt/historialpeliculas/demo/movies_merge")#guardo en un df el historial de la fecha especificada

In [0]:
display(df)

In [0]:
%sql
vacuum movie_demo.movies_merge;--Elimino toda la historia de la tabla

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-07-04T16:09:08.000+00:00';-- No se borro porque Databricks por default guarda la historia por 7 dias.

In [0]:
%sql
set spark.databricks.delta.retentionDurationCheck.enabled = false;--Fuerzo el borrado
vacuum movie_demo.movies_merge retain 0 hours;--Elimino toda la historia de la tabla, sin esperar el tiempo de DataBricks

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-07-04T16:09:08.000+00:00';--Confirmo que se borro, por eso el msj de error

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge;

In [0]:
%sql
desc history movie_demo.movies_merge;--Veo la historia pero no puedo acceder a ella.

In [0]:
%sql
delete
from movie_demo.movies_merge
where yearReleaseDate = 2004;

In [0]:
%sql
select *
from movie_demo.movies_merge

In [0]:
%sql
desc history movie_demo.movies_merge;

In [0]:
%sql
SELECT *
FROM movie_demo.movies_merge VERSION AS OF 9;

In [0]:
%sql
merge into movie_demo.movies_merge tgt
using movie_demo.movies_merge VERSION AS OF 9 src
on tgt.movieId = src.movieId
when not matched then 
  insert *;--Recupero los datos borrados y los vuelvo a insertar

In [0]:
%sql
desc history movie_demo.movies_merge;

In [0]:
%sql
select *
from movie_demo.movies_merge;

### Transaction Log en Delta Lake

In [0]:
%sql
create table if not exists movie_demo.movies_log(
  movieId int, 
  title string, 
  yearReleaseDate int, 
  releaseDate date, 
  durationTime int,
  createdDate date,
  updatedDate date
)
using delta

In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT *
FROM movie_demo.movies_merge
where movieId = 125537;

In [0]:
%sql
SELECT *
FROM movie_demo.movies_log;
    


In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT *
FROM movie_demo.movies_merge
where movieId = 133575;

In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
DELETE
FROM movie_demo.movies_log
WHERE movieId = 125537;
    


In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
list = [118452,124606,125052,125123,125263,125537,126141,133575,142132,146269,157185]
for movieId in list:
  spark.sql(f"""INSERT INTO movie_demo.movies_log
            SELECT *
            FROM movie_demo.movies_merge
            WHERE movieId = {movieId}""")


In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT *
FROM movie_demo.movies_merge


### Convertir formato "Parquet" a "Delta"

In [0]:
%sql
create table if not exists movie_demo.movies_convert_to_delta(
  movieId int, 
  title string, 
  yearReleaseDate int, 
  releaseDate date, 
  durationTime int,
  createdDate date,
  updatedDate date
)
using parquet

In [0]:
%sql
insert into movie_demo.movies_convert_to_delta
select *
from movie_demo.movies_merge;

In [0]:
%sql
convert to delta movie_demo.movies_convert_to_delta; 

In [0]:
df = spark.table("movie_demo.movies_convert_to_delta")


In [0]:
display(df)

In [0]:
df.write.format("parquet").save("/mnt/historialpeliculas/demo/movies_convert_to_delta_new")

In [0]:
%sql
convert to delta parquet.`/mnt/historialpeliculas/demo/movies_convert_to_delta_new`;