### Read and Write en Delta Lake
1. Escribir datos en Delta Lake (Managed Table)
2. Escribir datos en Delta Lake (External Table)
3. Leer datos desde Delta Lake (Table)
4. Leer datos desde Delta Lake (File)

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS movie_demo
LOCATION "/mnt/moviehistory2025/demo"

In [0]:
from pyspark.sql.types import *

# definimos el schema
movie_schema = StructType( fields= [
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homePage", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("yearReleaseDate", StringType(), True),
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True)
])

In [0]:
movie_df = spark.read \
    .option("header", True) \
    .schema(movie_schema) \
    .csv("/mnt/moviehistory2025/bronze/2024-12-30/movie.csv")

In [0]:
movie_df.write.format("delta").mode("overwrite").saveAsTable("movie_demo.movies_managed")

In [0]:
%sql
select * from movie_demo.movies_managed limit 10

In [0]:
movie_df.write.format("delta").mode("overwrite").save("/mnt/moviehistory2025/demo/movies_external")

In [0]:
%sql
CREATE TABLE movie_demo.movies_external
USING DELTA
LOCATION "/mnt/moviehistory2025/demo/movies_external";

In [0]:
%sql
SELECT * FROM movie_demo.movies_external LIMIT 10;

In [0]:
movies_external_df = spark.read.format("delta").load("/mnt/moviehistory2025/demo/movies_external")
display(movies_external_df)

In [0]:
movie_df.write.format("delta").mode("overwrite").partitionBy("yearReleaseDate").saveAsTable("movie_demo.movies_partitioned")

In [0]:
%sql
SHOW PARTITIONS movie_demo.movies_partitioned

### Update and Delete en Delta Lake
1. Update en Delta Lake
2. Delete en Delta Lake

In [0]:
%sql
SELECT * FROM movie_demo.movies_managed;

In [0]:
%sql
UPDATE movie_demo.movies_managed
SET durationTime = 60
WHERE yearReleaseDate = 2012;

In [0]:
%sql
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2012;

In [0]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, '/mnt/moviehistory2025/demo/movies_managed')

deltaTable.update(
    condition = "yearReleaseDate = 2013",
    set = {"durationTime": "100"}
)

In [0]:
%sql
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2013;

In [0]:
%sql
DELETE FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2014;

In [0]:
%sql
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2014;

In [0]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, '/mnt/moviehistory2025/demo/movies_managed')

deltaTable.delete("yearReleaseDate = 2015")

In [0]:
%sql
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate = 2015;

### Merge / Upsert en Delta Lake

In [0]:
from pyspark.sql.types import *

# definimos el schema
movie_schema = StructType( fields= [
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("budget", DoubleType(), True),
    StructField("homePage", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("yearReleaseDate", StringType(), True),
    StructField("releaseDate", DateType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("durationTime", IntegerType(), True),
    StructField("movieStatus", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("voteAverage", DoubleType(), True),
    StructField("voteCount", IntegerType(), True)
])

In [0]:
from pyspark.sql.functions import col

movie_day1_df = spark.read \
    .option("header", True) \
    .schema(movie_schema) \
    .csv("/mnt/moviehistory2025/bronze/2024-12-30/movie.csv") \
    .filter(col("yearReleaseDate").rlike("^\\d{4}$")) \
    .filter(col("yearReleaseDate").cast("int") < 2000) \
    .select("movieId", "title", "yearReleaseDate", "releaseDate", "durationTime")

In [0]:
display(movie_day1_df)

In [0]:
movie_day1_df.createOrReplaceTempView("movies_day1")

In [0]:
from pyspark.sql.functions import upper

movie_day2_df = spark.read \
                .option("header", True) \
                .schema(movie_schema) \
                .csv("/mnt/moviehistory2025/bronze/2024-12-30/movie.csv") \
                .filter("yearReleaseDate BETWEEN 1998 AND 2005") \
                .select("movieId", upper("title").alias("title"), "yearReleaseDate", "releaseDate", "durationTime")

In [0]:
display(movie_day2_df)

In [0]:
movie_day2_df.createOrReplaceTempView("movies_day2")

In [0]:
from pyspark.sql.functions import upper

movie_day3_df = spark.read \
                .option("header", True) \
                .schema(movie_schema) \
                .csv("/mnt/moviehistory2025/bronze/2024-12-30/movie.csv") \
                .filter("yearReleaseDate BETWEEN 1983 AND 1998 OR yearReleaseDate BETWEEN 2006 AND 2010") \
                .select("movieId", upper("title").alias("title"), "yearReleaseDate", "releaseDate", "durationTime")

In [0]:
display(movie_day3_df)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS movie_demo.movies_merge(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)

#### Día 1

In [0]:
%sql
-- Day 1 upsert from the movies_day1 temp view: rows whose movieId already exists are
-- updated (and get updatedDate); all other rows are inserted (and get createdDate).
-- current_timestamp() is stored in DATE columns, so only the date part is kept.
MERGE INTO movie_demo.movies_merge AS tgt
USING movies_day1 AS src
  ON tgt.movieId = src.movieId
WHEN MATCHED THEN UPDATE SET
  tgt.title           = src.title,
  tgt.yearReleaseDate = src.yearReleaseDate,
  tgt.releaseDate     = src.releaseDate,
  tgt.durationTime    = src.durationTime,
  tgt.updatedDate     = current_timestamp()
WHEN NOT MATCHED THEN INSERT
  (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
VALUES
  (src.movieId, src.title, src.yearReleaseDate, src.releaseDate, src.durationTime, current_timestamp())

In [0]:
%sql
-- Table contents after day 1.
SELECT * FROM movie_demo.movies_merge;

#### Día 2

In [0]:
%sql
-- Day 2 upsert from the movies_day2 temp view: rows whose movieId already exists are
-- updated (and get updatedDate); all other rows are inserted (and get createdDate).
-- current_timestamp() is stored in DATE columns, so only the date part is kept.
MERGE INTO movie_demo.movies_merge AS tgt
USING movies_day2 AS src
  ON tgt.movieId = src.movieId
WHEN MATCHED THEN UPDATE SET
  tgt.title           = src.title,
  tgt.yearReleaseDate = src.yearReleaseDate,
  tgt.releaseDate     = src.releaseDate,
  tgt.durationTime    = src.durationTime,
  tgt.updatedDate     = current_timestamp()
WHEN NOT MATCHED THEN INSERT
  (movieId, title, yearReleaseDate, releaseDate, durationTime, createdDate)
VALUES
  (src.movieId, src.title, src.yearReleaseDate, src.releaseDate, src.durationTime, current_timestamp())

In [0]:
%sql
-- Table contents after day 2.
SELECT * FROM movie_demo.movies_merge;

#### Día 3

In [0]:
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/mnt/moviehistory2025/demo/movies_merge')


deltaTablePeople.alias('tgt') \
  .merge(
    movie_day3_df.alias('src'),
    'tgt.movieId = src.movieId'
  ) \
  .whenMatchedUpdate(set =
    {
      "tgt.title": "src.title",
      "tgt.yearReleaseDate": "src.yearReleaseDate",
      "tgt.releaseDate": "src.releaseDate",
      "tgt.durationTime": "src.durationTime",
      "tgt.updatedDate": "current_timestamp()"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "movieId": "movieId",
      "title": "title",
      "yearReleaseDate": "yearReleaseDate",
      "releaseDate": "releaseDate",
      "durationTime": "durationTime",
      "createdDate": "current_timestamp()"
    }
  ) \
  .execute()

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge;

### History, Time Travel y Vacuum
1. Historial y control de versiones
2. Viaje en el tiempo (Time Travel)
3. Vacuum (eliminación de archivos de datos antiguos)
In [0]:
%sql
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge VERSION AS OF 2;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-09-20T23:46:24.000+00:00'

In [0]:
df = spark.read.format("delta").option("timestampAsOf", '2025-09-20T23:46:24.000+00:00').load("/mnt/moviehistory2025/demo/movies_merge")
display(df)

In [0]:
%sql
VACUUM movie_demo.movies_merge;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-09-20T23:46:24.000+00:00'

In [0]:
%sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM movie_demo.movies_merge RETAIN 0 HOURS;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-09-20T23:46:24.000+00:00'

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge;

In [0]:
%sql
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
DELETE FROM  movie_demo.movies_merge
WHERE yearReleaseDate = 2004;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge;

In [0]:
%sql
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge VERSION AS OF 9;

In [0]:
%sql
MERGE INTO movie_demo.movies_merge tgt
USING movie_demo.movies_merge VERSION AS OF 9 src
ON tgt.movieId = src.movieId
WHEN NOT MATCHED THEN INSERT *

In [0]:
%sql
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge;

### Transaction Log en Delta Lake

In [0]:
%sql
CREATE TABLE IF NOT EXISTS movie_demo.movies_log(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge
WHERE movieId = 125537;

In [0]:
%sql
SELECT * FROM movie_demo.movies_log;

In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge
WHERE movieId = 133575;

In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
%sql
DELETE FROM movie_demo.movies_log
WHERE movieId = 125537;

In [0]:
%sql
DESC HISTORY movie_demo.movies_log;

In [0]:
list = [118452, 124606, 125052, 125123, 125263, 125537, 126141, 133575, 142132, 146269, 157185]
for movieId in list:
    spark.sql(f"""INSERT INTO movie_demo.movies_log
              SELECT * FROM movie_demo.movies_merge
              WHERE movieId = {movieId}""")

In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge

### Convertir formato "parquet" a "delta"

In [0]:
%sql
CREATE TABLE IF NOT EXISTS movie_demo.movies_convert_to_delta(
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING PARQUET

In [0]:
%sql
INSERT INTO movie_demo.movies_convert_to_delta
SELECT * FROM movie_demo.movies_merge;

In [0]:
%sql
CONVERT TO DELTA movie_demo.movies_convert_to_delta

In [0]:
df = spark.table("movie_demo.movies_convert_to_delta")
display(df)

In [0]:
df.write.format("parquet").save("/mnt/moviehistory2025/demo/movies_convert_to_delta_new")

In [0]:
%sql
CONVERT TO DELTA parquet.`/mnt/moviehistory2025/demo/movies_convert_to_delta_new`