## Lecturas y escrituras en Delta Lake
##### 1. Escribir datos en Delta Lake (tabla administrada)
##### 2. Escribir datos en Delta Lake (tabla externa)
##### 3. Leer datos en Delta Lake (tabla)
##### 4. Leer datos en Delta Lake (fichero)

In [0]:
%sql
-- Create the demo schema (idempotent). Managed tables created in this schema are
-- placed under LOCATION; later cells assume they land at <LOCATION>/<table_name>.
CREATE SCHEMA IF NOT EXISTS movie_demo
LOCATION "/mnt/moviehistory/demo"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Explicit schema for movie.csv, declared as (column, Spark type, nullable).
# Only movieId is declared non-nullable.
_movie_columns = [
    ("movieId", IntegerType(), False),
    ("title", StringType(), True),
    ("budget", DoubleType(), True),
    ("homePage", StringType(), True),
    ("overview", StringType(), True),
    ("popularity", DoubleType(), True),
    ("yearReleaseDate", IntegerType(), True),
    ("releaseDate", DateType(), True),
    ("revenue", DoubleType(), True),
    ("durationTime", IntegerType(), True),
    ("movieStatus", StringType(), True),
    ("tagline", StringType(), True),
    ("voteAverage", DoubleType(), True),
    ("voteCount", IntegerType(), True),
]
df_movie_schema = StructType(
    [StructField(column, spark_type, nullable) for column, spark_type, nullable in _movie_columns]
)

In [0]:
df_movie = spark.read.option("header",True).schema(df_movie_schema).csv("/mnt/moviehistory/demo/movie.csv")

In [0]:
df_movie.display()

In [0]:
df_movie.write.format("delta").mode("overwrite").saveAsTable("movie_demo.movies_managed")

In [0]:
df_movie.write.format("delta").mode("overwrite").save("/mnt/moviehistory/demo/movies_external")

In [0]:
%sql
-- Register the Delta files written to /mnt/moviehistory/demo/movies_external as an
-- external table. IF NOT EXISTS keeps the cell re-runnable, matching the other
-- CREATE statements in this notebook.
CREATE TABLE IF NOT EXISTS movie_demo.movies_external
USING DELTA
LOCATION "/mnt/moviehistory/demo/movies_external"
  

In [0]:
%sql
-- 3. Read the external Delta table through its metastore name.
SELECT * FROM movie_demo.movies_external 

In [0]:
df_movie_external = spark.read.format("delta").load("/mnt/moviehistory/demo/movies_external")

In [0]:
display(df_movie_external)

In [0]:
df_movie_external.write.format("delta").mode("overwrite").partitionBy("yearReleaseDate").saveAsTable("movie_demo.movies_partitioned")

In [0]:
%sql
-- List the yearReleaseDate partition values of the partitioned table.
SHOW PARTITIONS movie_demo.movies_partitioned

### Update y Delete en Delta Lake
##### 1. Update en Delta Lake
##### 2. Delete en Delta Lake

In [0]:
%sql
-- Inspect the managed table before modifying it.
SELECT * FROM movie_demo.movies_managed

In [0]:
%sql
-- SQL UPDATE: set durationTime to 60 for every movie released in 2012.
UPDATE movie_demo.movies_managed
SET durationTime=60
WHERE yearReleaseDate=2012

In [0]:
%sql
-- Verify the 2012 rows now have durationTime = 60.
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate=2012

In [0]:
from delta.tables import DeltaTable

# Python API equivalent of the SQL UPDATE, for movies released in 2013.
# Resolve the table by its metastore name rather than a hard-coded storage path,
# so the cell does not depend on where the managed table's files are placed.
deltaTable = DeltaTable.forName(spark, "movie_demo.movies_managed")

deltaTable.update(
    condition="yearReleaseDate = 2013",
    # Values are SQL expression strings. durationTime is an INT column, so assign
    # the integer literal 100 rather than the string literal '100'.
    set={"durationTime": "100"},
)

In [0]:
%sql
-- Verify the 2013 rows updated through the Python API.
SELECT * FROM movie_demo.movies_managed
WHERE yearReleaseDate=2013

## DELETE

In [0]:
%sql
-- Count the 2015 rows; these are removed later through the Python delete API.
SELECT COUNT(1) FROM movie_demo.movies_managed
WHERE yearReleaseDate=2015

In [0]:
%sql
-- SQL DELETE: remove every movie released in 2014.
-- NOTE(review): the count cell above checks 2015, not 2014 — confirm which year
-- it is meant to verify.
DELETE FROM movie_demo.movies_managed
WHERE yearReleaseDate=2014

In [0]:
from delta.tables import DeltaTable

# Python API equivalent of SQL DELETE: remove every movie released in 2015.
# Resolve the table by name instead of a hard-coded storage path.
deltaTable = DeltaTable.forName(spark, "movie_demo.movies_managed")

# The predicate is a SQL-formatted string.
deltaTable.delete("yearReleaseDate = 2015")

### Merge/Upsert en Delta Lake

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Same movie.csv schema as the first section, redeclared so the Merge/Upsert
# section can be run on its own. Built incrementally with StructType.add
# (column, type, nullable); only movieId is non-nullable.
df_movie_schema = (
    StructType()
    .add("movieId", IntegerType(), False)
    .add("title", StringType(), True)
    .add("budget", DoubleType(), True)
    .add("homePage", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", DoubleType(), True)
    .add("yearReleaseDate", IntegerType(), True)
    .add("releaseDate", DateType(), True)
    .add("revenue", DoubleType(), True)
    .add("durationTime", IntegerType(), True)
    .add("movieStatus", StringType(), True)
    .add("tagline", StringType(), True)
    .add("voteAverage", DoubleType(), True)
    .add("voteCount", IntegerType(), True)
)

In [0]:
df_movies_day1 = spark.read.option("header",True).schema(df_movie_schema).csv("/mnt/moviehistorydev/bronze/2024-12-30/movie.csv").filter("yearReleaseDate<2000")\
    .select("movieId","title","yearReleaseDate","releaseDate","durationTime")

In [0]:
df_movies_day1.display()

In [0]:
from pyspark.sql.functions import upper

df_movies_day2 = spark.read.option("header",True).schema(df_movie_schema).csv("/mnt/moviehistorydev/bronze/2024-12-30/movie.csv").filter("yearReleaseDate BETWEEN 1998 AND 2005")\
    .select("movieId",upper("title").alias("title"),"yearReleaseDate","releaseDate","durationTime")

In [0]:
df_movies_day2.display()

In [0]:
df_movies_day3 = spark.read.option("header",True).schema(df_movie_schema).csv("/mnt/moviehistorydev/bronze/2024-12-30/movie.csv").filter("yearReleaseDate BETWEEN 1983 AND 1998 OR yearReleaseDate BETWEEN 2006 AND 2010")\
    .select("movieId",upper("title").alias("title"),"yearReleaseDate","releaseDate","durationTime")

In [0]:
df_movies_day3.display()

In [0]:
df_movies_day1.createOrReplaceTempView("vw_movies_day1")
df_movies_day2.createOrReplaceTempView("vw_movies_day2")

In [0]:
%sql
-- Target table for the upsert demo. createdDate / updatedDate record when MERGE
-- inserted or last updated a row.
-- NOTE(review): both audit columns are DATE but the MERGE cells assign
-- current_timestamp, so the value is cast to DATE and the time of day is lost —
-- confirm this is intended (TIMESTAMP columns or current_date would be explicit).
CREATE TABLE IF NOT EXISTS movie_demo.movies_merge (
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)

In [0]:
%sql
-- Day 1 upsert keyed on movieId: existing rows are updated (stamping updatedDate);
-- new movieIds are inserted (stamping createdDate).
MERGE INTO movie_demo.movies_merge target
USING vw_movies_day1 source
ON target.movieId = source.movieId
WHEN MATCHED THEN
  UPDATE SET
    target.title = source.title,
    target.yearReleaseDate = source.yearReleaseDate,
    target.releaseDate = source.releaseDate,
    target.durationTime = source.durationTime,
    target.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (
    movieId,title,yearReleaseDate,releaseDate,durationTime,createdDate
  )
  VALUES (
    movieId,title,yearReleaseDate,releaseDate,durationTime,current_timestamp
  )

In [0]:
%sql
-- Inspect the table after the day-1 merge.
SELECT * FROM movie_demo.movies_merge

## DAY 2

In [0]:
%sql
-- Day 2 upsert: same MERGE as day 1, sourced from vw_movies_day2 (upper-cased
-- titles). Matching movieIds are updated; new movieIds are inserted.
MERGE INTO movie_demo.movies_merge target
USING vw_movies_day2 source
ON target.movieId = source.movieId
WHEN MATCHED THEN
  UPDATE SET
    target.title = source.title,
    target.yearReleaseDate = source.yearReleaseDate,
    target.releaseDate = source.releaseDate,
    target.durationTime = source.durationTime,
    target.updatedDate = current_timestamp
WHEN NOT MATCHED
  THEN INSERT (
    movieId,title,yearReleaseDate,releaseDate,durationTime,createdDate
  )
  VALUES (
    movieId,title,yearReleaseDate,releaseDate,durationTime,current_timestamp
  )

In [0]:
%sql
-- Inspect the table after the day-2 merge.
SELECT * FROM movie_demo.movies_merge

## DAY 3

In [0]:
from delta.tables import DeltaTable

# Day 3 upsert using the Delta Lake Python merge builder instead of SQL MERGE.
# Resolve the target by table name rather than a hard-coded storage path.
deltaTableMovies = DeltaTable.forName(spark, "movie_demo.movies_merge")

(
    deltaTableMovies.alias("target")
    .merge(
        df_movies_day3.alias("source"),
        "target.movieId = source.movieId",
    )
    # Existing movieId: overwrite the descriptive columns and stamp updatedDate.
    .whenMatchedUpdate(
        set={
            "target.title": "source.title",
            "target.yearReleaseDate": "source.yearReleaseDate",
            "target.releaseDate": "source.releaseDate",
            "target.durationTime": "source.durationTime",
            "target.updatedDate": "current_timestamp()",
        }
    )
    # New movieId: insert the source row and stamp createdDate. Source columns are
    # qualified explicitly for readability.
    .whenNotMatchedInsert(
        values={
            "movieId": "source.movieId",
            "title": "source.title",
            "yearReleaseDate": "source.yearReleaseDate",
            "releaseDate": "source.releaseDate",
            "durationTime": "source.durationTime",
            "createdDate": "current_timestamp()",
        }
    )
    .execute()
)

In [0]:
%sql
SELECT * FROM movie_demo.movies_merge

### History, Time Travel y Vacuum
1. Historial y control de versiones
2. Viaje en el tiempo
3. Vacuum

In [0]:
%sql
-- Show the commit history of movies_merge (one row per table version).
DESC HISTORY movie_demo.movies_merge;

In [0]:
%sql
-- Time travel by version number.
SELECT * FROM movie_demo.movies_merge VERSION AS OF 2

In [0]:
%sql
-- Time travel by timestamp.
-- NOTE(review): hard-coded timestamp, presumably copied from an earlier run's
-- DESC HISTORY output — adjust it to a commit time from your own history.
SELECT * FROM movie_demo.movies_merge TIMESTAMP AS OF '2025-06-03T20:54:52.000+00:00'

In [0]:
%sql
-- VACUUM removes data files that are no longer referenced by versions inside the
-- retention window. RETAIN 0 HOURS is below Delta's safety threshold, so the
-- retention-duration check must be disabled first. WARNING: time travel to versions
-- whose files are removed will fail afterwards.
-- The check is re-enabled immediately so the unsafe setting does not persist for
-- the rest of the session.
SET spark.databricks.delta.retentionDurationCheck.enabled=false;
VACUUM movie_demo.movies_merge RETAIN 0 HOURS;
SET spark.databricks.delta.retentionDurationCheck.enabled=true;

In [0]:
df = spark.read.format("delta").option("timeStampAsOf","2025-06-03T20:54:52.000+00:00").load("/mnt/moviehistory/demo/movies_merge")

In [0]:
display(df)

In [0]:
%sql
-- Delete the 2004 movies; later cells recover them from an earlier version.
DELETE FROM movie_demo.movies_merge
WHERE yearReleaseDate=2004

In [0]:
%sql
-- Look up the version number just before the DELETE.
DESCRIBE HISTORY movie_demo.movies_merge

In [0]:
%sql
-- Inspect the table as of version 9.
-- NOTE(review): version 9 is hard-coded, presumably the last version before the
-- DELETE in the original run — check DESCRIBE HISTORY before re-running.
SELECT * FROM movie_demo.movies_merge VERSION AS OF 9

In [0]:
%sql
-- Restore deleted rows: insert every row of version 9 whose movieId is missing
-- from the current table (rows that still exist are left untouched).
MERGE INTO movie_demo.movies_merge target
USING movie_demo.movies_merge VERSION AS OF 9 source
ON target.movieId = source.movieId
WHEN NOT MATCHED
  THEN INSERT *

In [0]:
%sql
-- Inspect the table after the restore.
SELECT * FROM movie_demo.movies_merge

### Transaction Log en Delta Lake

In [0]:
%sql
-- Empty Delta table used to observe the transaction log as commits accumulate.
CREATE TABLE IF NOT EXISTS movie_demo.movies_log (
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
-- History right after creation.
DESCRIBE HISTORY movie_demo.movies_log

In [0]:
%sql
-- Each INSERT / DELETE below is a separate commit, i.e. a new table version.
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge
WHERE movieId=125537

In [0]:
%sql
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge
WHERE movieId=133575

In [0]:
%sql
DELETE FROM movie_demo.movies_log
WHERE movieId=125537

In [0]:
# Insert the movies one at a time so that every INSERT is its own commit in the
# transaction log (presumably to accumulate enough commits to observe a
# checkpoint — TODO confirm).
# The list is named movie_ids, not `list`, so the builtin `list` type is not
# shadowed for every later cell in the notebook.
# NOTE(review): 33575 may be a typo for 133575 (inserted earlier) — verify.
movie_ids = [118452, 124606, 125052, 125123, 125263, 125537, 126141, 33575, 142132, 146269, 157185]
for movieId in movie_ids:
    spark.sql(f"""INSERT INTO movie_demo.movies_log
              SELECT * FROM movie_demo.movies_merge
              WHERE movieId={movieId}""")

In [0]:
%sql
-- Bulk insert: copy every row of movies_merge in a single statement (one commit).
INSERT INTO movie_demo.movies_log
SELECT * FROM movie_demo.movies_merge

## Convertir formato Parquet a Delta

In [0]:
%sql
-- Plain Parquet (non-Delta) table with the same columns as movies_merge.
CREATE TABLE IF NOT EXISTS movie_demo.movies_convert_to_delta (
  movieId INT,
  title STRING,
  yearReleaseDate INT,
  releaseDate DATE,
  durationTime INT,
  createdDate DATE,
  updatedDate DATE
)
USING PARQUET

In [0]:
%sql
-- Populate the Parquet table.
INSERT INTO movie_demo.movies_convert_to_delta
SELECT * FROM movie_demo.movies_merge

In [0]:
%sql
-- Convert the metastore Parquet table to Delta in place: a transaction log is
-- created over the existing Parquet files.
CONVERT TO DELTA movie_demo.movies_convert_to_delta

In [0]:
df = spark.table("movie_demo.movies_convert_to_delta")

In [0]:
display(df)

In [0]:
df.write.format("parquet").save("/mnt/moviehistory/demo/movies_convert_to_delta_new")

In [0]:
%sql
-- Convert a path-based Parquet directory (not registered as a table) to Delta.
CONVERT TO DELTA parquet.`/mnt/moviehistory/demo/movies_convert_to_delta_new`